如何正确同步这两个线程?
Posted
技术标签:
【中文标题】如何正确同步这两个线程?【英文标题】:How can I syncronize these two threads properly? 【发布时间】:2019-09-25 16:24:16 【问题描述】:我想正确同步不同的线程,但到目前为止我只能编写一个不优雅的解决方案。有人可以指出我如何改进以下代码吗?
typedef void (*func)();
void thread(func func1, func func2, int& has_finished, int& id)
has_finished--;
func1();
has_finished++;
while (has_finished != 0) std::cout << "thread " << id << " waiting\n";
std::cout << "thread" << id << "resuming\n";
func2();
int main()
int has_finished(0), id_one(0), id_two(1);
std::thread t1(thread, fun, fun, std::ref(has_finished), std::ref(id_one));
std::thread t2(thread, fun, fun, std::ref(has_finished), std::ref(id_two));
t1.join();
t2.join();
;
程序的要点由函数thread
描述。该函数由两个std::thread
s 执行。该函数接受两个长时间运行的函数 func1
和 func2
以及两个整数引用作为参数。 线程应该只在所有线程退出func1
后调用func2
。参数has_finished
用于协调不同的线程:进入函数后,has_arguments
为零。然后每个std::thread
递减值并调用长时间运行的函数func1
。离开func1
后,has_finished
再次递增。只要该值不是其原始值零,线程就会等待。然后,每个线程都在func2
上工作。主要功能显示在最后。
如何更好地协调两个线程?我正在考虑使用std::mutex
和std::condition_variable
,但不知道如何正确使用它们?有人知道我可以如何改进这个程序吗?
【问题讨论】:
需要什么协调对我来说并不明显。是否允许所有线程同时执行 func1 和 func2;但是您希望所有线程在任何线程启动 func2 之前完成 func1?如果是这样,请查找屏障同步。如果没有,您能否阐明所需的协调。 没有原始同步:您需要某种屏障或闩锁原语。例如。 Abseil 有absl::BlockingCounter
:godbolt.org/z/N4oexX,Boost 有boost::latch
:wandbox.org/permlink/RLZnqUrtDOoA9o7y。
@mevets :感谢您的评论。事实上,我希望所有线程在开始处理func2
之前完成func1
。我希望我澄清了我的问题。
【参考方案1】:
不要自己写。这种同步被称为“锁存器”(或更一般地称为“屏障”,它可以通过各种库和 C++ 并发 TS 获得。(它也可能以某种形式进入 C++20。)
例如,使用a version from Boost:
#include <iostream>
#include <thread>
#include <boost/thread/latch.hpp>
void f(boost::latch& c)
std::cout << "Doing work in round 1\n";
c.count_down_and_wait();
std::cout << "Doing work in round 2\n";
int main()
boost::latch c(2);
std::thread t1(f, std::ref(c)), t2(f, std::ref(c));
t1.join();
t2.join();
【讨论】:
演示:absl::BlockingCounter
,boost::latch
。【参考方案2】:
您选择的方法实际上不会起作用,并且由于竞争条件而导致未定义的行为。如您所料,您需要一个条件变量。
这是一个Gate
类,演示了如何使用条件变量来实现一个门,该门等待一定数量的线程到达它,然后再继续:
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <sstream>
#include <utility>
#include <cassert>
struct Gate
public:
explicit Gate(unsigned int count = 2) : count_(count) // How many threads need to reach the gate before it unlocks
Gate(Gate const &) = delete;
void operator =(Gate const &) = delete;
void wait_for_gate();
private:
int count_;
::std::mutex count_mutex_;
::std::condition_variable count_gate_;
;
void Gate::wait_for_gate()
::std::unique_lock<::std::mutex> guard(count_mutex_);
assert(count > 0); // Count being 0 here indicates an irrecoverable programming error.
--count_;
count_gate_.wait(guard, [this]() return this-> count_ <= 0; );
guard.unlock();
count_gate_.notify_all();
void f1()
::std::ostringstream msg;
msg << "In f1 with thread " << ::std::this_thread::get_id() << '\n';
::std::cout << msg.str();
void f2()
::std::ostringstream msg;
msg << "In f2 with thread " << ::std::this_thread::get_id() << '\n';
::std::cout << msg.str();
void thread_func(Gate &gate)
f1();
gate.wait_for_gate();
f2();
int main()
Gate gate;
::std::thread t1thread_func, ::std::ref(gate);
::std::thread t2thread_func, ::std::ref(gate);
t1.join();
t2.join();
希望这段代码的结构与您的代码足够相似,您可以理解这里发生了什么。通过阅读您的代码,您似乎正在寻找所有线程来执行func1
,然后是func2
。您不希望在任何线程正在执行 func1
时运行 func2
。
这可以被认为是一个门,所有线程都在等待到达“完成的 func1”位置,然后再继续运行 func2。
我在自己的本地版本的编译器资源管理器上测试了这段代码。
另一个答案中闩锁的主要缺点是它还不是标准的 C++。我的Gate
类是另一个答案中提到的latch 类的简单实现,它是标准C++。
条件变量的基本工作方式是解锁互斥体,等待通知,然后锁定互斥体并测试条件。如果条件为真,它会继续而不解锁互斥锁。如果条件为假,则重新开始。
因此,在条件变量表明条件为真之后,您必须做任何您需要做的事情,然后解锁互斥锁并通知所有人您已经完成了。
这里的互斥锁保护着共享计数变量。每当您有一个共享值时,您都应该使用互斥锁来保护它,以便没有线程可以看到处于不一致状态的该值。条件是线程可以等待该count达到0,说明所有线程都将count变量递减。
【讨论】:
以上是关于如何正确同步这两个线程?的主要内容,如果未能解决你的问题,请参考以下文章