/root/bitcoin/src/util/threadpool.h
Line | Count | Source |
1 | | // Copyright (c) The Bitcoin Core developers |
2 | | // Distributed under the MIT software license, see the accompanying |
3 | | // file COPYING or https://www.opensource.org/licenses/mit-license.php. |
4 | | |
5 | | #ifndef BITCOIN_UTIL_THREADPOOL_H |
6 | | #define BITCOIN_UTIL_THREADPOOL_H |
7 | | |
8 | | #include <sync.h> |
9 | | #include <tinyformat.h> |
10 | | #include <util/check.h> |
11 | | #include <util/expected.h> |
12 | | #include <util/thread.h> |
13 | | |
14 | | #include <algorithm> |
15 | | #include <condition_variable> |
16 | | #include <functional> |
17 | | #include <future> |
18 | | #include <queue> |
19 | | #include <ranges> |
20 | | #include <thread> |
21 | | #include <type_traits> |
22 | | #include <utility> |
23 | | #include <vector> |
24 | | |
25 | | /** |
26 | | * @brief Fixed-size thread pool for running arbitrary tasks concurrently. |
27 | | * |
28 | | * The thread pool maintains a set of worker threads that consume and execute |
29 | | * tasks submitted through Submit(). Once started, tasks can be queued and |
30 | | * processed asynchronously until Stop() is called. |
31 | | * |
32 | | * ### Thread-safety and lifecycle |
33 | | * - `Start()` and `Stop()` must be called from a controller (non-worker) thread. |
34 | | * Calling `Stop()` from a worker thread will deadlock, as it waits for all |
35 | | * workers to join, including the current one. |
36 | | * |
37 | | * - `Submit()` can be called from any thread, including workers. It safely |
38 | | * enqueues new work for execution as long as the pool has active workers. |
39 | | * |
40 | | * - `Interrupt()` stops new task submission and lets queued ones drain |
41 | | * in the background. Callers can continue other shutdown steps and call |
42 | | * Stop() at the end to ensure no remaining tasks are left to execute. |
43 | | * |
44 | | * - `Stop()` prevents further task submission and blocks until all the |
45 | | * queued ones are completed. |
46 | | */ |
47 | | class ThreadPool |
48 | | { |
49 | | private: |
50 | | std::string m_name; |
51 | | Mutex m_mutex; |
52 | | std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex); |
53 | | std::condition_variable m_cv; |
54 | | // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool. |
55 | | // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals. |
56 | | // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable |
57 | | bool m_interrupt GUARDED_BY(m_mutex){false}; |
58 | | std::vector<std::thread> m_workers GUARDED_BY(m_mutex); |
59 | | |
60 | | void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
61 | 0 | { |
62 | 0 | WAIT_LOCK(m_mutex, wait_lock); Line | Count | Source | 272 | 0 | #define WAIT_LOCK(cs, name) UniqueLock name(LOCK_ARGS(cs)) Line | Count | Source | 270 | 0 | #define LOCK_ARGS(cs) MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__ |
|
|
63 | 0 | for (;;) { |
64 | 0 | std::packaged_task<void()> task; |
65 | 0 | { |
66 | | // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one. |
67 | 0 | if (!m_interrupt && m_work_queue.empty()) { |
68 | | // Block until the pool is interrupted or a task is available. |
69 | 0 | m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); }); |
70 | 0 | } |
71 | | |
72 | | // If stopped and no work left, exit worker |
73 | 0 | if (m_interrupt && m_work_queue.empty()) { |
74 | 0 | return; |
75 | 0 | } |
76 | | |
77 | 0 | task = std::move(m_work_queue.front()); |
78 | 0 | m_work_queue.pop(); |
79 | 0 | } |
80 | | |
81 | 0 | { |
82 | | // Execute the task without the lock |
83 | 0 | REVERSE_LOCK(wait_lock, m_mutex); Line | Count | Source | 252 | 0 | #define REVERSE_LOCK(g, cs) typename std::decay<decltype(g)>::type::reverse_lock UNIQUE_NAME(revlock)(g, cs, #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 0 | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 0 | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 0 | #define PASTE(x, y) x ## y |
|
|
|
|
84 | 0 | task(); |
85 | 0 | } |
86 | 0 | } |
87 | 0 | } |
88 | | |
89 | | public: |
90 | 0 | explicit ThreadPool(const std::string& name) : m_name(name) {} |
91 | | |
92 | | ~ThreadPool() |
93 | 0 | { |
94 | 0 | Stop(); // In case it hasn't been stopped. |
95 | 0 | } |
96 | | |
97 | | /** |
98 | | * @brief Start worker threads. |
99 | | * |
100 | | * Creates and launches `num_workers` threads that begin executing tasks |
101 | | * from the queue. If the pool is already started, throws. |
102 | | * |
103 | | * Must be called from a controller (non-worker) thread. |
104 | | */ |
105 | | void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
106 | 0 | { |
107 | 0 | assert(num_workers > 0); |
108 | 0 | LOCK(m_mutex); Line | Count | Source | 266 | 0 | #define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 0 | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 0 | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 0 | #define PASTE(x, y) x ## y |
|
|
|
|
109 | 0 | if (m_interrupt) throw std::runtime_error("Thread pool has been interrupted or is stopping"); |
110 | 0 | if (!m_workers.empty()) throw std::runtime_error("Thread pool already started"); |
111 | | |
112 | | // Create workers |
113 | 0 | m_workers.reserve(num_workers); |
114 | 0 | for (int i = 0; i < num_workers; i++) { |
115 | 0 | m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });Line | Count | Source | 1172 | 0 | #define strprintf tfm::format |
|
116 | 0 | } |
117 | 0 | } |
118 | | |
119 | | /** |
120 | | * @brief Stop all worker threads and wait for them to exit. |
121 | | * |
122 | | * Sets the interrupt flag, wakes all waiting workers, and joins them. |
123 | | * Any remaining tasks in the queue will be processed before returning. |
124 | | * |
125 | | * Must be called from a controller (non-worker) thread. |
126 | | * Concurrent calls to Start() will be rejected while Stop() is in progress. |
127 | | */ |
128 | | void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
129 | 0 | { |
130 | | // Notify workers and join them |
131 | 0 | std::vector<std::thread> threads_to_join; |
132 | 0 | { |
133 | 0 | LOCK(m_mutex); Line | Count | Source | 266 | 0 | #define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 0 | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 0 | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 0 | #define PASTE(x, y) x ## y |
|
|
|
|
134 | | // Ensure Stop() is not called from a worker thread while workers are still registered, |
135 | | // otherwise a self-join deadlock would occur. |
136 | 0 | auto id = std::this_thread::get_id(); |
137 | 0 | for (const auto& worker : m_workers) assert(worker.get_id() != id); |
138 | | // Early shutdown to return right away on any concurrent Submit() call |
139 | 0 | m_interrupt = true; |
140 | 0 | threads_to_join.swap(m_workers); |
141 | 0 | } |
142 | 0 | m_cv.notify_all(); |
143 | | // Help draining queue |
144 | 0 | while (ProcessTask()) {} |
145 | | // Free resources |
146 | 0 | for (auto& worker : threads_to_join) worker.join(); |
147 | | |
148 | | // Since we currently wait for tasks completion, sanity-check empty queue |
149 | 0 | LOCK(m_mutex); Line | Count | Source | 266 | 0 | #define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 0 | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 0 | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 0 | #define PASTE(x, y) x ## y |
|
|
|
|
150 | 0 | Assume(m_work_queue.empty()); Line | Count | Source | 125 | 0 | #define Assume(val) inline_assertion_check<false>(val, std::source_location::current(), #val) |
|
151 | | // Re-allow Start() now that all workers have exited |
152 | 0 | m_interrupt = false; |
153 | 0 | } |
154 | | |
155 | | enum class SubmitError { |
156 | | Inactive, |
157 | | Interrupted, |
158 | | }; |
159 | | |
160 | | template <class F> |
161 | | using Future = std::future<std::invoke_result_t<F>>; |
162 | | |
163 | | template <class R> |
164 | | using RangeFuture = Future<std::ranges::range_reference_t<R>>; |
165 | | |
166 | | template <class F> |
167 | | using PackagedTask = std::packaged_task<std::invoke_result_t<F>()>; |
168 | | |
169 | | /** |
170 | | * @brief Enqueues a new task for asynchronous execution. |
171 | | * |
172 | | * @param fn Callable to execute asynchronously. |
173 | | * @return On success, a future containing fn's result. |
174 | | * On failure, an error indicating why the task was rejected: |
175 | | * - SubmitError::Inactive: Pool has no workers (never started or already stopped). |
176 | | * - SubmitError::Interrupted: Pool task acceptance has been interrupted. |
177 | | * |
178 | | * Thread-safe: Can be called from any thread, including within the provided 'fn' callable. |
179 | | * |
180 | | * @warning Ignoring the returned future requires guarding the task against |
181 | | * uncaught exceptions, as they would otherwise be silently discarded. |
182 | | */ |
183 | | template <class F> |
184 | | [[nodiscard]] util::Expected<Future<F>, SubmitError> Submit(F&& fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
185 | 0 | { |
186 | 0 | PackagedTask<F> task{std::forward<F>(fn)}; |
187 | 0 | auto future{task.get_future()}; |
188 | 0 | { |
189 | 0 | LOCK(m_mutex); Line | Count | Source | 266 | 0 | #define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 0 | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 0 | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 0 | #define PASTE(x, y) x ## y |
|
|
|
| LOCK(m_mutex); Line | Count | Source | 266 | 0 | #define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 0 | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 0 | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 0 | #define PASTE(x, y) x ## y |
|
|
|
| LOCK(m_mutex); Line | Count | Source | 266 | 0 | #define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 0 | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 0 | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 0 | #define PASTE(x, y) x ## y |
|
|
|
|
190 | 0 | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; |
191 | 0 | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; |
192 | | |
193 | 0 | m_work_queue.emplace(std::move(task)); |
194 | 0 | } |
195 | 0 | m_cv.notify_one(); |
196 | 0 | return {std::move(future)}; |
197 | 0 | } Unexecuted instantiation: util::Expected<std::future<std::invoke_result<ThrowTask>::type>, ThreadPool::SubmitError> ThreadPool::Submit<ThrowTask>(ThrowTask&&) Unexecuted instantiation: util::Expected<std::future<std::invoke_result<CounterTask>::type>, ThreadPool::SubmitError> ThreadPool::Submit<CounterTask>(CounterTask&&) Unexecuted instantiation: httpserver.cpp:util::Expected<std::future<std::invoke_result<MaybeDispatchRequestToWorker(std::shared_ptr<http_bitcoin::HTTPRequest>)::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<MaybeDispatchRequestToWorker(std::shared_ptr<http_bitcoin::HTTPRequest>)::$_0>(MaybeDispatchRequestToWorker(std::shared_ptr<http_bitcoin::HTTPRequest>)::$_0&&) |
198 | | |
199 | | /** |
200 | | * @brief Enqueues a range of tasks for asynchronous execution. |
201 | | * |
202 | | * @param fns Callables to execute asynchronously. |
203 | | * @return On success, a vector of futures containing each element of fns's result in order. |
204 | | * On failure, an error indicating why the range was rejected: |
205 | | * - SubmitError::Inactive: Pool has no workers (never started or already stopped). |
206 | | * - SubmitError::Interrupted: Pool task acceptance has been interrupted. |
207 | | * |
208 | | * This is more efficient when submitting many tasks at once, since |
209 | | * the queue lock is only taken once internally and all worker threads are |
210 | | * notified. For single tasks, Submit() is preferred since only one worker |
211 | | * thread is notified. |
212 | | * |
213 | | * Thread-safe: Can be called from any thread, including within submitted callables. |
214 | | * |
215 | | * @warning Ignoring the returned futures requires guarding tasks against |
216 | | * uncaught exceptions, as they would otherwise be silently discarded. |
217 | | */ |
218 | | template <std::ranges::sized_range R> |
219 | | requires(!std::is_lvalue_reference_v<R>) |
220 | | [[nodiscard]] util::Expected<std::vector<RangeFuture<R>>, SubmitError> Submit(R&& fns) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
221 | | { |
222 | | std::vector<RangeFuture<R>> futures; |
223 | | futures.reserve(std::ranges::size(fns)); |
224 | | |
225 | | { |
226 | | LOCK(m_mutex); |
227 | | if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive}; |
228 | | if (m_interrupt) return util::Unexpected{SubmitError::Interrupted}; |
229 | | for (auto&& fn : fns) { |
230 | | PackagedTask<std::ranges::range_reference_t<R>> task{std::move(fn)}; |
231 | | futures.emplace_back(task.get_future()); |
232 | | m_work_queue.emplace(std::move(task)); |
233 | | } |
234 | | } |
235 | | m_cv.notify_all(); |
236 | | return {std::move(futures)}; |
237 | | } |
238 | | |
239 | | /** |
240 | | * @brief Execute a single queued task synchronously. |
241 | | * Removes one task from the queue and executes it on the calling thread. |
242 | | */ |
243 | | bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
244 | 0 | { |
245 | 0 | std::packaged_task<void()> task; |
246 | 0 | { |
247 | 0 | LOCK(m_mutex); Line | Count | Source | 266 | 0 | #define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__) Line | Count | Source | 11 | 0 | #define UNIQUE_NAME(name) PASTE2(name, __COUNTER__) Line | Count | Source | 9 | 0 | #define PASTE2(x, y) PASTE(x, y) Line | Count | Source | 8 | 0 | #define PASTE(x, y) x ## y |
|
|
|
|
248 | 0 | if (m_work_queue.empty()) return false; |
249 | | |
250 | | // Pop the task |
251 | 0 | task = std::move(m_work_queue.front()); |
252 | 0 | m_work_queue.pop(); |
253 | 0 | } |
254 | 0 | task(); |
255 | 0 | return true; |
256 | 0 | } |
257 | | |
258 | | /** |
259 | | * @brief Stop accepting new tasks and begin asynchronous shutdown. |
260 | | * |
261 | | * Wakes all worker threads so they can drain the queue and exit. |
262 | | * Unlike Stop(), this function does not wait for threads to finish. |
263 | | * |
264 | | * Note: The next step in the pool lifecycle is calling Stop(), which |
265 | | * releases any dangling resources and resets the pool state |
266 | | * for shutdown or restart. |
267 | | */ |
268 | | void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
269 | 0 | { |
270 | 0 | WITH_LOCK(m_mutex, m_interrupt = true); Line | Count | Source | 297 | 0 | #define WITH_LOCK(cs, code) (MaybeCheckNotHeld(cs), [&]() -> decltype(auto) { LOCK(cs); code; }()) |
|
271 | 0 | m_cv.notify_all(); |
272 | 0 | } |
273 | | |
274 | | size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
275 | 0 | { |
276 | 0 | return WITH_LOCK(m_mutex, return m_work_queue.size()); Line | Count | Source | 297 | 0 | #define WITH_LOCK(cs, code) (MaybeCheckNotHeld(cs), [&]() -> decltype(auto) { LOCK(cs); code; }()) |
|
277 | 0 | } |
278 | | |
279 | | size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) |
280 | 0 | { |
281 | 0 | return WITH_LOCK(m_mutex, return m_workers.size()); Line | Count | Source | 297 | 0 | #define WITH_LOCK(cs, code) (MaybeCheckNotHeld(cs), [&]() -> decltype(auto) { LOCK(cs); code; }()) |
|
282 | 0 | } |
283 | | }; |
284 | | |
285 | 0 | constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept { |
286 | 0 | switch (err) { |
287 | 0 | case ThreadPool::SubmitError::Inactive: |
288 | 0 | return "No active workers"; |
289 | 0 | case ThreadPool::SubmitError::Interrupted: |
290 | 0 | return "Interrupted"; |
291 | 0 | } |
292 | 0 | Assume(false); // Unreachable Line | Count | Source | 125 | 0 | #define Assume(val) inline_assertion_check<false>(val, std::source_location::current(), #val) |
|
293 | 0 | return "Unknown error"; |
294 | 0 | } |
295 | | |
296 | | #endif // BITCOIN_UTIL_THREADPOOL_H |