My Project 3.7.1
C++ Distributed Hash Table
Loading...
Searching...
No Matches
thread_pool.h
1// Copyright (c) 2014-2026 Savoir-faire Linux Inc.
2// SPDX-License-Identifier: MIT
3#pragma once
4
5#include "def.h"
6
7#include <condition_variable>
8#include <vector>
9#include <queue>
10#include <future>
11#include <functional>
12
13#include <ciso646> // fix windows compiler bug
14
15namespace dht {
16
17class OPENDHT_PUBLIC ThreadPool
18{
19public:
20 static ThreadPool& computation();
21 static ThreadPool& io();
22
23 ThreadPool();
24 ThreadPool(unsigned minThreads, unsigned maxThreads = 0);
25 ~ThreadPool();
26
27 void run(std::function<void()>&& cb);
28
29 template<class T>
30 std::future<T> get(std::function<T()>&& cb)
31 {
32 auto ret = std::make_shared<std::promise<T>>();
33 run([cb = std::move(cb), ret]() mutable {
34 try {
35 ret->set_value(cb());
36 } catch (...) {
37 try {
38 ret->set_exception(std::current_exception());
39 } catch (...) {
40 }
41 }
42 });
43 return ret->get_future();
44 }
45 template<class T>
46 std::shared_future<T> getShared(std::function<T()>&& cb)
47 {
48 return get(std::move(cb));
49 }
50
51 void stop(bool wait = true);
52 void join();
53 void detach();
54
55private:
56 std::mutex lock_ {};
57 std::condition_variable cv_ {};
58 std::queue<std::function<void()>> tasks_ {};
59 std::vector<std::unique_ptr<std::thread>> threads_;
60 unsigned readyThreads_ {0};
61 bool running_ {true};
62
63 unsigned minThreads_;
64 const unsigned maxThreads_;
65 std::chrono::steady_clock::duration threadExpirationDelay {std::chrono::minutes(5)};
66 double threadDelayRatio_ {2};
67
68 void threadEnded(std::thread&);
69};
70
71class OPENDHT_PUBLIC Executor : public std::enable_shared_from_this<Executor>
72{
73public:
74 Executor(ThreadPool& pool, unsigned maxConcurrent = 1)
75 : threadPool_(pool)
76 , maxConcurrent_(maxConcurrent)
77 {}
78
79 void run(std::function<void()>&& task);
80
81private:
82 std::reference_wrapper<ThreadPool> threadPool_;
83 const unsigned maxConcurrent_ {1};
84 std::mutex lock_ {};
85 unsigned current_ {0};
86 std::queue<std::function<void()>> tasks_ {};
87
88 void run_(std::function<void()>&& task);
89 void schedule();
90};
91
92class OPENDHT_PUBLIC ExecutionContext
93{
94public:
95 ExecutionContext(ThreadPool& pool)
96 : threadPool_(pool)
97 , state_(std::make_shared<SharedState>())
98 {}
99
100 ~ExecutionContext() { state_->destroy(); }
101
103 void stop() { state_->destroy(false); }
104
105 void run(std::function<void()>&& task)
106 {
107 std::lock_guard<std::mutex> lock(state_->mtx);
108 if (state_->shutdown_)
109 return;
110 state_->pendingTasks++;
111 threadPool_.get().run([task = std::move(task), state = state_] { state->run(task); });
112 }
113
114private:
115 struct SharedState
116 {
117 std::mutex mtx {};
118 std::condition_variable cv {};
119 unsigned pendingTasks {0};
120 unsigned ongoingTasks {0};
122 bool shutdown_ {false};
124 std::atomic_bool destroyed {false};
125
126 void destroy(bool wait = true)
127 {
128 std::unique_lock<std::mutex> lock(mtx);
129 if (destroyed)
130 return;
131 if (wait) {
132 cv.wait(lock, [this] { return pendingTasks == 0 && ongoingTasks == 0; });
133 }
134 shutdown_ = true;
135 if (not wait) {
136 cv.wait(lock, [this] { return ongoingTasks == 0; });
137 }
138 destroyed = true;
139 }
140
141 void run(const std::function<void()>& task)
142 {
143 {
144 std::lock_guard<std::mutex> lock(mtx);
145 pendingTasks--;
146 ongoingTasks++;
147 }
148 if (destroyed)
149 return;
150 task();
151 {
152 std::lock_guard<std::mutex> lock(mtx);
153 ongoingTasks--;
154 cv.notify_all();
155 }
156 }
157 };
158 std::reference_wrapper<ThreadPool> threadPool_;
159 std::shared_ptr<SharedState> state_;
160};
161
162} // namespace dht