Bitcoin Core Fuzz Coverage Report

Coverage Report

Created: 2026-03-24 13:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/bitcoin/src/util/threadpool.h
Line
Count
Source
1
// Copyright (c) The Bitcoin Core developers
2
// Distributed under the MIT software license, see the accompanying
3
// file COPYING or https://www.opensource.org/licenses/mit-license.php.
4
5
#ifndef BITCOIN_UTIL_THREADPOOL_H
6
#define BITCOIN_UTIL_THREADPOOL_H
7
8
#include <sync.h>
9
#include <tinyformat.h>
10
#include <util/check.h>
11
#include <util/expected.h>
12
#include <util/thread.h>
13
14
#include <algorithm>
15
#include <condition_variable>
16
#include <functional>
17
#include <future>
18
#include <queue>
19
#include <ranges>
20
#include <thread>
21
#include <type_traits>
22
#include <utility>
23
#include <vector>
24
25
/**
26
 * @brief Fixed-size thread pool for running arbitrary tasks concurrently.
27
 *
28
 * The thread pool maintains a set of worker threads that consume and execute
29
 * tasks submitted through Submit(). Once started, tasks can be queued and
30
 * processed asynchronously until Stop() is called.
31
 *
32
 * ### Thread-safety and lifecycle
33
 * - `Start()` and `Stop()` must be called from a controller (non-worker) thread.
34
 *   Calling `Stop()` from a worker thread will deadlock, as it waits for all
35
 *   workers to join, including the current one.
36
 *
37
 * - `Submit()` can be called from any thread, including workers. It safely
38
 *   enqueues new work for execution as long as the pool has active workers.
39
 *
40
 * - `Interrupt()` stops new task submission and lets queued ones drain
41
 *   in the background. Callers can continue other shutdown steps and call
42
 *   Stop() at the end to ensure no remaining tasks are left to execute.
43
 *
44
 * - `Stop()` prevents further task submission and blocks until all the
45
 *   queued ones are completed.
46
 */
47
class ThreadPool
48
{
49
private:
50
    std::string m_name;
51
    Mutex m_mutex;
52
    std::queue<std::packaged_task<void()>> m_work_queue GUARDED_BY(m_mutex);
53
    std::condition_variable m_cv;
54
    // Note: m_interrupt must be guarded by m_mutex, and cannot be replaced by an unguarded atomic bool.
55
    // This ensures threads blocked on m_cv reliably observe the change and proceed correctly without missing signals.
56
    // Ref: https://en.cppreference.com/w/cpp/thread/condition_variable
57
    bool m_interrupt GUARDED_BY(m_mutex){false};
58
    std::vector<std::thread> m_workers GUARDED_BY(m_mutex);
59
60
    void WorkerThread() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
61
0
    {
62
0
        WAIT_LOCK(m_mutex, wait_lock);
Line
Count
Source
272
0
#define WAIT_LOCK(cs, name) UniqueLock name(LOCK_ARGS(cs))
Line
Count
Source
270
0
#define LOCK_ARGS(cs) MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__
63
0
        for (;;) {
64
0
            std::packaged_task<void()> task;
65
0
            {
66
                // Wait only if needed; avoid sleeping when a new task was submitted while we were processing another one.
67
0
                if (!m_interrupt && m_work_queue.empty()) {
68
                    // Block until the pool is interrupted or a task is available.
69
0
                    m_cv.wait(wait_lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) { return m_interrupt || !m_work_queue.empty(); });
70
0
                }
71
72
                // If stopped and no work left, exit worker
73
0
                if (m_interrupt && m_work_queue.empty()) {
74
0
                    return;
75
0
                }
76
77
0
                task = std::move(m_work_queue.front());
78
0
                m_work_queue.pop();
79
0
            }
80
81
0
            {
82
                // Execute the task without the lock
83
0
                REVERSE_LOCK(wait_lock, m_mutex);
Line
Count
Source
252
0
#define REVERSE_LOCK(g, cs) typename std::decay<decltype(g)>::type::reverse_lock UNIQUE_NAME(revlock)(g, cs, #cs, __FILE__, __LINE__)
Line
Count
Source
11
0
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
0
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
0
#define PASTE(x, y) x ## y
84
0
                task();
85
0
            }
86
0
        }
87
0
    }
88
89
public:
90
0
    explicit ThreadPool(const std::string& name) : m_name(name) {}
91
92
    ~ThreadPool()
93
0
    {
94
0
        Stop(); // In case it hasn't been stopped.
95
0
    }
96
97
    /**
98
     * @brief Start worker threads.
99
     *
100
     * Creates and launches `num_workers` threads that begin executing tasks
101
     * from the queue. If the pool is already started, throws.
102
     *
103
     * Must be called from a controller (non-worker) thread.
104
     */
105
    void Start(int num_workers) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
106
0
    {
107
0
        assert(num_workers > 0);
108
0
        LOCK(m_mutex);
Line
Count
Source
266
0
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
0
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
0
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
0
#define PASTE(x, y) x ## y
109
0
        if (m_interrupt) throw std::runtime_error("Thread pool has been interrupted or is stopping");
110
0
        if (!m_workers.empty()) throw std::runtime_error("Thread pool already started");
111
112
        // Create workers
113
0
        m_workers.reserve(num_workers);
114
0
        for (int i = 0; i < num_workers; i++) {
115
0
            m_workers.emplace_back(&util::TraceThread, strprintf("%s_pool_%d", m_name, i), [this] { WorkerThread(); });
Line
Count
Source
1172
0
#define strprintf tfm::format
116
0
        }
117
0
    }
118
119
    /**
120
     * @brief Stop all worker threads and wait for them to exit.
121
     *
122
     * Sets the interrupt flag, wakes all waiting workers, and joins them.
123
     * Any remaining tasks in the queue will be processed before returning.
124
     *
125
     * Must be called from a controller (non-worker) thread.
126
     * Concurrent calls to Start() will be rejected while Stop() is in progress.
127
     */
128
    void Stop() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
129
0
    {
130
        // Notify workers and join them
131
0
        std::vector<std::thread> threads_to_join;
132
0
        {
133
0
            LOCK(m_mutex);
Line
Count
Source
266
0
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
0
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
0
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
0
#define PASTE(x, y) x ## y
134
            // Ensure Stop() is not called from a worker thread while workers are still registered,
135
            // otherwise a self-join deadlock would occur.
136
0
            auto id = std::this_thread::get_id();
137
0
            for (const auto& worker : m_workers) assert(worker.get_id() != id);
138
            // Early shutdown to return right away on any concurrent Submit() call
139
0
            m_interrupt = true;
140
0
            threads_to_join.swap(m_workers);
141
0
        }
142
0
        m_cv.notify_all();
143
        // Help draining queue
144
0
        while (ProcessTask()) {}
145
        // Free resources
146
0
        for (auto& worker : threads_to_join) worker.join();
147
148
        // Since we currently wait for tasks completion, sanity-check empty queue
149
0
        LOCK(m_mutex);
Line
Count
Source
266
0
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
0
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
0
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
0
#define PASTE(x, y) x ## y
150
0
        Assume(m_work_queue.empty());
Line
Count
Source
125
0
#define Assume(val) inline_assertion_check<false>(val, std::source_location::current(), #val)
151
        // Re-allow Start() now that all workers have exited
152
0
        m_interrupt = false;
153
0
    }
154
155
    enum class SubmitError {
156
        Inactive,
157
        Interrupted,
158
    };
159
160
    template <class F>
161
    using Future = std::future<std::invoke_result_t<F>>;
162
163
    template <class R>
164
    using RangeFuture = Future<std::ranges::range_reference_t<R>>;
165
166
    template <class F>
167
    using PackagedTask = std::packaged_task<std::invoke_result_t<F>()>;
168
169
    /**
170
     * @brief Enqueues a new task for asynchronous execution.
171
     *
172
     * @param  fn Callable to execute asynchronously.
173
     * @return On success, a future containing fn's result.
174
     *         On failure, an error indicating why the task was rejected:
175
     *         - SubmitError::Inactive: Pool has no workers (never started or already stopped).
176
     *         - SubmitError::Interrupted: Pool task acceptance has been interrupted.
177
     *
178
     * Thread-safe: Can be called from any thread, including within the provided 'fn' callable.
179
     *
180
     * @warning Ignoring the returned future requires guarding the task against
181
     *          uncaught exceptions, as they would otherwise be silently discarded.
182
     */
183
    template <class F>
184
    [[nodiscard]] util::Expected<Future<F>, SubmitError> Submit(F&& fn) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
185
0
    {
186
0
        PackagedTask<F> task{std::forward<F>(fn)};
187
0
        auto future{task.get_future()};
188
0
        {
189
0
            LOCK(m_mutex);
Line
Count
Source
266
0
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
0
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
0
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
0
#define PASTE(x, y) x ## y
            LOCK(m_mutex);
Line
Count
Source
266
0
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
0
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
0
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
0
#define PASTE(x, y) x ## y
            LOCK(m_mutex);
Line
Count
Source
266
0
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
0
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
0
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
0
#define PASTE(x, y) x ## y
190
0
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
191
0
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
192
193
0
            m_work_queue.emplace(std::move(task));
194
0
        }
195
0
        m_cv.notify_one();
196
0
        return {std::move(future)};
197
0
    }
Unexecuted instantiation: util::Expected<std::future<std::invoke_result<ThrowTask>::type>, ThreadPool::SubmitError> ThreadPool::Submit<ThrowTask>(ThrowTask&&)
Unexecuted instantiation: util::Expected<std::future<std::invoke_result<CounterTask>::type>, ThreadPool::SubmitError> ThreadPool::Submit<CounterTask>(CounterTask&&)
Unexecuted instantiation: httpserver.cpp:util::Expected<std::future<std::invoke_result<MaybeDispatchRequestToWorker(std::shared_ptr<http_bitcoin::HTTPRequest>)::$_0>::type>, ThreadPool::SubmitError> ThreadPool::Submit<MaybeDispatchRequestToWorker(std::shared_ptr<http_bitcoin::HTTPRequest>)::$_0>(MaybeDispatchRequestToWorker(std::shared_ptr<http_bitcoin::HTTPRequest>)::$_0&&)
198
199
    /**
200
     * @brief Enqueues a range of tasks for asynchronous execution.
201
     *
202
     * @param  fns Callables to execute asynchronously.
203
     * @return On success, a vector of futures containing each element of fns's result in order.
204
     *         On failure, an error indicating why the range was rejected:
205
     *         - SubmitError::Inactive: Pool has no workers (never started or already stopped).
206
     *         - SubmitError::Interrupted: Pool task acceptance has been interrupted.
207
     *
208
     * This is more efficient when submitting many tasks at once, since
209
     * the queue lock is only taken once internally and all worker threads are
210
     * notified. For single tasks, Submit() is preferred since only one worker
211
     * thread is notified.
212
     *
213
     * Thread-safe: Can be called from any thread, including within submitted callables.
214
     *
215
     * @warning Ignoring the returned futures requires guarding tasks against
216
     *          uncaught exceptions, as they would otherwise be silently discarded.
217
     */
218
    template <std::ranges::sized_range R>
219
        requires(!std::is_lvalue_reference_v<R>)
220
    [[nodiscard]] util::Expected<std::vector<RangeFuture<R>>, SubmitError> Submit(R&& fns) noexcept EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
221
    {
222
        std::vector<RangeFuture<R>> futures;
223
        futures.reserve(std::ranges::size(fns));
224
225
        {
226
            LOCK(m_mutex);
227
            if (m_workers.empty()) return util::Unexpected{SubmitError::Inactive};
228
            if (m_interrupt) return util::Unexpected{SubmitError::Interrupted};
229
            for (auto&& fn : fns) {
230
                PackagedTask<std::ranges::range_reference_t<R>> task{std::move(fn)};
231
                futures.emplace_back(task.get_future());
232
                m_work_queue.emplace(std::move(task));
233
            }
234
        }
235
        m_cv.notify_all();
236
        return {std::move(futures)};
237
    }
238
239
    /**
240
     * @brief Execute a single queued task synchronously.
241
     * Removes one task from the queue and executes it on the calling thread.
242
     */
243
    bool ProcessTask() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
244
0
    {
245
0
        std::packaged_task<void()> task;
246
0
        {
247
0
            LOCK(m_mutex);
Line
Count
Source
266
0
#define LOCK(cs) UniqueLock UNIQUE_NAME(criticalblock)(MaybeCheckNotHeld(cs), #cs, __FILE__, __LINE__)
Line
Count
Source
11
0
#define UNIQUE_NAME(name) PASTE2(name, __COUNTER__)
Line
Count
Source
9
0
#define PASTE2(x, y) PASTE(x, y)
Line
Count
Source
8
0
#define PASTE(x, y) x ## y
248
0
            if (m_work_queue.empty()) return false;
249
250
            // Pop the task
251
0
            task = std::move(m_work_queue.front());
252
0
            m_work_queue.pop();
253
0
        }
254
0
        task();
255
0
        return true;
256
0
    }
257
258
    /**
259
     * @brief Stop accepting new tasks and begin asynchronous shutdown.
260
     *
261
     * Wakes all worker threads so they can drain the queue and exit.
262
     * Unlike Stop(), this function does not wait for threads to finish.
263
     *
264
     * Note: The next step in the pool lifecycle is calling Stop(), which
265
     *       releases any dangling resources and resets the pool state
266
     *       for shutdown or restart.
267
     */
268
    void Interrupt() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
269
0
    {
270
0
        WITH_LOCK(m_mutex, m_interrupt = true);
Line
Count
Source
297
0
#define WITH_LOCK(cs, code) (MaybeCheckNotHeld(cs), [&]() -> decltype(auto) { LOCK(cs); code; }())
271
0
        m_cv.notify_all();
272
0
    }
273
274
    size_t WorkQueueSize() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
275
0
    {
276
0
        return WITH_LOCK(m_mutex, return m_work_queue.size());
Line
Count
Source
297
0
#define WITH_LOCK(cs, code) (MaybeCheckNotHeld(cs), [&]() -> decltype(auto) { LOCK(cs); code; }())
277
0
    }
278
279
    size_t WorkersCount() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
280
0
    {
281
0
        return WITH_LOCK(m_mutex, return m_workers.size());
Line
Count
Source
297
0
#define WITH_LOCK(cs, code) (MaybeCheckNotHeld(cs), [&]() -> decltype(auto) { LOCK(cs); code; }())
282
0
    }
283
};
284
285
0
constexpr std::string_view SubmitErrorString(const ThreadPool::SubmitError err) noexcept {
286
0
    switch (err) {
287
0
        case ThreadPool::SubmitError::Inactive:
288
0
            return "No active workers";
289
0
        case ThreadPool::SubmitError::Interrupted:
290
0
            return "Interrupted";
291
0
    }
292
0
    Assume(false); // Unreachable
Line
Count
Source
125
0
#define Assume(val) inline_assertion_check<false>(val, std::source_location::current(), #val)
293
0
    return "Unknown error";
294
0
}
295
296
#endif // BITCOIN_UTIL_THREADPOOL_H