C++ Fork Join 并行阻塞

Posted

技术标签:

【中文标题】C++ Fork Join 并行阻塞【英文标题】:C++ Fork Join Parallelism Blocking 【发布时间】:2016-08-24 06:09:38 【问题描述】:

假设您希望并行运行一个部分,然后合并回主线程,然后并行返回到部分,依此类推。类似于童年游戏红灯绿灯。

我已经给出了一个我正在尝试做的例子,我使用条件变量在开始时阻塞线程,但希望并行启动它们,然后在最后阻塞它们,这样它们可以连续打印出来。 *= 操作可能是跨越数秒的更大操作。重用线程也很重要。使用任务队列可能太重了。

我需要使用某种阻塞结构,而不仅仅是简单的繁忙循环,因为我知道如何用繁忙的循环解决这个问题。

英文:

    线程 1 创建了 10 个被阻塞的线程 线程 1 向所有线程发出信号以启动(互不阻塞) 线程 2-11 处理它们的独占内存 线程 1 一直在等待,直到 2-11 完成(此处可以使用原子计数) 线程 2-11 完成,如有必要,每个线程都可以通知 1 检查其状态 线程 1 检查其条件并打印数组 线程 1 重新通知 2-11 再次处理,从 2 继续

示例代码(天真改编自 cplusplus.com 上的示例):

// condition_variable example
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
#include <atomic>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;
std::atomic<int> count(0);

bool end = false;
int a[10];

void doublea (int id) 
  while(!end) 
    std::unique_lock<std::mutex> lck(mtx);
    while (!ready) cv.wait(lck);
    a[id] *= 2;
    count.fetch_add(1);
  


void go() 
  std::unique_lock<std::mutex> lck(mtx);

  ready = true;
  cv.notify_all();
  ready = false; // Naive

  while (count.load() < 10) sleep(1);
  for(int i = 0; i < 10; i++) 
    std::cout << a[i] << std::endl;
  

  ready = true;
  cv.notify_all();
  ready = false;
  while (count.load() < 10) sleep(1);
  for(int i = 0; i < 10; i++) 
    std::cout << a[i] << std::endl;
  

  end = true;
  cv.notify_all();


int main () 
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i) 
    a[i] = 0;
    threads[i] = std::thread(doublea,i);
  

  std::cout << "10 threads ready to race...\n";
  go();                       // go!

  return 0;

【问题讨论】:

为什么count 是原子的?为什么在一侧使用条件变量而在另一侧使用忙碌睡眠? @UmNyobe 我明确不想使用忙睡眠。线程在等待通知时应该休眠。对于您的另一个问题, count 是原子的,因为如果运气好的话,它可以同时被多个线程增加。增量是一个两阶段的过程。 【参考方案1】:

要有效地实现它并不是那么简单。此外,除非您正在学习该主题,否则它没有任何意义。条件变量在这里不是一个好的选择,因为它不能很好地扩展。

我建议您看看成熟的运行时库如何实现 fork-join 并行性,并从中学习或在您的应用程序中使用它们。请参阅http://www.openmprtl.org/、http://opentbb.org/、https://www.cilkplus.org/ - 所有这些都是开源的。

OpenMP 是您所寻找的最接近的模型,并且它具有最有效的分叉连接障碍实现。但是,它有其缺点,因为它是为 HPC 设计的并且缺乏动态可组合性。 TBB 和 Cilk 最适合嵌套并行性以及可在外部并行区域上下文中使用的模块和库中的使用。

【讨论】:

你说我正在努力学习是正确的。我之前看过 TBB 和 OpenMP,但是 OpenMP 大量使用 pragma 是令人反感的,而且 TBB 有很多样板来做简单的事情。但是,我很乐意再给他们看一眼并使用它们。为了真正阅读我的问题,我会给你答案。 您不想在下面混合前端(使用模型)和运行时库。您可以为任何这些运行时编写自己的前端。【参考方案2】:

您可以使用屏障或条件变量来启动所有线程。然后线程一个可以等待所有线程结束他们的工作(通过所有线程上的join方法,它是阻塞的),然后在一个for循环中打印他们的数据。

【讨论】:

这不是一个解决方案,因为线程需要在加入后重新创建。

以上是关于C++ Fork Join 并行阻塞的主要内容,如果未能解决你的问题,请参考以下文章

Fork/Join框架

ForkJoinPool线程池工作原理

虚幻4与现代C++:使用TaskGraph实现Fork-Join模型

虚幻4与现代C++:使用TaskGraph实现Fork-Join模型

如何在不阻塞主线程的情况下使用 join() 创建多个 C++ 线程?

父线程join():阻塞直到孩子完成?