asio::io_service 和 thread_group 生命周期问题

Posted

技术标签:

【中文标题】asio::io_service 和 thread_group 生命周期问题【英文标题】:asio::io_service and thread_group lifecycle issue 【发布时间】:2016-03-22 03:11:06 【问题描述】:

看看answers like this one,我们可以做这样的事情:

boost::asio::io_service ioservice;
boost::thread_group threadpool;

    boost::asio::io_service::work work(ioService);
    threadpool.create_thread(boost::bind(&boost::asio::io_service::run, ioService));
    threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
    ioService.post(boost::bind(...));
    ioService.post(boost::bind(...));
    ioService.post(boost::bind(...));

threadpool.join_all();

但是,就我而言,我想做类似的事情:

while (condition)

    ioService.post(boost::bind(...));
    ioService.post(boost::bind(...));
    ioService.post(boost::bind(...));
    threadpool.join_all();

    // DO SOMETHING WITH RESULTS

但是,boost::asio::io_service::work work(ioService) 行不合适,据我所知,如果不重新创建池中的每个线程,我无法重新创建它。

在我的代码中,线程创建开销似乎可以忽略不计(实际上比以前基于互斥锁的代码性能更好),但有没有更简洁的方法来做到这一点?

【问题讨论】:

【参考方案1】:
while (condition)

    //... stuff
    threadpool.join_all();

    //... 

没有任何意义,因为您只能加入一次线程。一旦加入,它们就消失了。您不想一直启动新线程(使用线程池 + 任务队列¹)。

由于您不想真正停止线程,因此您可能不想破坏工作。如果您坚持,shared_ptr<work>optional<work> 效果很好(只需 my_work.reset() 它)

¹ 更新建议:

带有任务队列的简单thread_pool:(在boost thread throwing exception "thread_resource_error: resource temporarily unavailable"中) 基于io_service 本身的队列(使用work)c++ work queues with blocking

更新

“解决方案#2”的简单扩展可以等待所有任务完成,而无需加入工人/破坏池:

  void drain() 
      unique_lock<mutex> lk(mx);
      namespace phx = boost::phoenix;
      cv.wait(lk, phx::empty(phx::ref(_queue)));
  

请注意,为了可靠运行,还需要在出队时向条件变量发出信号:

      cv.notify_all(); // in order to signal drain

注意事项

    这是一个邀请竞争条件的接口(队列可以接受来自多个线程的作业,因此一旦drain() 返回,另一个线程可能已经发布了新任务)

    当队列为空时发出信号,而不是在任务完成时发出信号。队列无法知道这一点,如果需要,请使用任务中的屏障/信号条件(本例中为the_work)。队列/调度机制与那里无关。

演示

Live On Coliru

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>

using namespace boost;
using namespace boost::phoenix::arg_names;

class thread_pool

  private:
      mutex mx;
      condition_variable cv;

      typedef function<void()> job_t;
      std::deque<job_t> _queue;

      thread_group pool;

      boost::atomic_bool shutdown;
      static void worker_thread(thread_pool& q)
      
          while (auto job = q.dequeue())
              (*job)();
      

  public:
      thread_pool() : shutdown(false) 
          for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
              pool.create_thread(bind(worker_thread, ref(*this)));
      

      void enqueue(job_t job) 
      
          lock_guard<mutex> lk(mx);
          _queue.push_back(std::move(job));

          cv.notify_one();
      

      void drain() 
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;
          cv.wait(lk, phx::empty(phx::ref(_queue)));
      

      optional<job_t> dequeue() 
      
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;

          cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

          if (_queue.empty())
              return none;

          auto job = std::move(_queue.front());
          _queue.pop_front();

          cv.notify_all(); // in order to signal drain

          return std::move(job);
      

      ~thread_pool()
      
          shutdown = true;
          
              lock_guard<mutex> lk(mx);
              cv.notify_all();
          

          pool.join_all();
      
;

void the_work(int id)

    std::cout << "worker " << id << " entered\n";

    // no more synchronization; the pool size determines max concurrency
    std::cout << "worker " << id << " start work\n";
    this_thread::sleep_for(chrono::milliseconds(2));
    std::cout << "worker " << id << " done\n";


int main()

    thread_pool pool; // uses 1 thread per core

    for (auto i = 0ull; i < 20; ++i) 
        for (int i = 0; i < 10; ++i)
            pool.enqueue(bind(the_work, i));

        pool.drain(); // make the queue empty, leave the threads
        std::cout << "Queue empty\n";
    

    // destructing pool joins the worker threads

【讨论】:

添加了完整实现正确线程池的链接,包括基于 Boost Asio 的 io_service 的链接 感谢非常有用的链接,但唯一缺少的是如何“等到ioService 队列为空”来替换threadpool.join_all() 析构函数做到了——只要寻找join真的。 我想这不是我想问的;如您所说,您只能join一次,但我想继续使用线程池,只需等待作业队列清空即可。我可以扩展您的解决方案 #2 来支持它。 @KenY-N 查看更新。也阅读[CAVEATS](下次也试着更好地表达你的问题。你的问题清楚地显示了循环内的加入调用。我很高兴听到这不是你真正想要的,但为什么它在你的问题中...... .)

以上是关于asio::io_service 和 thread_group 生命周期问题的主要内容,如果未能解决你的问题,请参考以下文章

C++ boost::asio::io_service创建线程池thread_group简单实例

从多个线程调用 boost::asio::io_service 运行函数

boost::asio::io_service类

boost::asio io_service 和 std::containers 的线程安全

在 boost::asio::io_service 上调用 run 时崩溃

Boost::Asio : io_service.run() vs poll() 或者我如何在主循环中集成 boost::asio