c++基于ThreadPool实现灵活的异步任务

Posted 特立独行的猫a

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了c++基于ThreadPool实现灵活的异步任务相关的知识,希望对你有一定的参考价值。

在工作中有时会有这种需求,在延时中执行一些任务,等待任务超时或者任务返回结果再往下执行。如果不做封装,可能会怎么做?每次都进入while true?

......
auto start = std::chrono::system_clock::now();
auto timeout = 500;
while (true)
      auto now  = std::chrono::system_clock::now();
      auto _duration = std::chrono::duration_cast<std::chrono::milliseconds>(now- start );
      if (_duration > timeout)
        LOGGING_ERROR("duration is %d", timeout.count());
        break;
      
      //do something

      std::this_thread::sleep_for(std::chrono::milliseconds(10));

或许也能完成目的,但是写法很不优雅。且程序执行到此处进入while(true)只能等在这里了,其他啥活都干不了。
可以怎么优化?可以把这部分任务放入线程中异步执行。 

// 耗时操作
auto fetchDataFromDB = [](std::string recvdData,std::function<int(std::string &)> cback) 
		// Make sure that function takes 5 seconds to complete
		std::this_thread::sleep_for(seconds(5));
		//Do stuff like creating DB Connection and fetching Data
		if(cback != nullptr)
			std::string out = "this is from callback ";
			cback(out);
		
		return "DB_" + recvdData;
	;
 
//把fetchDataFromDB这一IO耗时任务放到线程里异步执行
//
std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data0",
			[&](std::string &result)
			std::cout << "callback result from thread:" << result << std::endl;
			return 0;
			);	


// do otherthings 可以做些其他工作
......
// 等待异步的结果
std::string result = resultFromDB.get(); // waits for fetchDataFromDB to return

然而每次都要创建线程损耗也不小,可以基于线程池进一步改造。

me::ThreadPool pool(4);
......
//把fetchDataFromDB这一IO耗时操作放到pool中
pool.commit(fetchDataFromDB,"Data1",[&](std::string &result)
			std::cout << "callback result from pool thread:" << result << std::endl;
			return 0;
			);

然而,这种还是不够灵活。比如不能灵活控制耗时任务的超时时间和检测频率。

想要的是这种效果:

  //!
  //! \\brief 任务运行函数
  //!
  //! \\param f 嘀嗒时期调用的函数,如果使用 lambda 表达式则会自动匹配该函数
  //! \\param duration 超时时间
  //! \\param timer_duration 嘀嗒周期
  //! \\return
  //!
  taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration, int timer_duration)

则使用起来就简单多了。

 ...... 
 auto f = [&]
  
    return checkIO(errorCode);
  ;
 
 // f的执行频率为30毫秒执行一次,超时时间5秒钟
 auto future = misc::TaskRunner::getInstance()->taskRunner(f,std::chrono::seconds(5),30);

 misc::TaskRunner::getInstance()->waitForResult(future);

以下为TaskRunner的封装实现:

#define KeepRunning (0U)
#define StopRunning (1U)

#include <ThreadPool.h>

namespace misc

//!
//! \\brief 任务运行器
//!
class TaskRunner

public:
  //!
  //! \\brief 单例模型
  //! \\return
  //!
  static TaskRunner *getInstance()
  
    static TaskRunner w;
    return &w;
  
  ~TaskRunner()   

public:
  using taskFunction = std::function<int()>;
  using taskResult_t = std::shared_future<int>;
  //!
  //! \\brief 任务运行函数
  //!
  //! 如果运行超时,则返回失败代码(未启用超时功能)
  //!
  //! \\param f 嘀嗒时期调用的函数,如果使用 std::bind 函数则会自动匹配该函数
  //! \\param duration 超时时间
  //! \\return
  //!
  taskResult_t taskRunner(taskFunction &f, const std::chrono::steady_clock::duration &duration)
  
    return start(f, duration);
  
  //!
  //! \\brief 任务运行函数
  //!
  //! 如果运行超时,则返回失败代码(未启用超时功能)
  //!
  //! \\param f 嘀嗒时期调用的函数,如果使用 lambda 表达式则会自动匹配该函数
  //! \\param duration 超时时间
  //! \\return
  //!
  taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration)
  
    return start(f, duration);
  
  //!
  //! \\brief 任务运行函数
  //!
  //! 如果运行超时,则返回失败代码(未启用超时功能)
  //!
  //! \\param f 嘀嗒时期调用的函数,如果使用 std::bind 函数则会自动匹配该函数
  //! \\param duration 超时时间
  //! \\param timer_duration 嘀嗒周期,因为默认的嘀嗒时期为
  //! 20um,如果想要自主决定,则使用该函数 \\return
  //!
  taskResult_t taskRunner(taskFunction &f, const std::chrono::steady_clock::duration &duration, int timer_duration)
  
    return start(f, duration, timer_duration);
  
  //!
  //! \\brief 任务运行函数
  //!
  //! 如果运行超时,则返回失败代码(未启用超时功能)
  //!
  //! \\param f 嘀嗒时期调用的函数,如果使用 lambda 表达式则会自动匹配该函数
  //! \\param duration 超时时间
  //! \\param timer_duration 嘀嗒周期,因为默认的嘀嗒时期为
  //! 20um,如果想要自主决定,则使用该函数 \\return
  //!
  taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration, int timer_duration)
  
    return start(f, duration, timer_duration);
  


  int waitForResult(const taskResult_t &result)
  
    if (result.valid())
    
      result.wait();
    

    return 1;
  

private:
#define NOW std::chrono::system_clock::now()

  TaskRunner() : pool_(2) 

  std::shared_future<int> start(const taskFunction &f, const std::chrono::steady_clock::duration &duration,
                                uint64_t interval = 1)
  
    auto taskResultPtr = std::make_shared<resultMap_t>();
    taskResultPtr->isUsed_ = false;
    taskResultPtr->id_ = idFactory_++;
    auto id = taskResultPtr->id_;

    taskResultPtr->taskResult_ = pool_.enqueue([id, f, duration, interval, this] 
      auto func = f;
      return taskRunner(id, func, NOW, duration, interval);
    );

    return taskResultPtr->taskResult_;
  

  int taskRunner(uint32_t id, const taskFunction &f, const std::chrono::system_clock::time_point &now,
                 const std::chrono::steady_clock::duration &timeout /* millisecond */,
                 uint32_t interval /* millisecond */)
  
    while (true)
    
      auto _duration = std::chrono::duration_cast<std::chrono::milliseconds>(NOW - now);
      if (_duration > timeout)
      
        LOGGING_ERROR("duration is %d", timeout.count());
        return 1;
      

      if (f() == StopRunning)
      
        LOGGING_WARN("function is running out.");
        break;
      

      std::this_thread::sleep_for(std::chrono::milliseconds(interval));
    
    return 0;
  

  void waitAll()
  
      pool_.Wait();
  

private:
  ThreadPool pool_;

  struct resultMap_t
  
    unsigned int id_;
    bool isUsed_;
    taskResult_t taskResult_;
  ;
  using resultMapPtr_t = std::shared_ptr<resultMap_t>;

  std::atomic_uint idFactory_ = ATOMIC_VAR_INIT(1);
;
  // namespace misc

以上是关于c++基于ThreadPool实现灵活的异步任务的主要内容,如果未能解决你的问题,请参考以下文章

C++版ThreadPool实现

高效线程池(threadpool)的实现

使用 Task 替换 ThreadPool ,异步监测所有线程(任务)是否全部执行完毕

Nodejs事件引擎libuv源码剖析之:高效线程池(threadpool)的实现

.NET Threadpool 工作线程和异步 IO 线程

Thread与Task区别