这个 OpenMP 屏障有啥解决方法吗?
Posted
技术标签:
【中文标题】这个 OpenMP 屏障有啥解决方法吗?【英文标题】:Is there any workaround for this OpenMP barrier?这个 OpenMP 屏障有什么解决方法吗? 【发布时间】:2017-10-23 02:43:07 【问题描述】:我有这个用 OpenMp 编写的并行区域:
std::vector<T> sharedResult;
#pragma omp parallel
std::vector<T> result;
#pragma omp for nowait
for(int i=0; i<n; i++)
//fill result
#pragma omp critical
sharedResult.insert(sharedResult.end(), result.begin(), result.end());
#pramga omp barrier
#pragma omp for nowait
for(size_t i=0; i<sharedResult.size(); i++)
foo(sharedResult[i]);
...
恐怕#pragma omp barrier
是必要的。我认为的原因是,否则当一个线程碰到最后一个#pragma omp for
,sharedResult.size()
在那一刻仍然不是他的最终状态(在前一个并行完成时获得)。请注意,不幸的是,sharedResult
的大小以前是未知的。
不幸的是,我注意到这个障碍会产生很大的开销,即一个特定的迭代比所有其他迭代都更昂贵,因此所有线程都必须等待执行该迭代的线程。这可以认为是负载不平衡,但我没有找到任何解决方案。
所以我的问题是:有什么方法可以在不等待前一个完成的情况下启动最后一个并行,或者没有办法改进它?
【问题讨论】:
什么是典型的sharedResult.size()
和n
?
@Zulan 感谢您的评论。这是计算机视觉算法的一部分,两者都高度依赖于输入图像,但在这两种情况下我们都在谈论数千个元素。
祖蓝的第二种方案是我想到的。虽然我只是想给出一个想法,但它似乎比我想象的要复杂。现在让我们删除我们的 cmets,并请删除“可能的解决方案”部分,因为它不正确,也不是您问题的一部分 :)
@Shadow 我觉得你是对的,我删除了我以前的 cmets
【参考方案1】:
我同意屏障是必要的。我看到了几种解决方法,它们的复杂性越来越高,效率也可能越来越高:
任务
为每个结果元素发布一个任务:
#pragma omp parallel
std::vector<T> result;
#pragma omp for nowait
for(int i=0; i<n; i++)
//fill result
// would prefer a range based loop here, but
// there seem to be issues with passing references
// to tasks in certain compilers
for(size_t i=0; i<result.size(); i++)
#pragma omp task
foo(result[i]);
您甚至可以在初始循环中发布任务。如果任务太多,您可能会获得大量开销。
用完成的线程处理结果队列
现在这个比较棘手 - 特别是您需要区分结果队列为空和所有线程都完成了它们的第一个循环。
std::vector<T> sharedResult;
int threadsBusy;
size_t resultIndex = 0;
#pragma omp parallel
#pragma omp single
threadsBusy = omp_num_threads();
std::vector<T> result;
#pragma omp for nowait
for(int i=0; i<n; i++)
//fill result
#pragma omp critical
sharedResult.insert(sharedResult.end(), result.begin(), result.end());
threadsBusy--;
do
bool hasResult, allThreadsDone;
// We need a copy here as the vector may be resized
// and elements may become invalid by insertion
T myResult;
#pragma omp critical
if (resultIndex < sharedResult.size())
resultIndex++;
hasResult = true;
myResult = sharedResult[myResult];
else
hasResult = false;
allThreadsDone = threadsBusy == 0;
if (hasResult)
foo(myResult);
else
if (allThreadsDone)
break;
// If we just continue here, we will spin on the mutex
// Unfortunately there are no condition variables in OpenMP
// So instead we go for a quick nap as a compromise
// Feel free to tune this accordingly
std::this_thread::sleep_for(10ms);
while (true);
注意:通常我会测试我在这里发布的代码,但由于缺少完整的示例而无法测试。
通过并行循环处理结果生成块
最后,对于已经完成的结果,您可以多次并行运行 for 循环。然而,这有许多问题。首先,所有线程都必须遇到每个工作共享区域,即使是较晚完成第一个工作的线程也是如此。所以你必须跟踪你运行的循环。此外,每个线程的循环绑定必须相同 - 您只能在关键部分阅读sharedResult.size()
。因此,您必须在关键部分中由一个线程预先将其读取到共享变量中,但要等待所有线程,直到它被正确读取。此外,您将不得不使用动态调度,否则您可能会使用静态调度,并且您将等待最后完成的线程。您编辑的示例不执行这些操作。我不会想当然地认为 for nowait schedule(dynamic)
可以在团队中的所有线程进入它之前完成(但它适用于 libgomp)。综合考虑,我不会真的去那里。
【讨论】:
感谢您的有用回答,非常感谢。第二种解决方案对我来说是最好的解决方案(尽管我从未使用过#pragma omp
任务)。但是为什么我们不使用共享的std::queue
来代替sharedResult
而不是std::vector
结合allThreadsDone
和快速小睡呢?
在我对 [OpenMP] 问题的回答中,我尝试坚持使用纯 OpenMP,因为不幸的是,OpenMP 在技术上不支持C++11 and it's parallel primitives。也就是说,使用std::condition_variable
可能既实用又干净。
甚至concurrent_bounded_queue
!在我的实现中,我通过使用它获得了最佳性能
我认为第二种解决方案中存在竞争条件:sharedResult.insert
在关键部分内被调用。没关系。但是sharedResult[myIndex]
在临界区之外被调用。 sharedResult[myIndex]
返回的引用随时可能失效,因为sharedResult.insert
可能会重新分配。为了解决这个问题,我们必须在第二个关键部分做更多的工作:不仅要获取索引,还要从向量中获取实际元素。注意:我们需要复制或移动实际元素(类型T
),而不仅仅是sharedResult[myIndex]
返回的引用。
谢谢,很好。我更新了答案。复制的替代方法是引用稳定的集合,例如std::deque
(但我相信std::deque
仅使用索引仍然是错误的)。以上是关于这个 OpenMP 屏障有啥解决方法吗?的主要内容,如果未能解决你的问题,请参考以下文章
tornado v6 似乎已经放弃了 tornado.web.asynchronous 协程。在代码中解决这个问题有啥不同的方法吗?