如何正确同步这两个线程?

Posted

技术标签:

【中文标题】如何正确同步这两个线程?【英文标题】:How can I syncronize these two threads properly? 【发布时间】:2019-09-25 16:24:16 【问题描述】:

我想正确同步不同的线程,但到目前为止我只能编写一个不优雅的解决方案。有人可以指出我如何改进以下代码吗?

typedef void (*func)();

void thread(func func1, func func2, int& has_finished, int& id) 
    has_finished--;
    func1();
    has_finished++;
    while (has_finished != 0) std::cout << "thread " << id << " waiting\n";
    std::cout << "thread" << id << "resuming\n";
    func2();


int main() 
    int has_finished(0), id_one(0), id_two(1);
    std::thread t1(thread, fun, fun, std::ref(has_finished), std::ref(id_one));
    std::thread t2(thread, fun, fun, std::ref(has_finished), std::ref(id_two));
    t1.join();
    t2.join();
;

程序的要点由函数thread 描述。该函数由两个std::threads 执行。该函数接受两个长时间运行的函数 func1func2 以及两个整数引用作为参数。 线程应该只在所有线程退出func1 后调用func2。参数has_finished 用于协调不同的线程:进入函数后,has_arguments 为零。然后每个std::thread 递减值并调用长时间运行的函数func1。离开func1 后,has_finished 再次递增。只要该值不是其原始值零,线程就会等待。然后,每个线程都在func2 上工作。主要功能显示在最后。

如何更好地协调两个线程?我正在考虑使用std::mutexstd::condition_variable,但不知道如何正确使用它们?有人知道我可以如何改进这个程序吗?

【问题讨论】:

需要什么协调对我来说并不明显。是否允许所有线程同时执行 func1 和 func2;但是您希望所有线程在任何线程启动 func2 之前完成 func1?如果是这样,请查找屏障同步。如果没有,您能否阐明所需的协调。 没有原始同步:您需要某种屏障或闩锁原语。例如。 Abseil 有absl::BlockingCounter:godbolt.org/z/N4oexX,Boost 有boost::latch:wandbox.org/permlink/RLZnqUrtDOoA9o7y。 @mevets :感谢您的评论。事实上,我希望所有线程在开始处理func2 之前完成func1。我希望我澄清了我的问题。 【参考方案1】:

不要自己写。这种同步被称为“锁存器”(或更一般地称为“屏障”,它可以通过各种库和 C++ 并发 TS 获得。(它也可能以某种形式进入 C++20。)

例如,使用a version from Boost:

#include <iostream>
#include <thread>

#include <boost/thread/latch.hpp>

void f(boost::latch& c) 
    std::cout << "Doing work in round 1\n";
    c.count_down_and_wait();
    std::cout << "Doing work in round 2\n";


int main() 
    boost::latch c(2);

    std::thread t1(f, std::ref(c)), t2(f, std::ref(c));
    t1.join();
    t2.join();

【讨论】:

演示:absl::BlockingCounterboost::latch【参考方案2】:

您选择的方法实际上不会起作用,并且由于竞争条件而导致未定义的行为。如您所料,您需要一个条件变量。

这是一个Gate 类,演示了如何使用条件变量来实现一个门,该门等待一定数量的线程到达它,然后再继续:

#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <sstream>
#include <utility>
#include <cassert>

struct Gate 
 public:
    explicit Gate(unsigned int count = 2) : count_(count)    // How many threads need to reach the gate before it unlocks
    Gate(Gate const &) = delete;
    void operator =(Gate const &) = delete;

    void wait_for_gate();

 private:
    int count_;
    ::std::mutex count_mutex_;
    ::std::condition_variable count_gate_;
;

void Gate::wait_for_gate()

    ::std::unique_lock<::std::mutex> guard(count_mutex_);
    assert(count > 0); // Count being 0 here indicates an irrecoverable programming error.
    --count_;
    count_gate_.wait(guard, [this]() return this-> count_ <= 0; );
    guard.unlock();
    count_gate_.notify_all();


void f1()

    ::std::ostringstream msg;
    msg << "In f1 with thread " << ::std::this_thread::get_id() << '\n';
    ::std::cout << msg.str();


void f2()

    ::std::ostringstream msg;
    msg << "In f2 with thread " << ::std::this_thread::get_id() << '\n';
    ::std::cout << msg.str();


void thread_func(Gate &gate)

    f1();
    gate.wait_for_gate();
    f2();


int main()

    Gate gate;
    ::std::thread t1thread_func, ::std::ref(gate);
    ::std::thread t2thread_func, ::std::ref(gate);
    t1.join();
    t2.join();

希望这段代码的结构与您的代码足够相似,您可以理解这里发生了什么。通过阅读您的代码,您似乎正在寻找所有线程来执行func1,然后是func2。您不希望在任何线程正在执行 func1 时运行 func2

这可以被认为是一个门,所有线程都在等待到达“完成的 func1”位置,然后再继续运行 func2。

我在自己的本地版本的编译器资源管理器上测试了这段代码。

另一个答案中闩锁的主要缺点是它还不是标准的 C++。我的Gate 类是另一个答案中提到的latch 类的简单实现,它是标准C++。

条件变量的基本工作方式是解锁互斥体,等待通知,然后锁定互斥体并测试条件。如果条件为真,它会继续而不解锁互斥锁。如果条件为假,则重新开始。

因此,在条件变量表明条件为真之后,您必须做任何您需要做的事情,然后解锁互斥锁并通知所有人您已经完成了。

这里的互斥锁保护着共享计数变量。每当您有一个共享值时,您都应该使用互斥锁来保护它,以便没有线程可以看到处于不一致状态的该值。条件是线程可以等待该count达到0,说明所有线程都将count变量递减。

【讨论】:

以上是关于如何正确同步这两个线程?的主要内容,如果未能解决你的问题,请参考以下文章

如何在特定的代码行同步两个 CPU 线程?

同步两个线程

Java和同步两个线程

线程同步

线程2---异步1

#yyds干货盘点# Exchanger详解