聊聊C++异步编程-2
Posted 闲人草堂
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了聊聊C++异步编程-2相关的知识,希望对你有一定的参考价值。
上篇 谈到了在C++11中对线程的底层操作方法,但在实际项目中,使用这样的方式开发效率低而且容易犯错。那有没有更方便,简洁,安全的方式呢,其实是有的,我把它称为High-Level操作(虽然这个名称分类未必准确)。
High-Level 操作
future & promise
c++11起提供了future,promise来满足跨线程取值的需求。想象一下,在此之前是如何操作的。能用的手段只有通过共享变量,通过condition_variable来提醒其他线程取值,更早之前,只能通过mutex和多个共享变量,组合传递出“变量的更改状态”、“变量值”等信息。
具体future和promise是如何工作的?网上有个经典的示意图(下图)。有线程1和2,线程1希望从线程2中获取特定值,其步骤如下:
线程1:创建promise对象,并从该promise对象中获得对应的future对象。线程1将promise对象传递给线程2,完成其他工作后,通过future::get()方法等待从线程2中取值。此时线程1被阻塞。完成取值后继续剩下的工作。
线程2:接受传入的promise对象,通过promise的set_XXX等方法设置特定值。然后继续线程2自身的工作。
从上述流程可以看出,promise与future是成对出现的。生产者通过promise赋值,消费者通过future取值。示意代码如下,比较简单明了。
void accumulate(std::vector<int>::iterator first,
std::vector<int>::iterator last,
std::promise<int> &accumulate_promise)
{
int sum = std::accumulate(first, last, 0);
accumulate_promise.set_value(sum); // Notify future
}
int test_future_promise(int argc, char *argv[])
{
try {
std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
std::promise<int> accumulate_promise;
std::future<int> accumulate_future = accumulate_promise.get_future();
std::thread work_thread(accumulate, numbers.begin(), numbers.end(),
std::ref(accumulate_promise));
//accumulate_future.wait(); // wait for result
std::cout << "result=" << accumulate_future.get() << '\n';
work_thread.join(); // wait for thread completion
}
catch (std::exception &e)
{
std::cerr << e.what();
}
return 0;
}
生产者通过promise的set_value方法赋值。在set_value方法内获取promise内部存储的mutex,并更新对象。注意: set_value不能设置多次,否则会丢出异常。
消费者通过future的get来阻塞线程获取对应的值。此处要注意future和promise都是不可拷贝,但可以转移,因为其内部存储了mutex。注意:get只能被调用一次,如果希望多次调用需要使用shared_future。为什么会这样?我们看下两个future的get实现:
//future::get
_Ty get() { // block until ready then return the stored result or
// throw the stored exception
future _Local{_STD move(*this)};
return _STD move(_Local._Get_value());
}
// shared_future:get
const _Ty& get() const { // block until ready then return the stored result or
// throw the stored exception
return this->_Get_value();
}
看出区别了吧,future中的get是的返回值是move出来的,当第二次调用get就无法获得有效值了。而shared_future中的返回const 引用,可以多次get。除此以外,future不支持拷贝构造,而shared_future支持。
packaged_task & async
packaged_task 就像是一个捆绑了std::function的std::future,感觉略有点鸡肋。我们通过下面的代码,来看看如何使用它。packaged_task中可以传入函数对象,函数指针,lambda函数等。注意,这些函数并不是在package_task构造时被执行,需要手动invoke,才能执行。可以在当前线程执行,也可以被move到其他线程。
packaged_task提供get_future能从中取得future,借此可以获取绑定函数的返回值。
void task_thread()
{
std::packaged_task<int(int,int)> task([](int a, int b) {
return std::pow(a, b);
});
std::future<int> result = task.get_future();
std::thread task_td(std::move(task), 2, 10);
task_td.join();
std::cout << "task_thread:\t" << result.get() << '\n';
}
async 是更上层的封装,使用起来也更简洁。我们先看下async的函数形式,这是c++17以后,c++20以前的形式。重载版本有两个,区别在于第一个参数是否是std::lanuch策略
template< class Function, class... Args >
std::future<std::invoke_result_t<std::decay_t<Function>,
std::decay_t<Args>...>>
async( std::launch policy, Function&& f, Args&&... args );
template< class Function, class... Args>
std::future<std::invoke_result_t<std::decay_t<Function>,
std::decay_t<Args>...>>
async( Function&& f, Args&&... args );
std::launch policy 有两种:async 或 deferred。两者的区别在于,async是在新线程中异步执行,通过返回的future的get函数来取值。而deferred 是在当前线程中同步执行,但请注意,并不是立即执行,而是延后到get函数被调用的时候才执行,这种取值的行为又被称为lazy evaluation。
没有std::launch的重载版本默认是在新线程中执行。
#include <iostream>#include <vector>#include <algorithm>#include <numeric>#include <future>#include <string>#include <mutex>
std::mutex m;
struct X {
void foo(int i, const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << ' ' << i << '\n';
}
void bar(const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << '\n';
}
int operator()(int i) {
std::lock_guard<std::mutex> lk(m);
std::cout << i << '\n';
return i + 10;
}
};
template <typename RandomIt>
int parallel_sum(RandomIt beg, RandomIt end)
{
auto len = end - beg;
if (len < 1000)
return std::accumulate(beg, end, 0);
RandomIt mid = beg + len/2;
auto handle = std::async(std::launch::async,
parallel_sum<RandomIt>, mid, end);
int sum = parallel_sum(beg, mid);
return sum + handle.get();
}
int main()
{
std::vector<int> v(10000, 1);
std::cout << "The sum is " << parallel_sum(v.begin(), v.end()) << '\n';
X x;
// Calls (&x)->foo(42, "Hello") with default policy:
// may print "Hello 42" concurrently or defer execution
auto a1 = std::async(&X::foo, &x, 42, "Hello");
// Calls x.bar("world!") with deferred policy
// prints "world!" when a2.get() or a2.wait() is called
auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
// Calls X()(43); with async policy
// prints "43" concurrently
auto a3 = std::async(std::launch::async, X(), 43);
a2.wait(); // prints "world!"
std::cout << a3.get() << '\n'; // prints "53"
}
到此介绍完了c++11中thread库提供的所有异步操作。
参考
thispointer.com/c11-mul
cnblogs.com/wangshaowei
stackoverflow.com/quest
open-std.org/jtc1/sc22/
以上是关于聊聊C++异步编程-2的主要内容,如果未能解决你的问题,请参考以下文章