// ThreadPool: owns a set of worker threads (threads_) and a queue of
// `std::function<void()>` tasks (tasks_), with a condition variable (cv_)
// alongside them.
// NOTE(review): this listing is an excerpt -- braces, access specifiers,
// template headers and some statements are not visible here, so the comments
// below only describe what the shown lines establish.
17class OPENDHT_PUBLIC ThreadPool
// Static accessors returning a reference to a ThreadPool instance.
// Presumably process-wide pools for CPU-bound and I/O-bound work
// respectively -- confirm against the implementation file.
20 static ThreadPool& computation();
21 static ThreadPool& io();
// Constructs a pool from a minimum and a maximum thread count.
// maxThreads defaults to 0; the meaning of 0 is decided in the out-of-line
// definition (presumably "derive a default") -- TODO confirm.
24 ThreadPool(
unsigned minThreads,
unsigned maxThreads = 0);
// Submits a task; the callable is taken by rvalue reference.
// Defined out of line.
27 void run(std::function<
void()>&& cb);
// Runs `cb` through run() and exposes its outcome as a std::future<T>.
// (T is declared on a line that is not part of this excerpt.)
30 std::future<T> get(std::function<T()>&& cb)
// The promise is held by shared_ptr so the lambda handed to run() can keep a
// copy of it.
32 auto ret = std::make_shared<std::promise<T>>();
// The callback is moved into the closure; the promise pointer is copied.
33 run([cb = std::move(cb), ret]()
mutable {
// Forwards the in-flight exception to the promise. The surrounding try/catch
// and the value-setting path are on lines not shown here.
38 ret->set_exception(std::current_exception());
43 return ret->get_future();
// Same as get(), but the returned std::future<T> is converted to a
// std::shared_future<T> by the declared return type.
46 std::shared_future<T> getShared(std::function<T()>&& cb)
48 return get(std::move(cb));
// Stops the pool. `wait` defaults to true; its exact effect is defined out of
// line (presumably whether to block until workers finish -- TODO confirm).
51 void stop(
bool wait =
true);
// Condition variable declared alongside the task queue.
57 std::condition_variable cv_ {};
// Queue of tasks (FIFO by the std::queue contract).
58 std::queue<std::function<void()>> tasks_ {};
// Worker threads, each owned through a unique_ptr.
59 std::vector<std::unique_ptr<std::thread>> threads_;
// Counter initialised to 0; presumably the number of idle workers -- confirm.
60 unsigned readyThreads_ {0};
// Thread-count limit, fixed at construction (const member).
64 const unsigned maxThreads_;
// Defaults to 5 minutes. Presumably how long an idle thread is kept before
// it is retired -- confirm in the implementation.
65 std::chrono::steady_clock::duration threadExpirationDelay {std::chrono::minutes(5)};
// Defaults to 2; how the ratio is applied is not visible in this excerpt.
66 double threadDelayRatio_ {2};
// Takes a thread by reference; the name suggests per-thread cleanup when a
// worker ends -- defined out of line, confirm there.
68 void threadEnded(std::thread&);
// Executor: holds a reference to a ThreadPool together with its own task
// queue, a concurrency limit (maxConcurrent_) and a running counter
// (current_). Derives from std::enable_shared_from_this<Executor>.
// NOTE(review): presumably caps how many of its tasks run on the pool at
// once -- the scheduling logic lives in run()/run_(), which are defined out
// of line; confirm there.
71class OPENDHT_PUBLIC Executor :
public std::enable_shared_from_this<Executor>
// Binds the executor to `pool`; maxConcurrent defaults to 1.
// Only part of the member-initializer list is visible in this excerpt.
74 Executor(
ThreadPool& pool,
unsigned maxConcurrent = 1)
76 , maxConcurrent_(maxConcurrent)
// Submits a task; the callable is taken by rvalue reference.
// Defined out of line.
79 void run(std::function<
void()>&& task);
// The pool is stored as a std::reference_wrapper rather than a raw reference.
82 std::reference_wrapper<ThreadPool> threadPool_;
// Concurrency limit; const, with an in-class default of 1.
83 const unsigned maxConcurrent_ {1};
// Counter initialised to 0; presumably the number of tasks currently
// dispatched to the pool -- confirm.
85 unsigned current_ {0};
// Queue of tasks held by the executor (FIFO by the std::queue contract).
86 std::queue<std::function<void()>> tasks_ {};
// Internal helper with the same signature as run(); defined out of line.
88 void run_(std::function<
void()>&& task);
// ExecutionContext: submits tasks to a ThreadPool while tracking them in a
// heap-allocated SharedState (pending/ongoing counters, mutex, condition
// variable) that is shared with the submitted closures.
// NOTE(review): this listing is an excerpt -- the constructor signature, the
// SharedState declaration line, braces and several statements are not
// visible here.
92class OPENDHT_PUBLIC ExecutionContext
// The shared state is created once at construction.
97 , state_(std::make_shared<SharedState>())
// The destructor calls destroy() with its default argument (wait = true).
100 ~ExecutionContext() { state_->destroy(); }
// stop() calls destroy(false), i.e. the non-default `wait` mode.
103 void stop() { state_->destroy(
false); }
// Submits `task` to the thread pool through the shared state.
105 void run(std::function<
void()>&& task)
// The mutex is held while checking shutdown_ and incrementing pendingTasks.
107 std::lock_guard<std::mutex> lock(state_->mtx);
// The action taken when shutdown_ is set is on a line not shown here
// (presumably an early return -- confirm).
108 if (state_->shutdown_)
110 state_->pendingTasks++;
// The closure holds its own copy of the shared_ptr (`state = state_`), so it
// reaches the SharedState through that copy and not through `this`.
111 threadPool_.get().run([task = std::move(task), state = state_] { state->run(task); });
// --- SharedState members (the struct header is not visible in this excerpt) ---
// Condition variable that destroy() waits on.
118 std::condition_variable cv {};
// Incremented in ExecutionContext::run() before the task is handed to the pool.
119 unsigned pendingTasks {0};
// Counter initialised to 0; presumably the tasks currently executing -- confirm.
120 unsigned ongoingTasks {0};
// Checked by ExecutionContext::run() before a new task is accepted.
122 bool shutdown_ {
false};
// Atomic flag, initially false; where it is set is not visible here.
124 std::atomic_bool destroyed {
false};
// Takes the mutex and waits on `cv`. Two wait conditions are visible; the
// branch selecting between them is not.
// NOTE(review): presumably wait == true uses the first (no pending and no
// ongoing tasks) and wait == false the second (no ongoing tasks) -- confirm.
126 void destroy(
bool wait =
true)
128 std::unique_lock<std::mutex> lock(mtx);
// Waits until both the pending and the ongoing counters are zero.
132 cv.wait(lock, [
this] {
return pendingTasks == 0 && ongoingTasks == 0; });
// Waits until only the ongoing counter is zero.
136 cv.wait(lock, [
this] {
return ongoingTasks == 0; });
// Called from the closure submitted in ExecutionContext::run().
// Two separate lock_guard acquisitions are visible; presumably counter
// bookkeeping before and after invoking `task` -- the invocation itself is
// on lines not shown, confirm there.
141 void run(
const std::function<
void()>& task)
144 std::lock_guard<std::mutex> lock(mtx);
152 std::lock_guard<std::mutex> lock(mtx);
// --- ExecutionContext data members ---
// The pool is stored as a std::reference_wrapper rather than a raw reference.
158 std::reference_wrapper<ThreadPool> threadPool_;
// Shared with every closure submitted through run().
159 std::shared_ptr<SharedState> state_;