多个线程等待所有线程完成,直到新工作开始
Posted
技术标签:
【中文标题】多个线程等待所有线程完成,直到新工作开始【英文标题】:Multiple threads waiting for all to finish till new work is started 【发布时间】:2021-03-24 15:12:33 【问题描述】:我正在尝试创建一种线程池,它在单独的线程上运行函数,并且仅在所有函数完成后才开始新的迭代。
map<size_t, bool> status_map;
vector<thread> threads;
condition_variable cond;
bool are_all_ready()
mutex m;
unique_lock<mutex> lock(m);
for (const auto& [_, status] : status_map)
if (!status)
return false;
return true;
void do_little_work(size_t id)
this_thread::sleep_for(chrono::seconds(1));
cout << id << " did little work..." << endl;
void do_some_work(size_t id)
this_thread::sleep_for(chrono::seconds(2));
cout << id << " did some work..." << endl;
void do_much_work(size_t id)
this_thread::sleep_for(chrono::seconds(4));
cout << id << " did much work..." << endl;
void run(const function<void(size_t)>& function, size_t id)
while (true)
mutex m;
unique_lock<mutex> lock(m);
cond.wait(lock, are_all_ready);
status_map[id] = false;
cond.notify_all();
function(id);
status_map[id] = true;
cond.notify_all();
int main()
threads.push_back(thread(run, do_little_work, 0));
threads.push_back(thread(run, do_some_work, 1));
threads.push_back(thread(run, do_much_work, 2));
for (auto& thread : threads)
thread.join();
return EXIT_SUCCESS;
我希望得到输出:
0 did little work...
1 did some work...
2 did much work...
0 did little work...
1 did some work...
2 did much work...
.
.
.
在各自的超时之后,但当我运行程序时,我只得到
0 did little work...
0 did little work...
.
.
.
我还不得不说,我对多线程相当陌生,但在我的理解中,condition_variable
应该阻塞每个线程,直到谓词返回 true。在我的情况下,are_all_ready
应该在所有函数都返回后返回 true。
【问题讨论】:
你能把你的例子整理成minimal, reproducible example吗?至少您缺少包含using
指令,并且您已经注释掉了许多分散注意力的代码。
【参考方案1】:
有几种方法可以做到这一点。
在我看来,最简单的是 C++20 std::barrier
,它表示“等到所有 N 个线程都已到达并在此处等待”。
#include <barrier>
std::barrier synch_workers(3);
....
void run(const std::function<void(size_t)>& func, size_t id)
while (true)
synch_workers.arrive_and_wait(); // wait for all three to be ready
func(id);
为每个“批次”工作构建和join()
三个工作线程的新集合是更粗鲁、效率较低但同样有效的方法:
int main(...)
std::vector<thread> threads;
...
while (flag_running)
threads.push_back(...);
threads.push_back(...);
...
for (auto& thread : threads)
thread.join();
threads.clear();
一边
不过,我建议您重新审视一些核心同步概念。当您想重新使用共享的互斥锁时,您正在使用新的互斥锁。 scope of your unique_lock
不太对。
现在,您在 map
中跟踪工作线程“忙碌/空闲”状态的想法很简单,但无法正确协调必须同时开始的“批次”或“轮次”工作。
如果工作人员在map
中看到三个线程中的两个(包括它自己)处于“空闲”状态,这意味着什么? “一批”工作是否已经结束——即两名工人正在等待第三个迟到的工人?还是刚刚开始批处理——即,两个空闲线程延迟了,最好像他们更热心的同行一样开始工作?
如果不跟踪当前的工作批次,线程就无法知道答案,这就是障碍(或其更复杂的表亲 the phaser)在幕后所做的。
【讨论】:
【参考方案2】:按原样,由于同时访问 status_map
,您的程序发生了崩溃 (UB)。
当你这样做时:
void run(const function<void(size_t)>& function, size_t id)
...
mutex m;
unique_lock<mutex> lock(m);
...
status_map[id] = false;
创建的lock
s 是局部变量,每个线程一个,因此是独立的。因此,它不会阻止多个线程一次写入status_map
,从而导致崩溃。这就是我在我的机器上得到的。
现在,如果您将mutex
设为静态,则一次只有一个线程可以访问地图。但这也使得一次只有一个线程运行。有了这个,我看到 0、1 和 2 正在运行,但一次只运行一次,并且前一个线程很容易再次运行。
我的建议,回到绘图板,让它变得更简单。所有线程同时运行,单个互斥锁来保护映射,只锁定互斥锁来访问映射,而且......好吧,事实上,我什至不认为需要条件变量。
例如有什么问题:
#include <thread>
#include <iostream>
#include <vector>
using namespace std;
vector<thread> threads;
void do_little_work(size_t id)
this_thread::sleep_for(chrono::seconds(1));
cout << id << " did little work..." << endl;
void do_some_work(size_t id)
this_thread::sleep_for(chrono::seconds(2));
cout << id << " did some work..." << endl;
void do_much_work(size_t id)
this_thread::sleep_for(chrono::seconds(4));
cout << id << " did much work..." << endl;
void run(const function<void(size_t)>& function, size_t id)
while (true)
function(id);
int main()
threads.push_back(thread(run, do_little_work, 0));
threads.push_back(thread(run, do_some_work, 1));
threads.push_back(thread(run, do_much_work, 2));
for (auto& thread : threads)
thread.join();
return EXIT_SUCCESS;
【讨论】:
那么我必须创建一个互斥锁,然后用这个互斥锁创建所有锁,还是我必须创建一个锁? 我认为简化设计不会奏效。 OP 希望工作线程分批处理,在每个完成一个工作单元后暂停在“循环障碍”(或“倒计时锁存器”)。 我错过了批处理部分。在这种情况下,我认为std::counting_semaphore
会比condition_variable
更好,但无论如何当前的结构都不成立。它需要共享 mutice 来保护共享资源,并在需要计数的地方进行计数。以上是关于多个线程等待所有线程完成,直到新工作开始的主要内容,如果未能解决你的问题,请参考以下文章