多个线程等待所有线程完成,直到新工作开始

Posted

技术标签:

【中文标题】多个线程等待所有线程完成,直到新工作开始【英文标题】:Multiple threads waiting for all to finish till new work is started 【发布时间】:2021-03-24 15:12:33 【问题描述】:

我正在尝试创建一种线程池,它在单独的线程上运行函数,并且仅在所有函数完成后才开始新的迭代。

map<size_t, bool> status_map;
vector<thread> threads;
condition_variable cond;

bool are_all_ready() 
  mutex m;
  unique_lock<mutex> lock(m);
  for (const auto& [_, status] : status_map) 
    if (!status) 
      return false;
    
  
  return true;


void do_little_work(size_t id) 
  this_thread::sleep_for(chrono::seconds(1));
  cout << id << " did little work..." << endl;


void do_some_work(size_t id) 
  this_thread::sleep_for(chrono::seconds(2));
  cout << id << " did some work..." << endl;


void do_much_work(size_t id) 
  this_thread::sleep_for(chrono::seconds(4));
  cout << id << " did much work..." << endl;


void run(const function<void(size_t)>& function, size_t id) 
  while (true) 
    mutex m;
    unique_lock<mutex> lock(m);

    cond.wait(lock, are_all_ready);

    status_map[id] = false;
    cond.notify_all();

    function(id);

    status_map[id] = true;
    cond.notify_all();
  

 
int main() 
  threads.push_back(thread(run, do_little_work, 0));
  threads.push_back(thread(run, do_some_work, 1));
  threads.push_back(thread(run, do_much_work, 2));

  for (auto& thread : threads) 
    thread.join();
  

  return EXIT_SUCCESS;


我希望得到输出:

0 did little work...
1 did some work...
2 did much work...
0 did little work...
1 did some work...
2 did much work...
        .
        .
        .

在各自的超时之后,但当我运行程序时,我只得到

0 did little work...
0 did little work...
        .
        .
        .

我还不得不说,我对多线程相当陌生,但在我的理解中,condition_variable 应该阻塞每个线程,直到谓词返回 true。在我的情况下,are_all_ready 应该在所有函数都返回后返回 true。

【问题讨论】:

你能把你的例子整理成minimal, reproducible example吗?至少您缺少包含 using 指令,并且您已经注释掉了许多分散注意力的代码。 【参考方案1】:

有几种方法可以做到这一点。

在我看来,最简单的是 C++20 std::barrier,它表示“等到所有 N 个线程都已到达并在此处等待”。

#include <barrier>

std::barrier synch_workers(3);
....
void run(const std::function<void(size_t)>& func, size_t id) 
  while (true) 
    synch_workers.arrive_and_wait(); // wait for all three to be ready
    func(id);
  

为每个“批次”工作构建和join() 三个工作线程的新集合是更粗鲁、效率较低但同样有效的方法:

int main(...) 
  std::vector<thread> threads;
  ...
  while (flag_running) 
    threads.push_back(...);
    threads.push_back(...);
    ...
    for (auto& thread : threads) 
      thread.join();
    
    threads.clear();
  

一边

不过,我建议您重新审视一些核心同步概念。当您想重新使用共享的互斥锁时,您正在使用新的互斥锁。 scope of your unique_lock 不太对。

现在,您在 map 中跟踪工作线程“忙碌/空闲”状态的想法很简单,但无法正确协调必须同时开始的“批次”或“轮次”工作。

如果工作人员在map 中看到三个线程中的两个(包括它自己)处于“空闲”状态,这意味着什么? “一批”工作是否已经结束——即两名工人正在等待第三个迟到的工人?还是刚刚开始批处理——即,两个空闲线程延迟了,最好像他们更热心的同行一样开始工作?

如果不跟踪当前的工作批次,线程就无法知道答案,这就是障碍(或其更复杂的表亲 the phaser)在幕后所做的。

【讨论】:

【参考方案2】:

按原样,由于同时访问 status_map,您的程序发生了崩溃 (UB)。

当你这样做时:

void run(const function<void(size_t)>& function, size_t id)

...
    mutex m;
    unique_lock<mutex> lock(m);
...
    status_map[id] = false;

创建的locks 是局部变量,每个线程一个,因此是独立的。因此,它不会阻止多个线程一次写入status_map,从而导致崩溃。这就是我在我的机器上得到的。

现在,如果您将mutex 设为静态,则一次只有一个线程可以访问地图。但这也使得一次只有一个线程运行。有了这个,我看到 0、1 和 2 正在运行,但一次只运行一次,并且前一个线程很容易再次运行。

我的建议,回到绘图板,让它变得更简单。所有线程同时运行,单个互斥锁来保护映射,只锁定互斥锁来访问映射,而且......好吧,事实上,我什至不认为需要条件变量。

例如有什么问题:

#include <thread>
#include <iostream>
#include <vector>

using namespace std;

vector<thread> threads;

void do_little_work(size_t id) 
  this_thread::sleep_for(chrono::seconds(1));
  cout << id << " did little work..." << endl;


void do_some_work(size_t id) 
  this_thread::sleep_for(chrono::seconds(2));
  cout << id << " did some work..." << endl;


void do_much_work(size_t id) 
  this_thread::sleep_for(chrono::seconds(4));
  cout << id << " did much work..." << endl;


void run(const function<void(size_t)>& function, size_t id) 
  while (true) 
    function(id);
  


int main() 
  threads.push_back(thread(run, do_little_work, 0));
  threads.push_back(thread(run, do_some_work, 1));
  threads.push_back(thread(run, do_much_work, 2));

  for (auto& thread : threads) 
    thread.join();
  

  return EXIT_SUCCESS;

【讨论】:

那么我必须创建一个互斥锁,然后用这个互斥锁创建所有锁,还是我必须创建一个锁? 我认为简化设计不会奏效。 OP 希望工作线程分批处理,在每个完成一个工作单元后暂停在“循环障碍”(或“倒计时锁存器”)。 我错过了批处理部分。在这种情况下,我认为std::counting_semaphore 会比condition_variable 更好,但无论如何当前的结构都不成立。它需要共享 mutice 来保护共享资源,并在需要计数的地方进行计数。

以上是关于多个线程等待所有线程完成,直到新工作开始的主要内容,如果未能解决你的问题,请参考以下文章

通过等待条件等待多个正在运行的线程

Java并发工具类等待多线程完成的CountDownLatch

pthread_join - 多个线程等待

创建多个线程并等待所有线程完成

主程序如何通过等待而不是加入同时等待多个线程?

java多线程 -- CountDownLatch 闭锁