异步调用future/promise模式(C++版本)

Posted SuPhoebe

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了异步调用future/promise模式(C++版本)相关的知识,希望对你有一定的参考价值。

Future/Promise 编程模式

如何减少服务器应答时间,如何更强地进行并发。异步调用的Future/Promise1模式就是实现这一目的的手段之一。

一个 Future 就是说“将来”你需要某些东西(一般就是一个网络请求的结果),但是你现在就要发起这样的请求,并且这个请求会异步执行。或者换一个说法,你需要在后台执行一个异步请求。

C++中的应用

C++11创建了线程以后,我们不能直接从thread.join()得到结果,必须定义一个变量,在线程执行时,对这个变量赋值,然后执行join(),过程相对繁琐。

于是,在C++11中提供了future头文件,提供了上述的异步调用模式,用更加优雅的方式来实现。这儿讲解和学习一下如何使用。

<future> 头文件简介2

  1. Classes
    • std::future
    • std::future_error
    • std::packaged_task
    • std::promise
    • std::shared_future
  2. Functions
    • std::async
    • std::future_category

future

一个模板类。

可以通过查询future的状态(future_status)来获取异步操作的结果。future_status有三种状态:

  • deferred:异步操作还没开始
  • ready:异步操作已经完成
  • timeout:异步操作超时

获取future结果的有三种方式:get、wait、wait_for,其中get等待异步操作结束并返回结果,wait只是等待异步操作完成,没有返回值,wait_for是超时等待返回结果。

    std::future_status status;
    do 
        status = future.wait_for(std::chrono::seconds(1));
        if (status == std::future_status::deferred) 
            std::cout << "deferred\\n";
         else if (status == std::future_status::timeout) 
            std::cout << "timeout\\n";
         else if (status == std::future_status::ready) 
            std::cout << "ready!\\n";
        
     while (status != std::future_status::ready);

promise

顾名思义,承诺,承诺给别人反馈一个结果,

promise 是一个类模板,模板参数T便是产出值的类型。同样的,future 也是一个类模板,模板参数T是获取的值的类型。Promise对象可保存T类型的值,该值可被相应的future对象读取(可能在另一个线程中),这是promise提供同步的一种手段。

在构造promise时,promise对象可以与共享状态关联起来,这个共享状态可以存储一个T类型或者一个由std::exception派生出的类的值,并可以通过get_future来获取与promise对象关联的对象,调用该函数之后,两个对象共享相同的共享状态(shared state)。

Promise对象是异步provider,它可以在某一时刻设置共享状态的值。
Future对象可以返回共享状态的值,或者在必要的情况下阻塞调用者并等待共享状态标识变为ready,然后才能获取共享状态的值。

用传统的生产-消费者模型来解释:

#include <thread>
#include <iostream>
#include <future>
#include <chrono>
 
struct _data

    int32_t value;
;
 
_data data =  0 ;
 
int main()

    std::promise<_data> data_promise;      //创建一个承诺
    std::future<_data> data_future = data_promise.get_future();     //得到这个承诺封装好的期望
 
    std::thread prepare_data_thread([](std::promise<_data> &data_promise)
        std::this_thread::sleep_for(std::chrono::seconds(2));    //模拟生产过程
 
        data_promise.set_value( 1 );       //通过set_value()反馈结果
    , std::ref(data_promise));
 
    std::thread process_data_thread([](std::future<_data> &data_future)
        std::cout << data_future.get().value << std::endl;    //通过get()获取结果
    , std::ref(data_future));
 
    prepare_data_thread.join();
    process_data_thread.join();
 
    system("pause");
    return 0;

我们首先创建了一个 _data 类型data_promise ,而在 data_promise 里已经封装好了一个 _data类型的future,我们可以调用 promise 的 get_future() 方法得到与之对应的 future。然后我们把 data_promise 传递给了 prepare_data_thread,于是我们便可以在 prepare_data_thread 里面来产出值了,在休眠2S之后,我们调用了 set_value() 方法来产出值。我们又将和 data_promise 相关联的 data_future 传递给了 process_data_thread,所以我们便可以在 process_data_thread 里调用 data_future 的 get() 方法获取 data_promise 的产出值。这里需要注意的一点是,future 的 get() 方法是阻塞的,所以在与其成对的 promise 还未产出值,也就是未调用 set_value() 方法之前,调用 get() 的线程将会一直阻塞在 get()处直到其他任何人调用了 set_value() 方法。

简单来说,promise 用来产出值,future 用来获取值,但这个 promise 和 future 必须是配对的,也就是说 future 是通过调用 promise 的 get_future() 方法来获得的。

packaged_task

packaged_task 是对一个任务的抽象,一个包装一个可调用对象(如function,lambda表达式,bind表达式)的包装类。

std::packaged_task包装的是一个异步操作,而std::promise包装的是一个值。相较于 promise,它应该算是更高层次的一个抽象了吧。

我们可以将任务投递给任何线程去完成,然后通过 packaged_task.get_future() 方法获取的 future 来获取任务完成后的产出值。那么,下面还是原来的配方:

#include <thread>
#include <iostream>
#include <future>
 
struct _data

	int32_t value;
;
 
_data data =  0 ;
 
int main()

	std::packaged_task<_data()> prepare_data_task([]()->_data
		std::this_thread::sleep_for(std::chrono::seconds(2));    //模拟数据生产过程
 
		return 1 ;
	);
	auto data_future = prepare_data_task.get_future();          //获取future
 
	std::thread do_task_thread([](std::packaged_task<_data()> &task)
		task();              //调用packaged_task的调用符来运行任务
	, std::ref(prepare_data_task));
 
	std::thread process_data_thread([](std::future<_data> &data_future)
		std::cout << data_future.get().value << std::endl;
	, std::ref(data_future));
 
	do_task_thread.join();
	process_data_thread.join();
 
	system("pause");
	return 0;

如上例中为 _data(),返回值为 _data 类型,函数参数为 void,其中返回值类型将决定 future 的类型也就是产出值类型。

我们创建了一个任务,并投递给了 do_task_thread 去完成这个任务,然后将对应的 data_future投递给了process_data_thread ,于是我们就可以在 process_data_thread 里获取任务产出值了。同样地,获取值之前必须等待任务的完成。

看这几个函数为我们省去了使用 mutex,condition_variable 这些枯燥的细节,也降低了我们出错的可能。让我们更加优雅轻松地实现在线程之间通信。

但是绝对不要以为 future 是线程安全的,future.get()只能被调用一次,多次调用会触发异常,如果想要在多个线程中多次获取产出值请查阅 shared_future。

async

std::async大概的工作过程:先将异步操作用std::packaged_task包装起来,然后将异步操作的结果放到std::promise中,这个过程就是创造future的过程。外面再通过future.get/wait来获取这个未来的结果。

可以说,std::async帮我们将std::future、std::promise和std::packaged_task三者结合了起来。

当我想要异步完成一个操作,并在主线程中阻塞地或者非阻塞地获取这个异步操作的结果时,通常我们的做法是 new 一个子线程来完成这个异步操作,并结合锁,条件变量在主线程中来获取共享内存中的值。async 为我们实现了这些细节,让我们无需再亲自去开辟线程,并且提供了更多可选的功能。

std::async的原型:

async(std::launch::async | std::launch::deferred, f, args...)

第一个参数是线程的创建策略,默认的策略是立即创建线程:

std::launch::async:在调用async就开始创建线程。
std::launch::deferred:延迟加载方式创建线程。调用async时不创建线程,直到调用了future的get或者wait时才创建线程。
第二个参数是线程函数,和packaged_task封装的异步操作一样,后面的参数是线程函数的参数。

为了与之前的内容形成对比,我们仍然采用老套的生产-消费模型来举例:

#include <thread>
#include <iostream>
#include <chrono>
#include <future>
 
struct _data

	int32_t value;
;
 
_data data =  0 ;
 
int main()

	auto start = std::chrono::steady_clock::now();
 
	std::future<_data> data_future = std::async(std::launch::async, []()->_data
		std::this_thread::sleep_for(std::chrono::seconds(1));           //模拟生产过程
		return  1 ;
	);
 
	std::this_thread::sleep_for(std::chrono::seconds(2));
	std::cout << data_future.get().value << std::endl;              //使用产出值
 
	auto end = std::chrono::steady_clock::now();
	std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << std::endl;
 
	system("pause");
	return 0;

async 返回一个与函数返回值相对应类型的 future,通过它我们可以在其他任何地方获取异步结果。
由于我们给 async 提供了 std::launch::async 策略,所以生产过程将被异步执行,具体执行的时间取决于各种因素,最终输出的时间为 2000ms+,可见生产过程和主线程是并发执行的。

除了 std::launch::async,还有一个 std::launch::deferred 策略,它会延迟线程地创造,也就是说只有当我们调用 future.get() 时子线程才会被创建以执行任务。

参考文献


  1. Futures and promises ↩︎

  2. C++11 多线程 future/promise简介 ↩︎

以上是关于异步调用future/promise模式(C++版本)的主要内容,如果未能解决你的问题,请参考以下文章

C++运行报错:terminate called after throwing an instance of ‘std::future_error‘(promise对象调用set前不能销毁!)

C++运行报错:terminate called after throwing an instance of ‘std::future_error‘(promise对象调用set前不能销毁!)

folly教程系列之:future/promise

聊聊C++异步编程-2

netty的Future异步回调难理解?手写个带回调异步框架就懂了附源码

xmake经验总结1:解决c++ future/promise抛出std::system_error的问题