c++基于ThreadPool实现灵活的异步任务
Posted 特立独行的猫a
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了c++基于ThreadPool实现灵活的异步任务相关的知识,希望对你有一定的参考价值。
在工作中有时会有这种需求,在延时中执行一些任务,等待任务超时或者任务返回结果再往下执行。如果不做封装,可能会怎么做?每次都进入while true?
......
auto start = std::chrono::system_clock::now();
auto timeout = 500;
while (true)
auto now = std::chrono::system_clock::now();
auto _duration = std::chrono::duration_cast<std::chrono::milliseconds>(now- start );
if (_duration > timeout)
LOGGING_ERROR("duration is %d", timeout.count());
break;
//do something
std::this_thread::sleep_for(std::chrono::milliseconds(10));
或许也能完成目的,但是写法很不优雅。且程序执行到此处进入while(true)只能等在这里了,其他啥活都干不了。
可以怎么优化?可以把这部分任务放入线程中异步执行。
// 耗时操作
auto fetchDataFromDB = [](std::string recvdData,std::function<int(std::string &)> cback)
// Make sure that function takes 5 seconds to complete
std::this_thread::sleep_for(seconds(5));
//Do stuff like creating DB Connection and fetching Data
if(cback != nullptr)
std::string out = "this is from callback ";
cback(out);
return "DB_" + recvdData;
;
//把fetchDataFromDB这一IO耗时任务放到线程里异步执行
//
std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data0",
[&](std::string &result)
std::cout << "callback result from thread:" << result << std::endl;
return 0;
);
// do otherthings 可以做些其他工作
......
// 等待异步的结果
std::string result = resultFromDB.get(); // waits for fetchDataFromDB to return
然而每次都要创建线程损耗也不小,可以基于线程池进一步改造。
me::ThreadPool pool(4);
......
//把fetchDataFromDB这一IO耗时操作放到pool中
pool.commit(fetchDataFromDB,"Data1",[&](std::string &result)
std::cout << "callback result from pool thread:" << result << std::endl;
return 0;
);
然而,这种还是不够灵活。比如不能灵活控制耗时任务的超时时间和检测频率。
想要的是这种效果:
//!
//! \\brief 任务运行函数
//!
//! \\param f 嘀嗒时期调用的函数,如果使用 lambda 表达式则会自动匹配该函数
//! \\param duration 超时时间
//! \\param timer_duration 嘀嗒周期
//! \\return
//!
taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration, int timer_duration)
则使用起来就简单多了。
......
auto f = [&]
return checkIO(errorCode);
;
// f的执行频率为30毫秒执行一次,超时时间5秒钟
auto future = misc::TaskRunner::getInstance()->taskRunner(f,std::chrono::seconds(5),30);
misc::TaskRunner::getInstance()->waitForResult(future);
以下为TaskRunner的封装实现:
#define KeepRunning (0U)
#define StopRunning (1U)
#include <ThreadPool.h>
namespace misc
//!
//! \\brief 任务运行器
//!
class TaskRunner
public:
//!
//! \\brief 单例模型
//! \\return
//!
static TaskRunner *getInstance()
static TaskRunner w;
return &w;
~TaskRunner()
public:
using taskFunction = std::function<int()>;
using taskResult_t = std::shared_future<int>;
//!
//! \\brief 任务运行函数
//!
//! 如果运行超时,则返回失败代码(未启用超时功能)
//!
//! \\param f 嘀嗒时期调用的函数,如果使用 std::bind 函数则会自动匹配该函数
//! \\param duration 超时时间
//! \\return
//!
taskResult_t taskRunner(taskFunction &f, const std::chrono::steady_clock::duration &duration)
return start(f, duration);
//!
//! \\brief 任务运行函数
//!
//! 如果运行超时,则返回失败代码(未启用超时功能)
//!
//! \\param f 嘀嗒时期调用的函数,如果使用 lambda 表达式则会自动匹配该函数
//! \\param duration 超时时间
//! \\return
//!
taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration)
return start(f, duration);
//!
//! \\brief 任务运行函数
//!
//! 如果运行超时,则返回失败代码(未启用超时功能)
//!
//! \\param f 嘀嗒时期调用的函数,如果使用 std::bind 函数则会自动匹配该函数
//! \\param duration 超时时间
//! \\param timer_duration 嘀嗒周期,因为默认的嘀嗒时期为
//! 20um,如果想要自主决定,则使用该函数 \\return
//!
taskResult_t taskRunner(taskFunction &f, const std::chrono::steady_clock::duration &duration, int timer_duration)
return start(f, duration, timer_duration);
//!
//! \\brief 任务运行函数
//!
//! 如果运行超时,则返回失败代码(未启用超时功能)
//!
//! \\param f 嘀嗒时期调用的函数,如果使用 lambda 表达式则会自动匹配该函数
//! \\param duration 超时时间
//! \\param timer_duration 嘀嗒周期,因为默认的嘀嗒时期为
//! 20um,如果想要自主决定,则使用该函数 \\return
//!
taskResult_t taskRunner(taskFunction &&f, const std::chrono::steady_clock::duration &duration, int timer_duration)
return start(f, duration, timer_duration);
int waitForResult(const taskResult_t &result)
if (result.valid())
result.wait();
return 1;
private:
#define NOW std::chrono::system_clock::now()
TaskRunner() : pool_(2)
std::shared_future<int> start(const taskFunction &f, const std::chrono::steady_clock::duration &duration,
uint64_t interval = 1)
auto taskResultPtr = std::make_shared<resultMap_t>();
taskResultPtr->isUsed_ = false;
taskResultPtr->id_ = idFactory_++;
auto id = taskResultPtr->id_;
taskResultPtr->taskResult_ = pool_.enqueue([id, f, duration, interval, this]
auto func = f;
return taskRunner(id, func, NOW, duration, interval);
);
return taskResultPtr->taskResult_;
int taskRunner(uint32_t id, const taskFunction &f, const std::chrono::system_clock::time_point &now,
const std::chrono::steady_clock::duration &timeout /* millisecond */,
uint32_t interval /* millisecond */)
while (true)
auto _duration = std::chrono::duration_cast<std::chrono::milliseconds>(NOW - now);
if (_duration > timeout)
LOGGING_ERROR("duration is %d", timeout.count());
return 1;
if (f() == StopRunning)
LOGGING_WARN("function is running out.");
break;
std::this_thread::sleep_for(std::chrono::milliseconds(interval));
return 0;
void waitAll()
pool_.Wait();
private:
ThreadPool pool_;
struct resultMap_t
unsigned int id_;
bool isUsed_;
taskResult_t taskResult_;
;
using resultMapPtr_t = std::shared_ptr<resultMap_t>;
std::atomic_uint idFactory_ = ATOMIC_VAR_INIT(1);
;
// namespace misc
以上是关于c++基于ThreadPool实现灵活的异步任务的主要内容,如果未能解决你的问题,请参考以下文章
使用 Task 替换 ThreadPool ,异步监测所有线程(任务)是否全部执行完毕