OpenMP 中的并行累积(前缀)总和:线程之间的通信值
Posted
技术标签:
【中文标题】OpenMP 中的并行累积(前缀)总和:线程之间的通信值【英文标题】:Parallel cumulative (prefix) sums in OpenMP: communicating values between threads 【发布时间】:2013-09-14 04:23:39 【问题描述】:假设我有一个函数f(i)
,它依赖于索引i
(以及其他无法预先计算的值)。
我想填充一个数组a
,以便a[n] = sum(f(i)) from i=0 to n-1
。
编辑:在 Hristo Iliev 发表评论后,我意识到我在做的是 cumulative/prefix sum。
这可以写成代码
float sum = 0;
for(int i=0; i<N; i++)
sum += f(i);
a[i] = sum;
现在我想使用 OpenMP 并行执行此操作。我可以使用 OpenMP 执行此操作的一种方法是并行写出 f(i)
的值,然后以串行方式处理依赖关系。如果f(i)
是一个慢速函数,那么这可能会很好地工作,因为非并行循环很简单。
#pragma omp parallel for
for(int i=0; i<N; i++)
a[i] = f(i);
for(int i=1; i<N; i++)
a[i] += a[i-1];
但在没有 OpenMP 的非并行循环的情况下也可以做到这一点。然而,我想出的解决方案很复杂,而且可能很老套。所以我的问题是,是否有一种更简单、不那么复杂的方式来使用 OpenMP 做到这一点?
下面的代码基本上运行我为每个线程列出的第一个代码。结果是给定线程中a
的值是正确的,直到一个常数。我将每个线程的总和保存到带有nthreads+1
元素的数组suma
中。这使我可以在线程之间进行通信并确定每个线程的恒定偏移量。然后我用偏移量更正a[i]
的值。
float *suma;
#pragma omp parallel
const int ithread = omp_get_thread_num();
const int nthreads = omp_get_num_threads();
const int start = ithread*N/nthreads;
const int finish = (ithread+1)*N/nthreads;
#pragma omp single
suma = new float[nthreads+1];
suma[0] = 0;
float sum = 0;
for (int i=start; i<finish; i++)
sum += f(i);
a[i] = sum;
suma[ithread+1] = sum;
#pragma omp barrier
float offset = 0;
for(int i=0; i<(ithread+1); i++)
offset += suma[i];
for(int i=start; i<finish; i++)
a[i] += offset;
delete[] suma;
一个简单的测试就是设置f(i) = i
。那么解决方案是a[i] = i*(i+1)/2
(无穷远处是-1/12)。
【问题讨论】:
这就是通常使用 OpenMP 计算前缀和的方式。您可以将#pragma omp for schedule(static)
应用于在a[]
上运行的两个循环,而不是手动计算开始和结束索引。
@HristoIliev,我认为尽管在实践中 OpenMP 像我一样定义开始和结束,但我不应该假设 OpenMP 会那样做(我想我在你的一篇文章中读过)。代码for(int i=0; i<(ithread+1); i++)
要求在并行循环中,较大的索引值始终对应于较大的线程值。一般情况下是这样吗?
schedule(static)
具有标准保证的特殊属性,例如在某些条件下(在您的情况下满足)可重复分布模式。
好的,我想我明白了。我对此提出了一个 SO 问题,因为我认为这是其他人可能想知道的。我有一段时间不确定。
【参考方案1】:
您可以将策略扩展到任意数量的子区域,并使用任务递归地减少它们:
#include<vector>
#include<iostream>
using namespace std;
const int n = 10000;
const int baseLength = 100;
int f(int ii)
return ii;
int recursiveSumBody(int * begin, int * end)
size_t length = end - begin;
size_t mid = length/2;
int sum = 0;
if ( length < baseLength )
for(size_t ii = 1; ii < length; ii++ )
begin[ii] += begin[ii-1];
else
#pragma omp task shared(sum)
sum = recursiveSumBody(begin ,begin+mid);
#pragma omp task
recursiveSumBody(begin+mid,end );
#pragma omp taskwait
#pragma omp parallel for
for(size_t ii = mid; ii < length; ii++)
begin[ii] += sum;
return begin[length-1];
void recursiveSum(int * begin, int * end)
#pragma omp single
recursiveSumBody(begin,end);
int main()
vector<int> a(n,0);
#pragma omp parallel
#pragma omp for
for(int ii=0; ii < n; ii++)
a[ii] = f(ii);
recursiveSum(&a[0],&a[n]);
cout << n*(n-1)/2 << endl;
cout << a[n-1] << endl;
return 0;
【讨论】:
非常感谢您发布一个工作示例!我想我希望得到一个适用于 OpenMP 2.0 的答案(因此它也适用于 MSVC),但这对我来说是使用 OpenMP 任务的好机会。我不得不增加baseLength
以获得n=10000
的正确值。你知道这种方法有多有效吗?
好吧,我认为对于这个特定的示例,任务不会比您编写的代码更快。我更担心的是你必须增加baseLength
才能获得正确的值,这意味着某处存在缺陷。无论如何,我无法在程序中看到数据竞争。
看来baseLength
必须等于n
才能得到正确的结果。
我的机器上的baseLength
得到了正确的结果。使用g++ 4.8.1
编译。
奇怪,我不知道,我必须包含 为了完整起见,我在考虑Hristo的评论时添加了OP的MWE代码:
#include <iostream>
#include <omp.h>
using std::cout;
using std::endl;
const int N = 10;
const int Nthr = 4;
float f(int i) return (float)i;
int main(void)
omp_set_num_threads(Nthr);
float* a = new float[N];
float *suma = new float[Nthr+1];
suma[0] = 0.0;
float sum = 0.0;
#pragma omp parallel for schedule(static) firstprivate(sum)
for (int i=0; i<N; i++)
sum += f(i);
a[i] = sum;
suma[omp_get_thread_num()+1] = sum;
// this for-loop is also a commulative sum, but it has only Nthr iterations
for (int i=1; i<Nthr;i++)
suma[i] += suma[i-1];
#pragma omp parallel for schedule(static)
for(int i=0; i< N; i++)
a[i] += suma[omp_get_thread_num()];
for (int i=0; i<N; i++)
cout << a[i] << endl;
delete[] suma;
int n = 95;
cout << a[n] << endl << n*(n+1)/2 << endl;
delete[] a;
return 0;
【讨论】:
以上是关于OpenMP 中的并行累积(前缀)总和:线程之间的通信值的主要内容,如果未能解决你的问题,请参考以下文章