每个条件唤醒多个线程工作一次

Posted

技术标签:

【中文标题】每个条件唤醒多个线程工作一次【英文标题】:Waking up multiple threads to work once per condition 【发布时间】:2020-05-26 23:35:07 【问题描述】:

我有一种情况,一个线程需要偶尔唤醒多个工作线程,每个工作线程需要(仅)完成一次工作,然后返回睡眠状态等待下一个通知。我正在使用 condition_variable 唤醒一切,但我遇到的问题是“只有一次”部分。假设每个线程的创建都很繁重,所以我不想每次都创建和加入它们。

// g++ -Wall -o threadtest -pthread threadtest.cpp
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <chrono>

std::mutex condMutex;
std::condition_variable condVar;
bool dataReady = false;

void state_change_worker(int id)

    while (1)
    
        
            std::unique_lock<std::mutex> lck(condMutex);
            condVar.wait(lck, []  return dataReady; );
            // Do work only once.
            std::cout << "thread " << id << " working\n";
        
    


int main()

    // Create some worker threads.
    std::thread threads[5];
    for (int i = 0; i < 5; ++i)
        threads[i] = std::thread(state_change_worker, i);

    while (1)
    
        // Signal to the worker threads to work.
        
            std::cout << "Notifying threads.\n";
            std::unique_lock<std::mutex> lck(condMutex);
            dataReady = true;
            condVar.notify_all();
        
        // It would be really great if I could wait() on all of the 
        // worker threads being done with their work here, but it's 
        // not strictly necessary.
        std::cout << "Sleep for a bit.\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    

更新:这是一个实现了一个几乎但不完全有效的小队锁版本。问题是我不能保证每个线程在再次运行之前都有机会唤醒并在 waitForLeader() 中减少计数。

// g++ -Wall -o threadtest -pthread threadtest.cpp
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <chrono>

class SquadLock

public:
    void waitForLeader()
    
        
            // Increment count to show that we are waiting in queue.
            // Also, if we are the thread that reached the target, signal
            // to the leader that everything is ready.
            std::unique_lock<std::mutex> count_lock(count_mutex_);
            std::unique_lock<std::mutex> target_lock(target_mutex_);
            if (++count_ >= target_)
                count_cond_.notify_one();
        
        // Wait for leader to signal done.
        std::unique_lock<std::mutex> lck(done_mutex_);
        done_cond_.wait(lck, [&]  return done_; );
        
            // Decrement count to show that we are no longer waiting.
            // If we are the last thread set done to false.
            std::unique_lock<std::mutex> lck(count_mutex_);
            if (--count_ == 0)
            
                done_ = false;
            
        
    

    void waitForHerd()
    
        std::unique_lock<std::mutex> lck(count_mutex_);
        count_cond_.wait(lck, [&]  return count_ >= target_; );
    
    void leaderDone()
    
        std::unique_lock<std::mutex> lck(done_mutex_);
        done_ = true;
        done_cond_.notify_all();
    
    void incrementTarget()
    
        std::unique_lock<std::mutex> lck(target_mutex_);
        ++target_;
    
    void decrementTarget()
    
        std::unique_lock<std::mutex> lck(target_mutex_);
        --target_;
    
    void setTarget(int target)
    
        std::unique_lock<std::mutex> lck(target_mutex_);
        target_ = target;
    

private:
    // Condition variable to indicate that the leader is done.
    std::mutex done_mutex_;
    std::condition_variable done_cond_;
    bool done_ = false;

    // Count of currently waiting tasks.
    std::mutex count_mutex_;
    std::condition_variable count_cond_;
    int count_ = 0;

    // Target number of tasks ready for the leader.
    std::mutex target_mutex_;
    int target_ = 0;
;

SquadLock squad_lock;
std::mutex print_mutex;
void state_change_worker(int id)

    while (1)
    
        // Wait for the leader to signal that we are ready to work.
        squad_lock.waitForLeader();
        
            // Adding just a bit of sleep here makes it so that every thread wakes up, but that isn't the right way.
            // std::this_thread::sleep_for(std::chrono::milliseconds(100));
            std::unique_lock<std::mutex> lck(print_mutex);
            std::cout << "thread " << id << " working\n";
        
    


int main()


    // Create some worker threads and increment target for each one
    // since we want to wait until all threads are finished.
    std::thread threads[5];
    for (int i = 0; i < 5; ++i)
    
        squad_lock.incrementTarget();
        threads[i] = std::thread(state_change_worker, i);
    
    while (1)
    
        // Signal to the worker threads to work.
        std::cout << "Starting threads.\n";
        squad_lock.leaderDone();
        // Wait for the worked threads to be done.
        squad_lock.waitForHerd();
        // Wait until next time, processing results.
        std::cout << "Tasks done, waiting for next time.\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    

【问题讨论】:

如果您通知所有线程并且从不重置dataReady,而您希望您的代码按预期工作。在实践中,使用计数器而不是布尔值会更容易,因为当你有事要做时你会加一,而在工作时会减一。最后,在您的工作人员中,所有代码都在锁定中。 【参考方案1】:

以下是我创建的有关并发设计模式的博客的摘录。模式使用 Ada 语言表达,但概念可以翻译成 C++。

总结

许多应用程序是由一组协作的执行线程构成的。从历史上看,这经常通过创建一组合作流程来实现。这些过程将通过共享数据进行合作。起初,只有文件用于共享数据。文件共享提出了一些有趣的问题。如果一个进程正在写入文件而另一个进程从文件中读取,您将经常遇到数据损坏,因为读取进程可能会在写入进程完全写入信息之前尝试读取数据。用于此的解决方案是创建文件锁,以便一次只有一个进程可以打开文件。 Unix 引入了 Pipe 的概念,它实际上是一个数据队列。一个进程可以写入管道,而另一个进程从管道读取。操作系统将管道中的数据视为一系列字节。在写入进程完成对数据的操作之前,它不会让读取进程访问特定字节的数据。 各种操作系统还引入了允许进程共享数据的其他机制。示例包括消息队列、套接字和共享内存。还有一些特殊功能可以帮助程序员控制对数据的访问,例如信号量。当操作系统引入单个进程操作多个执行线程的能力时,也称为轻量级线程,或者只是线程,它们还必须为共享数据提供相应的锁定机制。 经验表明,虽然共享数据的可能设计种类繁多,但经常出现一些非常常见的设计模式。具体来说,锁或信号量有一些变化,数据缓冲也有一些变化。本文探讨了监视器上下文中线程的锁定和缓冲设计模式。尽管监视器可以用多种语言实现,但本文中的所有示例都使用 Ada 保护类型。 Ada 保护类型是一个非常彻底的监视器实现。

监视器

有几种创建和控制共享内存的理论方法。最灵活和最强大的监视器之一是 C.A.R. 首次描述的监视器。霍尔。监视器是具有三种不同操作的数据对象。

过程用于更改监视器包含的状态或值。当线程调用监视器过程时,该线程必须对监视器具有独占访问权限,以防止其他线程遇到损坏或部分写入的数据。

条目与过程类似,用于更改监视器所包含的状态或值,但条目也指定了边界条件。该条目只能在边界条件为真时执行。当边界条件为假时调用条目的线程被放入队列中,直到边界条件变为真。例如,条目用于允许线程从共享缓冲区中读取。在缓冲区实际包含一些数据之前,不允许读取线程读取数据。边界条件是缓冲区不能为空。条目(如过程)必须具有对监视器数据的独占访问权限。

函数用于报告监视器的状态。由于函数只报告状态,不改变状态,它们不需要独占访问监视器的数据。许多线程可以通过函数同时访问同一个监视器,而不会造成数据损坏。

监视器的概念非常强大。它也可以非常有效。监视器提供了为线程系统设计高效和健壮的共享数据结构所需的所有功能。 尽管监视器功能强大,但它们确实有一些局限性。在监视器上执行的操作应该非常快,不会造成线程阻塞。如果这些操作被阻止,监视器将成为路障而不是通信工具。只要监视器操作被阻塞,所有等待访问监视器的线程都会被阻塞。出于这个原因,有些人选择不使用显示器。有一些监视器设计模式实际上可以用来解决这些问题。这些设计模式被归类为锁定模式。

小队锁

班锁允许特殊任务(班长)监控一群或一组工人任务的进度。当所有(或足够数量的)工作任务完成了他们工作的某个方面,并且领导者准备好继续进行时,整个任务集被允许通过障碍并继续他们的下一个活动序列。目的是允许任务异步执行,同时通过一组复杂的活动协调它们的进度。

package Barriers is
   protected type Barrier(Trigger : Positive) is
      entry Wait_For_Leader; 
      entry Wait_For_Herd; 
      procedure Leader_Done; 
   private
      Done : Boolean := False;
   end Barrier;

   protected type Autobarrier(Trigger : Positive) is
      entry Wait_For_Leader; 
      entry Wait_For_Herd; 
   private
      Done : Boolean := False;
   end Autobarrier;
end Barriers;

这个包展示了两种小队锁。屏障保护类型演示了基本的小队锁。牛群调用 Wait_For_Leader,领导者调用 Wait_For_Herd,然后是 Leader_Done。 Autobarrier 展示了一个更简单的界面。牛群调用 Wait_For_Leader,领导者调用 Wait_For_Herd。在创建任一类型屏障的实例时使用 Trigger 参数。它设置了领导者在继续之前必须等待的最小羊群任务数。

package body Barriers is
   protected body Barrier is
      entry Wait_For_Herd when Wait_For_Leader'Count >= Trigger is
      begin
         null;
      end Wait_For_Herd;

      entry Wait_For_Leader when Done is
      begin
         if Wait_For_Leader'Count = 0 then
            Done := False;
         end if;
      end Wait_For_Leader;

      procedure Leader_Done is
      begin
         Done := True;
      end Leader_Done;
   end Barrier;

   protected body Autobarrier is
      entry Wait_For_Herd when Wait_For_Leader'Count >= Trigger is
      begin
         Done := True;
      end Wait_For_Herd;

      entry Wait_For_Leader when Done is
      begin
         if Wait_For_Leader'Count = 0 then
            Done := False;
         end if;
      end Wait_For_Leader;
   end Autobarrier;
end Barriers;

【讨论】:

小队锁看起来确实很有趣,但是我对 Ada 不够熟悉,无法遵循逻辑。你有 c++ 版本吗? 我没有 C++ 版本。所有操作都是在调用条目或函数时隐式完成的。受保护的对象(防止竞争条件)自己处理它的锁。每个条目都有一个关联的条目队列,其中任务(线程)被挂起,直到边界条件评估为 TRUE。 Wait_For_Herd 的边界条件是进入队列中挂起的任务数。当该计数 >= Trigger 时,条件变为 TRUE。当变量 Done 为 TRUE 时,Wait_For_Leader 的边界条件为 TRUE。过程 Leader_Done 只是将 Done 设置为 True。 主控任务组在完成他们的直接工作后调用 Wait_For_Leader。如果没有挂起的任务,Wait_For_Leader 将 Done 设置为 FALSE。换句话说,最后激活的任务会关上门,直到主机将 Done 设置为 True。所有调用 Wait_For_Leader 的任务都挂起,直到 Done 为 TRUE。 master 等待直到 Trigger 任务在他们的队列中等待,让他们执行他们的下一个活动。 Ada 代码的 C++ 实现将从创建锁类开始。该类将实现私有条目队列以及在 Ada 代码中记录为条目和过程的公共函数。所有低级锁调用和/或条件变量都将在公共函数中实现。调用锁对象的线程不需要显式调用低级锁定函数。

以上是关于每个条件唤醒多个线程工作一次的主要内容,如果未能解决你的问题,请参考以下文章

POSIX消息队列 - mq_send线程唤醒命令

JAVA线程虚假唤醒

线程通信

通过等待条件等待多个正在运行的线程

多线程的交互

主程序如何通过等待而不是加入同时等待多个线程?