Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/boostorg/capy
8 : //
9 :
10 : #include <boost/capy/ex/thread_pool.hpp>
11 : #include <boost/capy/core/intrusive_queue.hpp>
12 : #include <condition_variable>
13 : #include <mutex>
14 : #include <stop_token>
15 : #include <thread>
16 : #include <vector>
17 :
18 : namespace boost {
19 : namespace capy {
20 :
21 : //------------------------------------------------------------------------------
22 :
23 : class thread_pool::impl
24 : {
25 : struct work : intrusive_queue<work>::node
26 : {
27 : any_coro h_;
28 :
29 120 : explicit work(any_coro h) noexcept
30 120 : : h_(h)
31 : {
32 120 : }
33 :
34 120 : void run()
35 : {
36 120 : auto h = h_;
37 120 : delete this;
38 120 : h.resume();
39 120 : }
40 :
41 0 : void destroy()
42 : {
43 0 : delete this;
44 0 : }
45 : };
46 :
47 : std::mutex mutex_;
48 : std::condition_variable_any cv_;
49 : intrusive_queue<work> q_;
50 : std::vector<std::jthread> threads_;
51 : std::size_t num_threads_;
52 : std::once_flag start_flag_;
53 :
54 : public:
55 49 : ~impl()
56 : {
57 49 : stop();
58 49 : threads_.clear();
59 :
60 49 : while(auto* w = q_.pop())
61 0 : w->destroy();
62 49 : }
63 :
64 : explicit
65 49 : impl(std::size_t num_threads)
66 49 : : num_threads_(num_threads)
67 : {
68 49 : if(num_threads_ == 0)
69 1 : num_threads_ = std::thread::hardware_concurrency();
70 49 : if(num_threads_ == 0)
71 0 : num_threads_ = 1;
72 49 : }
73 :
74 : void
75 120 : post(any_coro h)
76 : {
77 120 : ensure_started();
78 120 : auto* w = new work(h);
79 : {
80 120 : std::lock_guard<std::mutex> lock(mutex_);
81 120 : q_.push(w);
82 120 : }
83 120 : cv_.notify_one();
84 120 : }
85 :
86 : void
87 49 : stop() noexcept
88 : {
89 80 : for (auto& t : threads_)
90 31 : t.request_stop();
91 49 : cv_.notify_all();
92 49 : }
93 :
94 : private:
95 : void
96 120 : ensure_started()
97 : {
98 120 : std::call_once(start_flag_, [this]{
99 19 : threads_.reserve(num_threads_);
100 50 : for(std::size_t i = 0; i < num_threads_; ++i)
101 62 : threads_.emplace_back([this](std::stop_token st){ run(st); });
102 19 : });
103 120 : }
104 :
105 : void
106 31 : run(std::stop_token st)
107 : {
108 : for(;;)
109 : {
110 151 : work* w = nullptr;
111 : {
112 151 : std::unique_lock<std::mutex> lock(mutex_);
113 351 : if(!cv_.wait(lock, st, [this]{ return !q_.empty(); }))
114 62 : return;
115 120 : w = q_.pop();
116 151 : }
117 120 : w->run();
118 120 : }
119 : }
120 : };
121 :
122 : //------------------------------------------------------------------------------
123 :
124 49 : thread_pool::
125 : ~thread_pool()
126 : {
127 49 : shutdown();
128 49 : destroy();
129 49 : delete impl_;
130 49 : }
131 :
132 49 : thread_pool::
133 49 : thread_pool(std::size_t num_threads)
134 49 : : impl_(new impl(num_threads))
135 : {
136 49 : }
137 :
138 : void
139 0 : thread_pool::
140 : stop() noexcept
141 : {
142 0 : impl_->stop();
143 0 : }
144 :
145 : //------------------------------------------------------------------------------
146 :
147 : void
148 120 : thread_pool::executor_type::
149 : post(any_coro h) const
150 : {
151 120 : pool_->impl_->post(h);
152 120 : }
153 :
154 : } // capy
155 : } // boost
|