Coverage Report

Created: 2026-06-01 18:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/bitcoin/src/util/threadpool.h
Line
Count
Source
1
// Copyright (c) The Bitcoin Core developers
2
// Distributed under the MIT software license, see the accompanying
3
// file COPYING or https://www.opensource.org/licenses/mit-license.php.
4
5
#ifndef BITCOIN_UTIL_THREADPOOL_H
6
#define BITCOIN_UTIL_THREADPOOL_H
7
8
#include <sync.h>
9
#include <tinyformat.h>
10
#include <util/check.h>
11
#include <util/expected.h>
12
#include <util/thread.h>
13
14
#include <algorithm>
15
#include <condition_variable>
16
#include <functional>
17
#include <future>
18
#include <queue>
19
#include <ranges>
20
#include <thread>
21
#include <type_traits>
22
#include <utility>
23
#include <vector>
24
25
/**
26
 * @brief Fixed-size thread pool for running arbitrary tasks concurrently.
27
 *
28
 * The thread pool maintains a set of worker threads that consume and execute
29
 * tasks submitted through Submit(). Once started, tasks can be queued and
30
 * processed asynchronously until Stop() is called.
31
 *
32
 * ### Thread-safety and lifecycle
33
 * - `Start()` and `Stop()` must be called from a controller (non-worker) thread.
34
 *   Calling `Stop()` from a worker thread will deadlock, as it waits for all
35
 *   workers to join, including the current one.
36
 *
37
 * - `Submit()` can be called from any thread, including workers. It safely
38
 *   enqueues new work for execution as long as the pool has active workers.
39
 *
40
 * - `Interrupt()` stops new task submission and lets queued ones drain
41
 *   in the background. Callers can continue other shutdown steps and call
42
 *   Stop() at the end to ensure no remaining tasks are left to execute.
43
 *
44
 * - `Stop()` prevents further task submission and blocks until all the
45
 *   queued ones are completed.
46
 */
47
class ThreadPool
48
{
49
private:
50
    std::string m_name;
51
    Mutex m_mutex;
52
    std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
53
    std::condition_variable m_cv;
54
    // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
55
    // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
56
    // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
57
    bool m_interrupt GUARDED_BY(m_mutex){false};
58
    std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
59
60
    void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
61
0
    {
62
0
        WAIT_LOCK(m_mutex, wait_lock);
63
952
        for (;;) {
64
952
            std::packaged_task<void()> task;
65
952
            {
66
                // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
67
952
                if (!m_interrupt && m_work_queue.empty()) {
  Branch (67:21): [True: 661, False: 291]
  Branch (67:37): [True: 661, False: 0]
68
                    // Block until the pool is interrupted or a task is available.
69
2.54k
                    m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
  Branch (69:91): [True: 929, False: 1.61k]
  Branch (69:106): [True: 952, False: 661]
70
661
                }
71
72
                // If stopped and no work left, exit worker
73
1.22k
                if (m_interrupt && m_work_queue.empty()) {
  Branch (73:21): [True: 1.22k, False: 18.4E]
  Branch (73:36): [True: 1.22k, False: 0]
74
1.22k
                    return;
75
1.22k
                }
76
77
18.4E
                task = std::move(m_work_queue.front());
78
18.4E
                m_work_queue.pop();
79
18.4E
            }
80
81
0
            {
82
                // Execute the task without the lock
83
18.4E
                REVERSE_LOCK(wait_lock, m_mutex);
84
18.4E
                task();
85
18.4E
            }
86
18.4E
        }
87
0
    }
88
89
public:
90
0
    explicit ThreadPool(const std::string& name) : m_name(name) {}
91
92
    ~ThreadPool()
93
0
    {
94
0
        Stop(); // In case it hasn't been stopped.
95
0
    }
96
97
    /**
98
     * @brief Start worker threads.
99
     *
100
     * Creates and launches `num_workers` threads that begin executing tasks
101
     * from the queue. If the pool is already started, throws.
102
     *
103
     * Must be called from a controller (non-worker) thread.
104
     */
105
    void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
106
0
    {
107
0
        assert(num_workers > 0);
  Branch (107:9): [True: 0, False: 0]
108
0
        LOCK(m_mutex);
109
0
        if (m_interrupt) throw std::runtime_error("Thread pool has been interrupted or is stopping");
  Branch (109:13): [True: 0, False: 0]
110
0
        if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
  Branch (110:13): [True: 0, False: 0]
111
112
        // Create workers
113
0
        m_workers.reserve(num_workers);
114
0
        for (int i = 0; i < num_workers; i++) {
  Branch (114:25): [True: 0, False: 0]
115
0
            m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
116
0
        }
117
0
    }
118
119
    /**
120
     * @brief Stop all worker threads and wait for them to exit.
121
     *
122
     * Sets the interrupt flag, wakes all waiting workers, and joins them.
123
     * Any remaining tasks in the queue will be processed before returning.
124
     *
125
     * Must be called from a controller (non-worker) thread.
126
     * Concurrent calls to Start() will be rejected while Stop() is in progress.
127
     */
128
    void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
129
305
    {
130
        // Notify workers and join them
131
305
        std::vector<std::thread> threads_to_join;
132
305
        {
133
305
            LOCK(m_mutex);
134
            // Ensure Stop() is not called from a worker thread while workers are still registered,
135
            // otherwise a self-join deadlock would occur.
136
305
            auto id = std::this_thread::get_id();
137
305
            for (const auto& worker : m_workers) assert(worker.get_id() != id);
  Branch (137:37): [True: 1.22k, False: 305]
  Branch (137:50): [True: 1.22k, False: 0]
138
            // Early shutdown to return right away on any concurrent Submit() call
139
305
            m_interrupt = true;
140
305
            threads_to_join.swap(m_workers);
141
305
        }
142
0
        m_cv.notify_all();
143
        // Help draining queue
144
305
        while (ProcessTask()) {}
  Branch (144:16): [True: 0, False: 305]
145
        // Free resources
146
1.22k
        for (auto& worker : threads_to_join) worker.join();
  Branch (146:27): [True: 1.22k, False: 305]
147
148
        // Since we currently wait for tasks completion, sanity-check empty queue
149
305
        LOCK(m_mutex);
150
305
        Assume(m_work_queue.empty());
151
        // Re-allow Start() now that all workers have exited
152
305
        m_interrupt = false;
153
305
    }
154
155
    enum class SubmitError {
156
        Inactive,
157
        Interrupted,
158
    };
159
160
    template <class F>
161
    using Future = std::future<std::invoke_result_t<F>>;
162
163
    template <class R>
164
    using RangeFuture = Future<std::ranges::range_reference_t<R>>;
165
166
    template <class F>
167
    using PackagedTask = std::packaged_task<std::invoke_result_t<F>()>;
168
169
    /**
170
     * @brief Enqueues a new task for asynchronous execution.
171
     *
172
     * @param  fn Callable to execute asynchronously.
173
     * @return On success, a future containing fn's result.
174
     *         On failure, an error indicating why the task was rejected:
175
     *         - SubmitError::Inactive: Pool has no workers (never started or already stopped).
176
     *         - SubmitError::Interrupted: Pool task acceptance has been interrupted.
177
     *
178
     * Thread-safe: Can be called from any thread, including within the provided 'fn' callable.
179
     *
180
     * @warning Ignoring the returned future requires guarding the task against
181
     *          uncaught exceptions, as they would otherwise be silently discarded.
182
     */
183
    template <class F>
184
    [[nodiscard]] util::Expected<Future<F>, SubmitError> Submit(F&& fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
185
952
    {
186
952
        PackagedTask<F> task{std::forward<F>(fn)};
187
952
        auto future{task.get_future()};
188
952
        {
189
952
            LOCK(m_mutex);
190
952
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
  Branch (190:17): [True: 0, False: 952]
191
952
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
  Branch (191:17): [True: 0, False: 952]
192
193
952
            m_work_queue.emplace(std::move(task));
194
952
        }
195
0
        m_cv.notify_one();
196
952
        return {std::move(future)};
197
952
    }
198
199
    /**
200
     * @brief Enqueues a range of tasks for asynchronous execution.
201
     *
202
     * @param  fns Callables to execute asynchronously.
203
     * @return On success, a vector of futures containing each element of fns's result in order.
204
     *         On failure, an error indicating why the range was rejected:
205
     *         - SubmitError::Inactive: Pool has no workers (never started or already stopped).
206
     *         - SubmitError::Interrupted: Pool task acceptance has been interrupted.
207
     *
208
     * This is more efficient when submitting many tasks at once, since
209
     * the queue lock is only taken once internally and all worker threads are
210
     * notified. For single tasks, Submit() is preferred since only one worker
211
     * thread is notified.
212
     *
213
     * Thread-safe: Can be called from any thread, including within submitted callables.
214
     *
215
     * @warning Ignoring the returned futures requires guarding tasks against
216
     *          uncaught exceptions, as they would otherwise be silently discarded.
217
     */
218
    template <std::ranges::sized_range R>
219
        requires(!std::is_lvalue_reference_v<R>)
220
    [[nodiscard]] util::Expected<std::vector<RangeFuture<R>>, SubmitError> Submit(R&& fns) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
221
    {
222
        std::vector<RangeFuture<R>> futures;
223
        futures.reserve(std::ranges::size(fns));
224
225
        {
226
            LOCK(m_mutex);
227
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
228
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
229
            for (auto&& fn : fns) {
230
                PackagedTask<std::ranges::range_reference_t<R>> task{std::move(fn)};
231
                futures.emplace_back(task.get_future());
232
                m_work_queue.emplace(std::move(task));
233
            }
234
        }
235
        m_cv.notify_all();
236
        return {std::move(futures)};
237
    }
238
239
    /**
240
     * @brief Execute a single queued task synchronously.
241
     * Removes one task from the queue and executes it on the calling thread.
242
     */
243
    bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
244
305
    {
245
305
        std::packaged_task<void()> task;
246
305
        {
247
305
            LOCK(m_mutex);
248
305
            if (m_work_queue.empty()) return false;
  Branch (248:17): [True: 305, False: 0]
249
250
            // Pop the task
251
0
            task = std::move(m_work_queue.front());
252
0
            m_work_queue.pop();
253
0
        }
254
0
        task();
255
0
        return true;
256
305
    }
257
258
    /**
259
     * @brief Stop accepting new tasks and begin asynchronous shutdown.
260
     *
261
     * Wakes all worker threads so they can drain the queue and exit.
262
     * Unlike Stop(), this function does not wait for threads to finish.
263
     *
264
     * Note: The next step in the pool lifecycle is calling Stop(), which
265
     *       releases any dangling resources and resets the pool state
266
     *       for shutdown or restart.
267
     */
268
    void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
269
305
    {
270
305
        WITH_LOCK(m_mutex, m_interrupt = true);
271
305
        m_cv.notify_all();
272
305
    }
273
274
    size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
275
952
    {
276
952
        return WITH_LOCK(m_mutex, return m_work_queue.size());
277
952
    }
278
279
    size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
280
0
    {
281
0
        return WITH_LOCK(m_mutex, return m_workers.size());
282
0
    }
283
};
284
285
0
constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept {
286
0
    switch (err) {
  Branch (286:13): [True: 0, False: 0]
287
0
        case ThreadPool::SubmitError::Inactive:
  Branch (287:9): [True: 0, False: 0]
288
0
            return "No active workers";
289
0
        case ThreadPool::SubmitError::Interrupted:
  Branch (289:9): [True: 0, False: 0]
290
0
            return "Interrupted";
291
0
    }
292
0
    Assume(false); // Unreachable
293
0
    return "Unknown error";
294
0
}
295
296
#endif // BITCOIN_UTIL_THREADPOOL_H