Cpp Concurrency In Action(读书笔记3)——同步并发操作
Posted 阳安子
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Cpp Concurrency In Action(读书笔记3)——同步并发操作相关的知识,希望对你有一定的参考价值。
等待一个事件或其他条件
第一,它可以持续的检查共享数据标志(用于做保护工作的互斥量),直到另一线程完成工作时对这个标志进行重设。
第二个选择是在等待线程在检查间隙,使用 std::this_thread::sleep_for() 进行周期性的间歇:
#include <iostream> #include <thread> #include <mutex> class wait_test { bool flag; std::mutex m; public: wait_test(bool _flag):flag(_flag){} void setFlag(bool _flag) { std::unique_lock<std::mutex> lk(m); flag = _flag; } bool getFlag() { std::unique_lock<std::mutex> lk(m); return flag; } void wait_for_flag() { std::unique_lock<std::mutex> lk(m); while (!flag) { lk.unlock(); // 1 解锁互斥量 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠100ms lk.lock(); // 3 再锁互斥量 } } }; void funA(wait_test &wt, int &i) { while (!wt.getFlag()) { ++i; } } void funB(wait_test &wt, int &i) { std::cout << "begin\t" << i << std::endl; wt.wait_for_flag();//等待主线程set std::cout << "end\t" << i << std::endl; } int main() { wait_test wt{ false }; int i{ 0 }; std::thread t1{ funA,std::ref(wt),std::ref(i) }, t2{ funB,std::ref(wt),std::ref(i) }; t1.detach(); t2.detach(); wt.setFlag(true); system("pause"); return 0; }第三个选择(也是优先的选择)是,使用C++标准库提供的工具去等待事件的发生。通过另一线程触发等待事件的机制是最基本的唤醒方式(例如:流水线上存在额外的任务时),这种机制就称为“条件变量”(condition variable)。从概念上来说,一个条件变量会与多个事件或其他条件相关,并且一个或多个线程会等待条件的达成。当某些线程被终止时,为了唤醒等待线程(允许等待线程继续执行)终止的线程将会向等待着的线程广播“条件达成”的信息。
等待条件达成
C++标准库对条件变量有两套实现: std::condition_variable 和 std::condition_variable_any 。这两个实现都包含在 <condition_variable> 头文件的声明中。两者都需要与一个互斥量一起才能工作(互斥量是为了同步);前者仅限于与 std::mutex 一起工作,而后者可以和任何满足最低标准的互斥量一起工作,从而加上了_any的后缀。因为 std::condition_variable_any
更加通用,这就可能从体积、性能,以及系统资源的使用方面产生额外的开销,所以 std::condition_variable 一般作为首选的类型,当对灵活性有硬性要求时,我们才会去考虑 std::condition_variable_any 。
使用 std::condition_variable 处理数据等待
#include <iostream> #include <thread> #include <mutex> #include <queue> struct data_chunk { int m; }; struct A { std::mutex mut; std::queue<data_chunk> data_queue; // 1 std::condition_variable data_cond; bool more_data_to_prepare() { return data_queue.size() < 10; } bool is_last_chunk() { return data_queue.size() == 3; } }; int i = 0; data_chunk prepare_data() { data_chunk r; r.m = ++i; return r; } void data_preparation_thread(A &a) { std::cout << "preparation begin"<< std::endl; while (a.more_data_to_prepare()) { const data_chunk data = prepare_data(); std::lock_guard<std::mutex> lk(a.mut); a.data_queue.push(data); // 2 std::cout << "preparation notify" << std::endl; a.data_cond.notify_one(); // 3 } std::cout << "preparation end" << std::endl; } void process(const data_chunk &d) { std::cout << d.m << std::endl; } void data_processing_thread(A &a) { while (true) { std::unique_lock<std::mutex> lk(a.mut); // 4 a.data_cond.wait( lk, [&a] {return !a.data_queue.empty();}); // 5 std::cout << "process wait end" << std::endl; data_chunk data = a.data_queue.front(); a.data_queue.pop(); lk.unlock(); // 6 process(data); if (a.is_last_chunk()) break; } } int main() { A a; std::thread t1{ data_preparation_thread,std::ref(a) }, t2{ data_processing_thread,std::ref(a) }; t1.join(); t2.join(); system("pause"); return 0; }当等待线程重新获取互斥量并检查条件时,如果它并非直接响应另一个线程的通知,这就是所谓的“伪唤醒”(spurious wakeup)。因为任何伪唤醒的数量和频率都是不确定的,这里不建议使用一个有副作用的函数做条件检查。当你这样做了,就必须做好多次产生副作用的心理准备。
用 std::unique_lock 而不使用 std::lock_guard ——等待中的线程必须在等待期间解锁互斥量,并在这这之后对互斥量再次上锁,而 std::lock_guard 没有这么灵活。
解锁 std::unique_lock 的灵活性,不仅适用于对wait()的调用;它还可以用于有待处理但还未处理的数据。
使用条件变量构建线程安全队列
使用条件变量的线程安全队列(完整版),并且融入上一段程序:#include <queue> #include <memory> #include <mutex> #include <condition_variable>//头文件 template<typename T> class threadsafe_queue { private: mutable std::mutex mut; // 1 互斥量必须是可变的 std::queue<T> data_queue; std::condition_variable data_cond; public: threadsafe_queue() {} threadsafe_queue(threadsafe_queue const& other) { std::lock_guard<std::mutex> lk(other.mut); data_queue = other.data_queue; } void push(T new_value) { std::lock_guard<std::mutex> lk(mut); data_queue.push(new_value); data_cond.notify_one(); } void wait_and_pop(T& value) { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk, [this] {return !data_queue.empty();}); value = data_queue.front(); data_queue.pop(); } std::shared_ptr<T> wait_and_pop() { std::unique_lock<std::mutex> lk(mut); data_cond.wait(lk, [this] {return !data_queue.empty();}); std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); data_queue.pop(); return res; } bool try_pop(T& value) { std::lock_guard<std::mutex> lk(mut); if (data_queue.empty()) return false; value = data_queue.front(); data_queue.pop(); return true; } std::shared_ptr<T> try_pop() { std::lock_guard<std::mutex> lk(mut); if (data_queue.empty()) return std::shared_ptr<T>(); std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); data_queue.pop(); return res; } bool empty() const { std::lock_guard<std::mutex> lk(mut); return data_queue.empty(); } }; threadsafe_queue<data_chunk> data_queue; // 1 void data_preparation_thread() { while (more_data_to_prepare()) { data_chunk const data = prepare_data(); data_queue.push(data); // 2 } } void data_processing_thread() { while (true) { data_chunk data; data_queue.wait_and_pop(data); // 3 process(data); if (is_last_chunk(data)) break; } }
使用期望等待一次性事件
C++标准库模型将这种一次性事件称为“期望” (future)。
当事件发生时(并且期望状态为就绪),这个“期望”就不能被重置。
在C++标准库中,有两种“期望”,使用两种类型模板实现,声明在头文件中: 唯一期望(uniquefutures)( std::future<> )和共享期望(shared futures)( std::shared_future<> )。后者的实现中,所有实例会在同时变为就绪状态,并且他们可以访问与事件相关的任何数据。这种数据关联与模板有关,比如 std::unique_ptr 和 std::shared_ptr 的模板参数就是相关联的数据类型。在与数据无关的
地方,可以使用 std::future<void> 与 std::shared_future<void> 的特化模板。
地方,可以使用 std::future<void> 与 std::shared_future<void> 的特化模板。
带返回值的后台任务
当任务的结果你不着急要时,你可以使用 std::async 启动一个异步任务。与 std::thread 对象等待运行方式的不同, std::async 会返回一个 std::future 对象,这个对象持有最终计算出来的结果。当你需要这个值时,你只需要调用这个对象的get()成员函数;并且直到“期望”状态为就绪的情况下,线程才会阻塞;之后,返回计算结果。
使用 std::future 从异步任务中获取返回值:
#include <future> #include <iostream> int find_the_answer_to_ltuae() { std::this_thread::sleep_for(std::chrono::milliseconds(100)); return 10; } void do_other_stuff() { std::this_thread::sleep_for(std::chrono::milliseconds(120)); } int main() { std::future<int> the_answer = std::async(find_the_answer_to_ltuae); do_other_stuff(); std::cout << "The answer is " << the_answer.get() << std::endl; system("pause"); return 0; }std::async 允许你通过添加额外的调用参数,向函数传递额外的参数。当第一个参数是一个指向成员函数的指针,第二个参数提供有这个函数成员类的具体对象(不是直接的,就是通过指针,还可以包装在 std::ref 中),剩余的参数可作为成员函数的参数传入。否则,第二个和随后的参数将作为函数的参数,或作为指定可调用对象的第一个参数。
使用 std::async 向函数传递参数
#include <string> #include <future> #include <iostream> struct X { int m; void foo(int i, std::string const& s) { std::cout << s << "\t" << i << std::endl; } std::string bar(std::string const &s) { return "bar("+s+")"; } }; struct Y { double operator()(double d) { return d + 1.1; } }; X baz(X& _x) { ++_x.m; return _x; } class move_only { public: move_only() = default; move_only(move_only&&) = default; move_only(move_only const&) = delete; move_only& operator=(move_only&&) = default; move_only& operator=(move_only const&) = delete; void operator()() { std::cout << "move_only()" << std::endl; } }; void fun() { X x; auto f1 = std::async(&X::foo, &x, 42, "hello"); // 调用p->foo(42, "hello"),p是指向x的指针,指针 auto f2 = std::async(&X::bar, x, "goodbye"); // 调用tmpx.bar("goodbye"), tmpx是x的拷贝副本,具体对象 std::cout << f2.get() << std::endl; Y y; auto f3 = std::async(Y(), 3.141); // 调用tmpy(3.141),tmpy通过Y的移动构造函数得到 std::cout << f3.get() << std::endl; auto f4 = std::async(std::ref(y), 2.718); // 调用y(2.718) std::cout << f4.get() << std::endl; x.m = 1; auto f5 = std::async(baz, std::ref(x)); // 调用baz(x) std::cout << f5.get().m << std::endl; auto f6 = std::async(move_only()); // 调用tmp(),tmp是通过std::move(move_only())构造得到 } int main() { fun(); system("pause"); return 0; }在默认情况下,这取决于 std::async 是否启动一个线程,或是否在期望等待时同步任务。在大多数情况下(估计这就是你想要的结果),但是你也可以在函数调用之前,向 std::async 传递一个额外参数。这个参数的类型是 std::launch ,还可以是std::launch::defered ,用来表明函数调用被延迟到wait()或get()函数调用时才执行, std::launch::async 表明函数必须在其所在的独立线程上执行, std::launch::deferred | std::launch::async 表明实现可以选择这两种方式的一种。最后一个选项是默认的。当函数调用被延迟,它可能不会在运行了。
baz函数更改,方便观察效果:
X baz(X& _x,int i) { _x.m=i; std::cout << _x.m<<"调用" << std::endl; return _x; }调用:
auto f7 = std::async(std::launch::async, Y(), 1.2); // 在新线程上执行 std::cout <<"f7\t"<< f7.get() << std::endl; auto f8 = std::async(std::launch::deferred, baz, std::ref(x),2); // 在wait()或get()调用时执行 auto f9 = std::async( std::launch::deferred | std::launch::async, baz, std::ref(x),3); // 实现选择执行方式 std::cout << "f9\t"<<f9.get().m << std::endl; auto f10 = std::async(baz, std::ref(x),4); f8.wait(); // 调用延迟函数,后台运行,此时如果有结果就前行,否则阻塞 std::cout << "f8\t" << f8.get().m << std::endl; std::cout <<"f10\t"<< f10.get().m << std::endl;
这不是让一个 std::future 与一个任务实例相关联的唯一方式;你也可以将任务包装入一个 std::packaged_task<> 实例中,或通过编写代码的方式,使用 std::promise<> 类型模板显示设置值。与 std::promise<> 对比, std::packaged_task<> 具有更高层的抽象,所以我们从“高抽象”的模板说起。
任务与期望
以上是关于Cpp Concurrency In Action(读书笔记3)——同步并发操作的主要内容,如果未能解决你的问题,请参考以下文章
“Concurrency in Action”原子操作示例的正确性
C++并发编程----异常安全的并行算法(《C++ Concurrency in Action》 读书笔记)
C++并发编程----异常安全的并行算法(《C++ Concurrency in Action》 读书笔记)
尝试从“C++ Concurrency in Action”(第 133 页)一书中用原子理解代码示例