使用boost asio的线程池

Posted

技术标签:

【中文标题】使用boost asio的线程池【英文标题】:Thread pool using boost asio 【发布时间】:2012-08-26 06:35:22 【问题描述】:

我正在尝试使用 boost::asio 创建一个有限的线程池类。但是我被困在某一点上,有人可以帮助我吗?

唯一的问题是我应该在哪里减少计数器?

代码没有按预期工作。

问题是我不知道我的线程什么时候完成执行,以及我如何知道它已经返回到池中

#include <boost/asio.hpp>
#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>
#include <stack>

using namespace std;
using namespace boost;

class ThreadPool

    static int count;
    int NoOfThread;
    thread_group grp;
    mutex mutex_;
    asio::io_service io_service;
    int counter;
    stack<thread*> thStk ;

public:
    ThreadPool(int num)
       
        NoOfThread = num;
        counter = 0;
        mutex::scoped_lock lock(mutex_);

        if(count == 0)
            count++;
        else
            return;

        for(int i=0 ; i<num ; ++i)
        
            thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service)));
        
    
    ~ThreadPool()
    
        io_service.stop();
        grp.join_all();
    

    thread* getThread()
    
        if(counter > NoOfThread)
        
            cout<<"run out of threads \n";
            return NULL;
        

        counter++;
        thread* ptr = thStk.top();
        thStk.pop();
        return ptr;
    
;
int ThreadPool::count = 0;


struct callable

    void operator()()
    
        cout<<"some task for thread \n";
    
;

int main( int argc, char * argv[] )


    callable x;
    ThreadPool pool(10);
    thread* p = pool.getThread();
    cout<<p->get_id();

    //how i can assign some function to thread pointer ?
    //how i can return thread pointer after work done so i can add 
//it back to stack?


    return 0;

【问题讨论】:

@jupiter 感谢编辑,但这不是最终代码cplusplus.com/forum/general/77981 请通过链接直到底部你会看到修改后的代码@end。 请查看此处发布的最终代码cplusplus.com/forum/general/77981 减少计数器不是唯一的问题(或者我看错了代码,请编辑你的问题)。仅举几例:您的线程池将不执行任何操作(您调用 io_service::run() 时没有发布任何内容,因此它将立即完成并且将来的 post()-ed 任务无法执行);只有第一个创建的 ThreadPool 对象会创建一些线程;变量不受互斥体(或其他方式)保护。 【参考方案1】:

简而言之,您需要用另一个函数包装用户提供的任务:

调用用户函数或可调用对象。 锁定互斥体并减少计数器。

我可能不了解此线程池的所有要求。因此,为清楚起见,以下是我认为的要求的明确列表:

池管理线程的生命周期。用户应该不能删除驻留在池中的线​​程。 用户可以以非侵入方式将任务分配给池。 在分配任务时,如果池中的所有线程当前都在运行其他任务,则该任务将被丢弃。

在提供实现之前,我想强调几个关键点:

一旦线程启动,它将一直运行到完成、取消或终止。线程正在执行的函数不能被重新分配。为了允许单个线程在其生命周期中执行多个函数,线程将希望使用将从队列中读取的函数启动,例如io_service::run(),并将可调用类型发布到事件队列中,例如来自io_service::post()。 如果io_service 中没有待处理的工作、io_service 已停止或线程正在运行的处理程序抛出异常,则io_service::run() 返回。为了防止io_serivce::run()在没有未完成的工作时返回,可以使用io_service::work类。 定义任务的类型要求(即任务的类型必须可以通过object() 语法调用)而不是要求类型(即任务必须继承自process),为用户提供了更大的灵活性。它允许用户提供任务作为函数指针或提供空值operator() 的类型。

使用boost::asio实现:

#include <boost/asio.hpp>
#include <boost/thread.hpp>

class thread_pool

private:
  boost::asio::io_service io_service_;
  boost::asio::io_service::work work_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : work_( io_service_ ),
      available_( pool_size )
  
    for ( std::size_t i = 0; i < pool_size; ++i )
    
      threads_.create_thread( boost::bind( &boost::asio::io_service::run,
                                           &io_service_ ) );
    
  

  /// @brief Destructor.
  ~thread_pool()
  
    // Force all threads to return from io_service::run().
    io_service_.stop();

    // Suppress all exceptions.
    try
    
      threads_.join_all();
    
    catch ( const std::exception& ) 
  

  /// @brief Adds a task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Post a wrapped task into the queue.
    io_service_.post( boost::bind( &thread_pool::wrap_task, this,
                                   boost::function< void() >( task ) ) );
  

private:
  /// @brief Wrap a task so that the available count can be increased once
  ///        the user provided task has completed.
  void wrap_task( boost::function< void() > task )
  
    // Run the user supplied task.
    try
    
      task();
    
    // Suppress all exceptions.
    catch ( const std::exception& ) 

    // Task has finished, so increment count of available threads.
    boost::unique_lock< boost::mutex > lock( mutex_ );
    ++available_;
  
;

一些关于实现的cmets:

需要围绕用户的任务进行异常处理。如果用户的函数或可调用对象抛出非boost::thread_interrupted 类型的异常,则调用std::terminate()。这是 Boost.Thread 的 exceptions in thread functions 行为的结果。 Boost.Asio 的effect of exceptions thrown from handlers 也值得一读。 如果用户通过boost::bind 提供task,那么嵌套的boost::bind 将无法编译。需要以下选项之一: 不支持boost::bind创建的task。 元编程根据boost::bind 的结果是否为用户类型执行编译时分支,以便boost::protect 可以使用,因为boost::protect 仅在某些函数对象上正常运行。李> 使用其他类型间接传递task 对象。我选择使用boost::function 以提高可读性,但会丢失确切的类型。 boost::tuple 虽然可读性稍差,但也可用于保留确切的类型,如 Boost.Asio 的 serialization 示例所示。

应用程序代码现在可以非侵入性地使用thread_pool 类型:

void work() ;

struct worker

  void operator()() ;
;

void more_work( int ) ;

int main()
 
  thread_pool pool( 2 );
  pool.run_task( work );                        // Function pointer.
  pool.run_task( worker() );                    // Callable object.
  pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.

thread_pool 可以在没有 Boost.Asio 的情况下创建,并且对于维护者来说可能会稍微容易一些,因为他们不再需要了解 Boost.Asio 的行为,例如 io_service::run() 什么时候返回,以及 @ 是什么987654354@对象:

#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

class thread_pool

private:
  std::queue< boost::function< void() > > tasks_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
  boost::condition_variable condition_;
  bool running_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : available_( pool_size ),
      running_( true )
  
    for ( std::size_t i = 0; i < pool_size; ++i )
    
      threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
    
  

  /// @brief Destructor.
  ~thread_pool()
  
    // Set running flag to false then notify all threads.
    
      boost::unique_lock< boost::mutex > lock( mutex_ );
      running_ = false;
      condition_.notify_all();
    

    try
    
      threads_.join_all();
    
    // Suppress all exceptions.
    catch ( const std::exception& ) 
  

  /// @brief Add task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Set task and signal condition variable so that a worker thread will
    // wake up andl use the task.
    tasks_.push( boost::function< void() >( task ) );
    condition_.notify_one();
  

private:
  /// @brief Entry point for pool threads.
  void pool_main()
  
    while( running_ )
    
      // Wait on condition variable while the task is empty and the pool is
      // still running.
      boost::unique_lock< boost::mutex > lock( mutex_ );
      while ( tasks_.empty() && running_ )
      
        condition_.wait( lock );
      
      // If pool is no longer running, break out.
      if ( !running_ ) break;

      // Copy task locally and remove from the queue.  This is done within
      // its own scope so that the task object is destructed immediately
      // after running the task.  This is useful in the event that the
      // function contains shared_ptr arguments bound via bind.
      
        boost::function< void() > task = tasks_.front();
        tasks_.pop();

        lock.unlock();

        // Run the task.
        try
        
          task();
        
        // Suppress all exceptions.
        catch ( const std::exception& ) 
      

      // Task has finished, so increment count of available threads.
      lock.lock();
      ++available_;
     // while running_
  
;

【讨论】:

如果我们还使用其他 asio 对象,例如 boost::asio::ip::udp::socket,它在内部为异步操作发布任务,我们如何使用线程池? 这不是一个池,因为只要池已满,它就会停止添加任务。新任务应该保存在等待执行的容器中。 @squid 是一个基于原发者问题特征的池。由于这些特征既不常见也不明确,因此被明确列出。池可以使用各种不同的策略来管理任务队列的大小。 @squid 我有各种策略,例如排队直到内存分配失败,将队列限制为固定的最大大小,基于启发式算法无法排队(例如最旧的任务已在队列中等待给定的时间段),排队但产生额外的线程(永久或临时)。 @squid 是的,这可能是一种情况。我经常看到嵌入式设备上使用固定大小的队列来管理内存使用和防止可用空间碎片。

以上是关于使用boost asio的线程池的主要内容,如果未能解决你的问题,请参考以下文章

使用 boost::asio::thread_pool 的 C++ 线程池,为啥我不能重用我的线程?

使用 boost 创建线程池

Boost.Asio 是不是可以使用单独的线程进行读写?

多线程和Boost::Asio

C++ boost::asio::io_service创建线程池thread_group简单实例

C++ boost::asio::io_service创建线程池thread_group简单实例