Cpp Concurrency In Action(读书笔记3)——同步并发操作

Posted 阳安子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Cpp Concurrency In Action(读书笔记3)——同步并发操作相关的知识,希望对你有一定的参考价值。

等待一个事件或其他条件

第一,它可以持续的检查共享数据标志(用于做保护工作的互斥量),直到另一线程完成工作时对这个标志进行重设。
第二个选择是在等待线程在检查间隙,使用 std::this_thread::sleep_for() 进行周期性的间歇:
#include <iostream>
#include <thread>
#include <mutex>
class wait_test {
	bool flag;
	std::mutex m;
public:
	wait_test(bool _flag):flag(_flag){}
	void setFlag(bool _flag)
	{
		std::unique_lock<std::mutex> lk(m);
		flag = _flag;
	}
	bool getFlag()
	{
		std::unique_lock<std::mutex> lk(m);
		return flag;
	}
	void wait_for_flag()
	{
		std::unique_lock<std::mutex> lk(m);
		while (!flag)
		{
			lk.unlock(); // 1 解锁互斥量
			std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠100ms
			lk.lock(); // 3 再锁互斥量
		}
	}
};
void funA(wait_test &wt, int &i)
{
	while (!wt.getFlag())
	{
		++i;
	}
}
void funB(wait_test &wt, int &i)
{
	std::cout << "begin\t" << i << std::endl;
	wt.wait_for_flag();//等待主线程set
	std::cout << "end\t" << i << std::endl;
}
int main()
{
	wait_test wt{ false };
	int i{ 0 };
	std::thread t1{ funA,std::ref(wt),std::ref(i) }, t2{ funB,std::ref(wt),std::ref(i) };
	t1.detach();
	t2.detach();
	wt.setFlag(true);
	system("pause");
	return 0;
}
第三个选择(也是优先的选择)是,使用C++标准库提供的工具去等待事件的发生。通过另一线程触发等待事件的机制是最基本的唤醒方式(例如:流水线上存在额外的任务时),这种机制就称为“条件变量”(condition variable)。从概念上来说,一个条件变量会与多个事件或其他条件相关,并且一个或多个线程会等待条件的达成。当某些线程被终止时,为了唤醒等待线程(允许等待线程继续执行)终止的线程将会向等待着的线程广播“条件达成”的信息。

等待条件达成

C++标准库对条件变量有两套实现: std::condition_variable 和 std::condition_variable_any 。这两个实现都包含在 <condition_variable> 头文件的声明中。两者都需要与一个互斥量一起才能工作(互斥量是为了同步);前者仅限于与 std::mutex 一起工作,而后者可以和任何满足最低标准的互斥量一起工作,从而加上了_any的后缀。因为 std::condition_variable_any 更加通用,这就可能从体积、性能,以及系统资源的使用方面产生额外的开销,所以 std::condition_variable 一般作为首选的类型,当对灵活性有硬性要求时,我们才会去考虑 std::condition_variable_any 。
使用 std::condition_variable 处理数据等待
#include <iostream>
#include <thread>
#include <mutex>
#include <queue>
struct data_chunk {
	int m;
};
struct A {
	std::mutex mut;
	std::queue<data_chunk> data_queue; // 1
	std::condition_variable data_cond;
	bool more_data_to_prepare()
	{
		return data_queue.size() < 10;
	}
	bool is_last_chunk()
	{
		return data_queue.size() == 3;
	}
};
int i = 0;
data_chunk prepare_data()
{
	data_chunk r;
	r.m = ++i;
	return r;
}
void data_preparation_thread(A &a)
{
	std::cout << "preparation  begin"<< std::endl;
	while (a.more_data_to_prepare())
	{
		const data_chunk data = prepare_data();
		std::lock_guard<std::mutex> lk(a.mut);
		a.data_queue.push(data); // 2
		std::cout << "preparation notify" << std::endl;
		a.data_cond.notify_one(); // 3
	}
	std::cout << "preparation  end" << std::endl;
}
void process(const data_chunk &d)
{
	std::cout << d.m << std::endl;
}
void data_processing_thread(A &a)
{
	while (true)
	{
		std::unique_lock<std::mutex> lk(a.mut); // 4
		a.data_cond.wait(
			lk, [&a] {return !a.data_queue.empty();}); // 5
		std::cout << "process  wait end" << std::endl;
		data_chunk data = a.data_queue.front();
		a.data_queue.pop();
		lk.unlock(); // 6
		process(data);
		if (a.is_last_chunk())
			break;
	}
}
int main()
{
	A a;
	std::thread t1{ data_preparation_thread,std::ref(a) },
		t2{ data_processing_thread,std::ref(a) };
	t1.join();
	t2.join();
	system("pause");
	return 0;
}
当等待线程重新获取互斥量并检查条件时,如果它并非直接响应另一个线程的通知,这就是所谓的“伪唤醒”(spurious wakeup)。因为任何伪唤醒的数量和频率都是不确定的,这里不建议使用一个有副作用的函数做条件检查。当你这样做了,就必须做好多次产生副作用的心理准备。
用 std::unique_lock 而不使用 std::lock_guard ——等待中的线程必须在等待期间解锁互斥量,并在这这之后对互斥量再次上锁,而 std::lock_guard 没有这么灵活。
解锁 std::unique_lock 的灵活性,不仅适用于对wait()的调用;它还可以用于有待处理但还未处理的数据。

使用条件变量构建线程安全队列

使用条件变量的线程安全队列(完整版),并且融入上一段程序:
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>//头文件
template<typename T>
class threadsafe_queue
{
private:
	mutable std::mutex mut; // 1 互斥量必须是可变的
	std::queue<T> data_queue;
	std::condition_variable data_cond;
public:
	threadsafe_queue()
	{}
	threadsafe_queue(threadsafe_queue const& other)
	{
		std::lock_guard<std::mutex> lk(other.mut);
		data_queue = other.data_queue;
	}
	void push(T new_value)
	{
		std::lock_guard<std::mutex> lk(mut);
		data_queue.push(new_value);
		data_cond.notify_one();
	}
	void wait_and_pop(T& value)
	{
		std::unique_lock<std::mutex> lk(mut);
		data_cond.wait(lk, [this] {return !data_queue.empty();});
		value = data_queue.front();
		data_queue.pop();
	}
	std::shared_ptr<T> wait_and_pop()
	{
		std::unique_lock<std::mutex> lk(mut);
		data_cond.wait(lk, [this] {return !data_queue.empty();});
		std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
		data_queue.pop();
		return res;
	}
	bool try_pop(T& value)
	{
		std::lock_guard<std::mutex> lk(mut);
		if (data_queue.empty())
			return false;
		value = data_queue.front();
		data_queue.pop();
		return true;
	}
	std::shared_ptr<T> try_pop()
	{
		std::lock_guard<std::mutex> lk(mut);
		if (data_queue.empty())
			return std::shared_ptr<T>();
		std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
		data_queue.pop();
		return res;
	}
	bool empty() const
	{
		std::lock_guard<std::mutex> lk(mut);
		return data_queue.empty();
	}
};
threadsafe_queue<data_chunk> data_queue; // 1
void data_preparation_thread()
{
	while (more_data_to_prepare())
	{
		data_chunk const data = prepare_data();
		data_queue.push(data); // 2
	}
}
void data_processing_thread()
{
	while (true)
	{
		data_chunk data;
		data_queue.wait_and_pop(data); // 3
		process(data);
		if (is_last_chunk(data))
			break;
	}
}

使用期望等待一次性事件

C++标准库模型将这种一次性事件称为“期望” (future)。
当事件发生时(并且期望状态为就绪),这个“期望”就不能被重置。
在C++标准库中,有两种“期望”,使用两种类型模板实现,声明在头文件中: 唯一期望(uniquefutures)( std::future<> )和共享期望(shared futures)( std::shared_future<> )。后者的实现中,所有实例会在同时变为就绪状态,并且他们可以访问与事件相关的任何数据。这种数据关联与模板有关,比如 std::unique_ptr 和 std::shared_ptr 的模板参数就是相关联的数据类型。在与数据无关的
地方,可以使用 std::future<void> 与 std::shared_future<void> 的特化模板。


带返回值的后台任务

当任务的结果你不着急要时,你可以使用 std::async 启动一个异步任务。与 std::thread 对象等待运行方式的不同, std::async 会返回一个 std::future 对象,这个对象持有最终计算出来的结果。当你需要这个值时,你只需要调用这个对象的get()成员函数;并且直到“期望”状态为就绪的情况下,线程才会阻塞;之后,返回计算结果。
使用 std::future 从异步任务中获取返回值:
#include <future>
#include <iostream>
int find_the_answer_to_ltuae()
{
	std::this_thread::sleep_for(std::chrono::milliseconds(100));
	return 10;
}
void do_other_stuff()
{
	std::this_thread::sleep_for(std::chrono::milliseconds(120));
}
int main()
{
	std::future<int> the_answer = std::async(find_the_answer_to_ltuae);
	do_other_stuff();
	std::cout << "The answer is " << the_answer.get() << std::endl;
	system("pause");
	return 0;
}
std::async 允许你通过添加额外的调用参数,向函数传递额外的参数。当第一个参数是一个指向成员函数的指针,第二个参数提供有这个函数成员类的具体对象(不是直接的,就是通过指针,还可以包装在 std::ref 中),剩余的参数可作为成员函数的参数传入。否则,第二个和随后的参数将作为函数的参数,或作为指定可调用对象的第一个参数。
使用 std::async 向函数传递参数
#include <string>
#include <future>
#include <iostream>
struct X
{
	int m;
	void foo(int i, std::string const& s)
	{
		std::cout << s << "\t" << i << std::endl;
	}
	std::string bar(std::string const &s)
	{
		return "bar("+s+")";
	}
};
struct Y
{
	double operator()(double d)
	{
		return d + 1.1;
	}
};
X baz(X& _x)
{
	++_x.m;
	return _x;
}
class move_only
{
public:
	move_only() = default;
	move_only(move_only&&) = default;
	move_only(move_only const&) = delete;
	move_only& operator=(move_only&&) = default;
	move_only& operator=(move_only const&) = delete;
	void operator()()
	{
		std::cout << "move_only()" << std::endl;
	}
};
void fun()
{
	X x;
	auto f1 = std::async(&X::foo, &x, 42, "hello"); // 调用p->foo(42, "hello"),p是指向x的指针,指针
	auto f2 = std::async(&X::bar, x, "goodbye"); // 调用tmpx.bar("goodbye"), tmpx是x的拷贝副本,具体对象
	std::cout << f2.get() << std::endl;
	Y y;
	auto f3 = std::async(Y(), 3.141); // 调用tmpy(3.141),tmpy通过Y的移动构造函数得到
	std::cout << f3.get() << std::endl;
	auto f4 = std::async(std::ref(y), 2.718); // 调用y(2.718)
	std::cout << f4.get() << std::endl;
	x.m = 1;
	auto f5 = std::async(baz, std::ref(x)); // 调用baz(x)
	std::cout << f5.get().m << std::endl;
	auto f6 = std::async(move_only()); // 调用tmp(),tmp是通过std::move(move_only())构造得到
}
int main()
{
	fun();
	system("pause");
	return 0;
}
在默认情况下,这取决于 std::async 是否启动一个线程,或是否在期望等待时同步任务。在大多数情况下(估计这就是你想要的结果),但是你也可以在函数调用之前,向 std::async 传递一个额外参数。这个参数的类型是 std::launch ,还可以是std::launch::defered ,用来表明函数调用被延迟到wait()或get()函数调用时才执行, std::launch::async 表明函数必须在其所在的独立线程上执行, std::launch::deferred | std::launch::async 表明实现可以选择这两种方式的一种。最后一个选项是默认的。当函数调用被延迟,它可能不会在运行了。
baz函数更改,方便观察效果:
X baz(X& _x,int i)
{
	_x.m=i;
	std::cout << _x.m<<"调用" << std::endl;
	return _x;
}
调用:
	auto f7 = std::async(std::launch::async, Y(), 1.2); // 在新线程上执行
	std::cout <<"f7\t"<< f7.get() << std::endl;
	auto f8 = std::async(std::launch::deferred, baz, std::ref(x),2); // 在wait()或get()调用时执行
	auto f9 = std::async(
		std::launch::deferred | std::launch::async,
		baz, std::ref(x),3); // 实现选择执行方式
	
	std::cout << "f9\t"<<f9.get().m << std::endl;
	auto f10 = std::async(baz, std::ref(x),4);
	f8.wait(); // 调用延迟函数,后台运行,此时如果有结果就前行,否则阻塞
	std::cout << "f8\t" << f8.get().m << std::endl;
	std::cout <<"f10\t"<< f10.get().m << std::endl;	


这不是让一个 std::future 与一个任务实例相关联的唯一方式;你也可以将任务包装入一个 std::packaged_task<> 实例中,或通过编写代码的方式,使用 std::promise<> 类型模板显示设置值。与 std::promise<> 对比, std::packaged_task<> 具有更高层的抽象,所以我们从“高抽象”的模板说起。

任务与期望



以上是关于Cpp Concurrency In Action(读书笔记3)——同步并发操作的主要内容,如果未能解决你的问题,请参考以下文章

“Concurrency in Action”原子操作示例的正确性

C++并发编程----异常安全的并行算法(《C++ Concurrency in Action》 读书笔记)

C++并发编程----异常安全的并行算法(《C++ Concurrency in Action》 读书笔记)

尝试从“C++ Concurrency in Action”(第 133 页)一书中用原子理解代码示例

C++并发编程----无锁实现线程安全队列(《C++ Concurrency in Action》 读书笔记)

C++并发编程----无锁实现线程安全队列(《C++ Concurrency in Action》 读书笔记)