1111#include < boost/capy/ex/thread_pool.hpp>
1212#include < boost/capy/detail/intrusive.hpp>
1313#include < boost/capy/detail/thread_name.hpp>
14+ #include < atomic>
1415#include < condition_variable>
1516#include < cstdio>
1617#include < mutex>
17- #include < stop_token>
1818#include < thread>
1919#include < vector>
2020
2323
2424 Work items are coroutine handles wrapped in intrusive list nodes, stored
2525 in a single queue protected by a mutex. Worker threads wait on a
26- condition_variable_any that integrates with std::stop_token for clean
27- shutdown.
26+ condition_variable until work is available or stop is requested.
2827
2928 Threads are started lazily on first post() via std::call_once to avoid
3029 spawning threads for pools that are constructed but never used. Each
3130 thread is named with a configurable prefix plus index for debugger
3231 visibility.
3332
34- Shutdown sequence: stop() requests all threads to stop via their stop
35- tokens, then the destructor joins threads and destroys any remaining
36- queued work without executing it.
33+ Shutdown sequence: stop() sets the stop flag and notifies all threads,
34+ then the destructor joins threads and destroys any remaining queued
35+ work without executing it.
3736*/
3837
3938namespace boost {
@@ -66,9 +65,10 @@ class thread_pool::impl
6665 };
6766
6867 std::mutex mutex_;
69- std::condition_variable_any cv_;
68+ std::condition_variable cv_;
7069 detail::intrusive_queue<work> q_;
71- std::vector<std::jthread> threads_;
70+ std::vector<std::thread> threads_;
71+ std::atomic<bool > stop_{false };
7272 std::size_t num_threads_;
7373 char thread_name_prefix_[13 ]{}; // 12 chars max + null terminator
7474 std::once_flag start_flag_;
@@ -77,7 +77,9 @@ class thread_pool::impl
7777 ~impl ()
7878 {
7979 stop ();
80- threads_.clear ();
80+ for (auto & t : threads_)
81+ if (t.joinable ())
82+ t.join ();
8183
8284 while (auto * w = q_.pop ())
8385 w->destroy ();
@@ -111,8 +113,7 @@ class thread_pool::impl
111113 void
112114 stop () noexcept
113115 {
114- for (auto & t : threads_)
115- t.request_stop ();
116+ stop_.store (true , std::memory_order_release);
116117 cv_.notify_all ();
117118 }
118119
@@ -123,13 +124,12 @@ class thread_pool::impl
123124 std::call_once (start_flag_, [this ]{
124125 threads_.reserve (num_threads_);
125126 for (std::size_t i = 0 ; i < num_threads_; ++i)
126- threads_.emplace_back (
127- [this , i](std::stop_token st){ run (st, i); });
127+ threads_.emplace_back ([this , i]{ run (i); });
128128 });
129129 }
130130
131131 void
132- run (std::stop_token st, std:: size_t index)
132+ run (std::size_t index)
133133 {
134134 // Build name; set_current_thread_name truncates to platform limits.
135135 char name[16 ];
@@ -141,11 +141,16 @@ class thread_pool::impl
141141 work* w = nullptr ;
142142 {
143143 std::unique_lock<std::mutex> lock (mutex_);
144- if (!cv_.wait (lock, st, [this ]{ return !q_.empty (); }))
144+ cv_.wait (lock, [this ]{
145+ return !q_.empty () ||
146+ stop_.load (std::memory_order_acquire);
147+ });
148+ if (stop_.load (std::memory_order_acquire) && q_.empty ())
145149 return ;
146150 w = q_.pop ();
147151 }
148- w->run ();
152+ if (w)
153+ w->run ();
149154 }
150155 }
151156};
0 commit comments