LCOV - code coverage report
Current view: top level - libs/capy/src/ex - thread_pool.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 88.2 % 68 60
Test Date: 2026-01-20 10:35:38 Functions: 87.5 % 16 14

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/boostorg/capy
       8              : //
       9              : 
      10              : #include <boost/capy/ex/thread_pool.hpp>
      11              : #include <boost/capy/core/intrusive_queue.hpp>
      12              : #include <condition_variable>
      13              : #include <mutex>
      14              : #include <stop_token>
      15              : #include <thread>
      16              : #include <vector>
      17              : 
      18              : namespace boost {
      19              : namespace capy {
      20              : 
      21              : //------------------------------------------------------------------------------
      22              : 
      23              : class thread_pool::impl
      24              : {
      25              :     struct work : intrusive_queue<work>::node
      26              :     {
      27              :         any_coro h_;
      28              : 
      29          120 :         explicit work(any_coro h) noexcept
      30          120 :             : h_(h)
      31              :         {
      32          120 :         }
      33              : 
      34          120 :         void run()
      35              :         {
      36          120 :             auto h = h_;
      37          120 :             delete this;
      38          120 :             h.resume();
      39          120 :         }
      40              : 
      41            0 :         void destroy()
      42              :         {
      43            0 :             delete this;
      44            0 :         }
      45              :     };
      46              : 
      47              :     std::mutex mutex_;
      48              :     std::condition_variable_any cv_;
      49              :     intrusive_queue<work> q_;
      50              :     std::vector<std::jthread> threads_;
      51              :     std::size_t num_threads_;
      52              :     std::once_flag start_flag_;
      53              : 
      54              : public:
      55           49 :     ~impl()
      56              :     {
      57           49 :         stop();
      58           49 :         threads_.clear();
      59              : 
      60           49 :         while(auto* w = q_.pop())
      61            0 :             w->destroy();
      62           49 :     }
      63              : 
      64              :     explicit
      65           49 :     impl(std::size_t num_threads)
      66           49 :         : num_threads_(num_threads)
      67              :     {
      68           49 :         if(num_threads_ == 0)
      69            1 :             num_threads_ = std::thread::hardware_concurrency();
      70           49 :         if(num_threads_ == 0)
      71            0 :             num_threads_ = 1;
      72           49 :     }
      73              : 
      74              :     void
      75          120 :     post(any_coro h)
      76              :     {
      77          120 :         ensure_started();
      78          120 :         auto* w = new work(h);
      79              :         {
      80          120 :             std::lock_guard<std::mutex> lock(mutex_);
      81          120 :             q_.push(w);
      82          120 :         }
      83          120 :         cv_.notify_one();
      84          120 :     }
      85              : 
      86              :     void
      87           49 :     stop() noexcept
      88              :     {
      89           80 :         for (auto& t : threads_)
      90           31 :             t.request_stop();
      91           49 :         cv_.notify_all();
      92           49 :     }
      93              : 
      94              : private:
      95              :     void
      96          120 :     ensure_started()
      97              :     {
      98          120 :         std::call_once(start_flag_, [this]{
      99           19 :             threads_.reserve(num_threads_);
     100           50 :             for(std::size_t i = 0; i < num_threads_; ++i)
     101           62 :                 threads_.emplace_back([this](std::stop_token st){ run(st); });
     102           19 :         });
     103          120 :     }
     104              : 
     105              :     void
     106           31 :     run(std::stop_token st)
     107              :     {
     108              :         for(;;)
     109              :         {
     110          151 :             work* w = nullptr;
     111              :             {
     112          151 :                 std::unique_lock<std::mutex> lock(mutex_);
     113          351 :                 if(!cv_.wait(lock, st, [this]{ return !q_.empty(); }))
     114           62 :                     return;
     115          120 :                 w = q_.pop();
     116          151 :             }
     117          120 :             w->run();
     118          120 :         }
     119              :     }
     120              : };
     121              : 
     122              : //------------------------------------------------------------------------------
     123              : 
     124           49 : thread_pool::
     125              : ~thread_pool()
     126              : {
     127           49 :     shutdown();
     128           49 :     destroy();
     129           49 :     delete impl_;
     130           49 : }
     131              : 
     132           49 : thread_pool::
     133           49 : thread_pool(std::size_t num_threads)
     134           49 :     : impl_(new impl(num_threads))
     135              : {
     136           49 : }
     137              : 
     138              : void
     139            0 : thread_pool::
     140              : stop() noexcept
     141              : {
     142            0 :     impl_->stop();
     143            0 : }
     144              : 
     145              : //------------------------------------------------------------------------------
     146              : 
     147              : void
     148          120 : thread_pool::executor_type::
     149              : post(any_coro h) const
     150              : {
     151          120 :     pool_->impl_->post(h);
     152          120 : }
     153              : 
     154              : } // capy
     155              : } // boost
        

Generated by: LCOV version 2.3