src/ex/thread_pool.cpp

100.0% Lines (119/119) 100.0% Functions (24/24)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/boostorg/capy
9 //
10
11 #include <boost/capy/ex/thread_pool.hpp>
12 #include <boost/capy/detail/intrusive.hpp>
13 #include <boost/capy/test/thread_name.hpp>
14 #include <algorithm>
15 #include <atomic>
16 #include <condition_variable>
17 #include <cstdio>
18 #include <mutex>
19 #include <thread>
20 #include <vector>
21
22 /*
23 Thread pool implementation using a shared work queue.
24
25 Work items are coroutine handles wrapped in intrusive list nodes, stored
26 in a single queue protected by a mutex. Worker threads wait on a
27 condition_variable until work is available or stop is requested.
28
29 Threads are started lazily on first post() via std::call_once to avoid
30 spawning threads for pools that are constructed but never used. Each
31 thread is named with a configurable prefix plus index for debugger
32 visibility.
33
34 Work tracking: on_work_started/on_work_finished maintain an atomic
35 outstanding_work_ counter. join() blocks until this counter reaches
36 zero, then signals workers to stop and joins threads.
37
38 Shutdown paths:
39 - join(): waits for outstanding work to drain, then stops workers.
40 - stop(): immediately signals workers to exit; queued work is abandoned.
41 - Destructor: stop() then join() (abandon + wait for threads).
42 */
43
44 namespace boost {
45 namespace capy {
46
47 //------------------------------------------------------------------------------
48
49 class thread_pool::impl
50 {
51 struct work : detail::intrusive_queue<work>::node
52 {
53 std::coroutine_handle<> h_;
54
55 784x explicit work(std::coroutine_handle<> h) noexcept
56 784x : h_(h)
57 {
58 784x }
59
60 594x void run()
61 {
62 594x auto h = h_;
63 594x delete this;
64 594x h.resume();
65 594x }
66
67 190x void destroy()
68 {
69 190x auto h = h_;
70 190x delete this;
71 190x if(h && h != std::noop_coroutine())
72 138x h.destroy();
73 190x }
74 };
75
76 std::mutex mutex_;
77 std::condition_variable cv_;
78 detail::intrusive_queue<work> q_;
79 std::vector<std::thread> threads_;
80 std::atomic<std::size_t> outstanding_work_{0};
81 bool stop_{false};
82 bool joined_{false};
83 std::size_t num_threads_;
84 char thread_name_prefix_[13]{}; // 12 chars max + null terminator
85 std::once_flag start_flag_;
86
87 public:
88 149x ~impl()
89 {
90 339x while(auto* w = q_.pop())
91 190x w->destroy();
92 149x }
93
94 149x impl(std::size_t num_threads, std::string_view thread_name_prefix)
95 149x : num_threads_(num_threads)
96 {
97 149x if(num_threads_ == 0)
98 4x num_threads_ = std::max(
99 2x std::thread::hardware_concurrency(), 1u);
100
101 // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
102 149x auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
103 149x thread_name_prefix_[n] = '\0';
104 149x }
105
106 void
107 784x post(std::coroutine_handle<> h)
108 {
109 784x ensure_started();
110 784x auto* w = new work(h);
111 {
112 784x std::lock_guard<std::mutex> lock(mutex_);
113 784x q_.push(w);
114 784x }
115 784x cv_.notify_one();
116 784x }
117
118 void
119 328x on_work_started() noexcept
120 {
121 328x outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
122 328x }
123
124 void
125 328x on_work_finished() noexcept
126 {
127 328x if(outstanding_work_.fetch_sub(
128 328x 1, std::memory_order_acq_rel) == 1)
129 {
130 77x std::lock_guard<std::mutex> lock(mutex_);
131 77x if(joined_ && !stop_)
132 4x stop_ = true;
133 77x cv_.notify_all();
134 77x }
135 328x }
136
137 void
138 159x join() noexcept
139 {
140 {
141 159x std::unique_lock<std::mutex> lock(mutex_);
142 159x if(joined_)
143 10x return;
144 149x joined_ = true;
145
146 149x if(outstanding_work_.load(
147 149x std::memory_order_acquire) == 0)
148 {
149 92x stop_ = true;
150 92x cv_.notify_all();
151 }
152 else
153 {
154 57x cv_.wait(lock, [this]{
155 62x return stop_;
156 });
157 }
158 159x }
159
160 312x for(auto& t : threads_)
161 163x if(t.joinable())
162 163x t.join();
163 }
164
165 void
166 151x stop() noexcept
167 {
168 {
169 151x std::lock_guard<std::mutex> lock(mutex_);
170 151x stop_ = true;
171 151x }
172 151x cv_.notify_all();
173 151x }
174
175 private:
176 void
177 784x ensure_started()
178 {
179 784x std::call_once(start_flag_, [this]{
180 93x threads_.reserve(num_threads_);
181 256x for(std::size_t i = 0; i < num_threads_; ++i)
182 326x threads_.emplace_back([this, i]{ run(i); });
183 93x });
184 784x }
185
186 void
187 163x run(std::size_t index)
188 {
189 // Build name; set_current_thread_name truncates to platform limits.
190 char name[16];
191 163x std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
192 163x set_current_thread_name(name);
193
194 for(;;)
195 {
196 757x work* w = nullptr;
197 {
198 757x std::unique_lock<std::mutex> lock(mutex_);
199 757x cv_.wait(lock, [this]{
200 1216x return !q_.empty() ||
201 1216x stop_;
202 });
203 757x if(stop_)
204 326x return;
205 594x w = q_.pop();
206 757x }
207 594x if(w)
208 594x w->run();
209 594x }
210 }
211 };
212
213 //------------------------------------------------------------------------------
214
215 149x thread_pool::
216 ~thread_pool()
217 {
218 149x impl_->stop();
219 149x impl_->join();
220 149x shutdown();
221 149x destroy();
222 149x delete impl_;
223 149x }
224
225 149x thread_pool::
226 149x thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
227 149x : impl_(new impl(num_threads, thread_name_prefix))
228 {
229 149x this->set_frame_allocator(std::allocator<void>{});
230 149x }
231
232 void
233 10x thread_pool::
234 join() noexcept
235 {
236 10x impl_->join();
237 10x }
238
239 void
240 2x thread_pool::
241 stop() noexcept
242 {
243 2x impl_->stop();
244 2x }
245
246 //------------------------------------------------------------------------------
247
248 thread_pool::executor_type
249 145x thread_pool::
250 get_executor() const noexcept
251 {
252 145x return executor_type(
253 145x const_cast<thread_pool&>(*this));
254 }
255
256 void
257 328x thread_pool::executor_type::
258 on_work_started() const noexcept
259 {
260 328x pool_->impl_->on_work_started();
261 328x }
262
263 void
264 328x thread_pool::executor_type::
265 on_work_finished() const noexcept
266 {
267 328x pool_->impl_->on_work_finished();
268 328x }
269
270 void
271 784x thread_pool::executor_type::
272 post(std::coroutine_handle<> h) const
273 {
274 784x pool_->impl_->post(h);
275 784x }
276
277 } // capy
278 } // boost
279