异步调用future/promise模式(C++版本)
Posted SuPhoebe
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了异步调用future/promise模式(C++版本)相关的知识,希望对你有一定的参考价值。
Future/Promise 编程模式
如何减少服务器应答时间,如何更强地进行并发。异步调用的Future/Promise1模式就是实现这一目的的手段之一。
一个 Future 就是说“将来”你需要某些东西(一般就是一个网络请求的结果),但是你现在就要发起这样的请求,并且这个请求会异步执行。或者换一个说法,你需要在后台执行一个异步请求。
C++中的应用
C++11创建了线程以后,我们不能直接从thread.join()得到结果,必须定义一个变量,在线程执行时,对这个变量赋值,然后执行join(),过程相对繁琐。
于是,在C++11中提供了future头文件,提供了上述的异步调用模式,用更加优雅的方式来实现。这儿讲解和学习一下如何使用。
<future> 头文件简介2
- Classes
- std::future
- std::future_error
- std::packaged_task
- std::promise
- std::shared_future
- Functions
- std::async
- std::future_category
future
一个模板类。
可以通过查询future的状态(future_status)来获取异步操作的结果。future_status有三种状态:
- deferred:异步操作还没开始
- ready:异步操作已经完成
- timeout:异步操作超时
获取future结果的有三种方式:get、wait、wait_for,其中get等待异步操作结束并返回结果,wait只是等待异步操作完成,没有返回值,wait_for是超时等待返回结果。
std::future_status status;
do
status = future.wait_for(std::chrono::seconds(1));
if (status == std::future_status::deferred)
std::cout << "deferred\\n";
else if (status == std::future_status::timeout)
std::cout << "timeout\\n";
else if (status == std::future_status::ready)
std::cout << "ready!\\n";
while (status != std::future_status::ready);
promise
顾名思义,承诺,承诺给别人反馈一个结果,
promise 是一个类模板,模板参数T便是产出值的类型。同样的,future 也是一个类模板,模板参数T是获取的值的类型。Promise对象可保存T类型的值,该值可被相应的future对象读取(可能在另一个线程中),这是promise提供同步的一种手段。
在构造promise时,promise对象可以与共享状态关联起来,这个共享状态可以存储一个T类型或者一个由std::exception派生出的类的值,并可以通过get_future来获取与promise对象关联的对象,调用该函数之后,两个对象共享相同的共享状态(shared state)。
Promise对象是异步provider,它可以在某一时刻设置共享状态的值。
Future对象可以返回共享状态的值,或者在必要的情况下阻塞调用者并等待共享状态标识变为ready,然后才能获取共享状态的值。
用传统的生产-消费者模型来解释:
#include <thread>
#include <iostream>
#include <future>
#include <chrono>
struct _data
int32_t value;
;
_data data = 0 ;
int main()
std::promise<_data> data_promise; //创建一个承诺
std::future<_data> data_future = data_promise.get_future(); //得到这个承诺封装好的期望
std::thread prepare_data_thread([](std::promise<_data> &data_promise)
std::this_thread::sleep_for(std::chrono::seconds(2)); //模拟生产过程
data_promise.set_value( 1 ); //通过set_value()反馈结果
, std::ref(data_promise));
std::thread process_data_thread([](std::future<_data> &data_future)
std::cout << data_future.get().value << std::endl; //通过get()获取结果
, std::ref(data_future));
prepare_data_thread.join();
process_data_thread.join();
system("pause");
return 0;
我们首先创建了一个 _data 类型data_promise ,而在 data_promise 里已经封装好了一个 _data类型的future,我们可以调用 promise 的 get_future() 方法得到与之对应的 future。然后我们把 data_promise 传递给了 prepare_data_thread,于是我们便可以在 prepare_data_thread 里面来产出值了,在休眠2S之后,我们调用了 set_value() 方法来产出值。我们又将和 data_promise 相关联的 data_future 传递给了 process_data_thread,所以我们便可以在 process_data_thread 里调用 data_future 的 get() 方法获取 data_promise 的产出值。这里需要注意的一点是,future 的 get() 方法是阻塞的,所以在与其成对的 promise 还未产出值,也就是未调用 set_value() 方法之前,调用 get() 的线程将会一直阻塞在 get()处直到其他任何人调用了 set_value() 方法。
简单来说,promise 用来产出值,future 用来获取值,但这个 promise 和 future 必须是配对的,也就是说 future 是通过调用 promise 的 get_future() 方法来获得的。
packaged_task
packaged_task 是对一个任务的抽象,一个包装一个可调用对象(如function,lambda表达式,bind表达式)的包装类。
std::packaged_task包装的是一个异步操作,而std::promise包装的是一个值。相较于 promise,它应该算是更高层次的一个抽象了吧。
我们可以将任务投递给任何线程去完成,然后通过 packaged_task.get_future() 方法获取的 future 来获取任务完成后的产出值。那么,下面还是原来的配方:
#include <thread>
#include <iostream>
#include <future>
struct _data
int32_t value;
;
_data data = 0 ;
int main()
std::packaged_task<_data()> prepare_data_task([]()->_data
std::this_thread::sleep_for(std::chrono::seconds(2)); //模拟数据生产过程
return 1 ;
);
auto data_future = prepare_data_task.get_future(); //获取future
std::thread do_task_thread([](std::packaged_task<_data()> &task)
task(); //调用packaged_task的调用符来运行任务
, std::ref(prepare_data_task));
std::thread process_data_thread([](std::future<_data> &data_future)
std::cout << data_future.get().value << std::endl;
, std::ref(data_future));
do_task_thread.join();
process_data_thread.join();
system("pause");
return 0;
如上例中为 _data(),返回值为 _data 类型,函数参数为 void,其中返回值类型将决定 future 的类型也就是产出值类型。
我们创建了一个任务,并投递给了 do_task_thread 去完成这个任务,然后将对应的 data_future投递给了process_data_thread ,于是我们就可以在 process_data_thread 里获取任务产出值了。同样地,获取值之前必须等待任务的完成。
看这几个函数为我们省去了使用 mutex,condition_variable 这些枯燥的细节,也降低了我们出错的可能。让我们更加优雅轻松地实现在线程之间通信。
但是绝对不要以为 future 是线程安全的,future.get()只能被调用一次,多次调用会触发异常,如果想要在多个线程中多次获取产出值请查阅 shared_future。
async
std::async大概的工作过程:先将异步操作用std::packaged_task包装起来,然后将异步操作的结果放到std::promise中,这个过程就是创造future的过程。外面再通过future.get/wait来获取这个未来的结果。
可以说,std::async帮我们将std::future、std::promise和std::packaged_task三者结合了起来。
当我想要异步完成一个操作,并在主线程中阻塞地或者非阻塞地获取这个异步操作的结果时,通常我们的做法是 new 一个子线程来完成这个异步操作,并结合锁,条件变量在主线程中来获取共享内存中的值。async 为我们实现了这些细节,让我们无需再亲自去开辟线程,并且提供了更多可选的功能。
std::async的原型:
async(std::launch::async | std::launch::deferred, f, args...)
第一个参数是线程的创建策略,默认的策略是立即创建线程:
std::launch::async:在调用async就开始创建线程。
std::launch::deferred:延迟加载方式创建线程。调用async时不创建线程,直到调用了future的get或者wait时才创建线程。
第二个参数是线程函数,和packaged_task封装的异步操作一样,后面的参数是线程函数的参数。
为了与之前的内容形成对比,我们仍然采用老套的生产-消费模型来举例:
#include <thread>
#include <iostream>
#include <chrono>
#include <future>
struct _data
int32_t value;
;
_data data = 0 ;
int main()
auto start = std::chrono::steady_clock::now();
std::future<_data> data_future = std::async(std::launch::async, []()->_data
std::this_thread::sleep_for(std::chrono::seconds(1)); //模拟生产过程
return 1 ;
);
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << data_future.get().value << std::endl; //使用产出值
auto end = std::chrono::steady_clock::now();
std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << std::endl;
system("pause");
return 0;
async 返回一个与函数返回值相对应类型的 future,通过它我们可以在其他任何地方获取异步结果。
由于我们给 async 提供了 std::launch::async 策略,所以生产过程将被异步执行,具体执行的时间取决于各种因素,最终输出的时间为 2000ms+,可见生产过程和主线程是并发执行的。
除了 std::launch::async,还有一个 std::launch::deferred 策略,它会延迟线程地创造,也就是说只有当我们调用 future.get() 时子线程才会被创建以执行任务。
参考文献
以上是关于异步调用future/promise模式(C++版本)的主要内容,如果未能解决你的问题,请参考以下文章
C++运行报错:terminate called after throwing an instance of ‘std::future_error‘(promise对象调用set前不能销毁!)
C++运行报错:terminate called after throwing an instance of ‘std::future_error‘(promise对象调用set前不能销毁!)