OpenMP 中的并行累积(前缀)总和:线程之间的通信值

Posted

技术标签:

【中文标题】OpenMP 中的并行累积(前缀)总和:线程之间的通信值【英文标题】:Parallel cumulative (prefix) sums in OpenMP: communicating values between threads 【发布时间】:2013-09-14 04:23:39 【问题描述】:

假设我有一个函数f(i),它依赖于索引i(以及其他无法预先计算的值)。 我想填充一个数组a,以便a[n] = sum(f(i)) from i=0 to n-1

编辑:在 Hristo Iliev 发表评论后,我意识到我在做的是 cumulative/prefix sum。

这可以写成代码

float sum = 0;
for(int i=0; i<N; i++) 
    sum += f(i);
    a[i] = sum;

现在我想使用 OpenMP 并行执行此操作。我可以使用 OpenMP 执行此操作的一种方法是并行写出 f(i) 的值,然后以串行方式处理依赖关系。如果f(i) 是一个慢速函数,那么这可能会很好地工作,因为非并行循环很简单。

#pragma omp parallel for
for(int i=0; i<N; i++) 
    a[i] = f(i);

for(int i=1; i<N; i++) 
    a[i] += a[i-1];

但在没有 OpenMP 的非并行循环的情况下也可以做到这一点。然而,我想出的解决方案很复杂,而且可能很老套。所以我的问题是,是否有一种更简单、不那么复杂的方式来使用 OpenMP 做到这一点?

下面的代码基本上运行我为每个线程列出的第一个代码。结果是给定线程中a 的值是正确的,直到一个常数。我将每个线程的总和保存到带有nthreads+1 元素的数组suma 中。这使我可以在线程之间进行通信并确定每个线程的恒定偏移量。然后我用偏移量更正a[i] 的值。

float *suma;
#pragma omp parallel

    const int ithread = omp_get_thread_num();
    const int nthreads = omp_get_num_threads();
    const int start = ithread*N/nthreads;
    const int finish = (ithread+1)*N/nthreads;
    #pragma omp single
    
        suma = new float[nthreads+1];
        suma[0] = 0;
    
    float sum = 0;
    for (int i=start; i<finish; i++) 
        sum += f(i);
        a[i] = sum;
    
    suma[ithread+1] = sum;
    #pragma omp barrier
    float offset = 0;
    for(int i=0; i<(ithread+1); i++) 
        offset += suma[i];
    
    for(int i=start; i<finish; i++) 
        a[i] += offset;
    

delete[] suma;

一个简单的测试就是设置f(i) = i。那么解决方案是a[i] = i*(i+1)/2(无穷远处是-1/12)。

【问题讨论】:

这就是通常使用 OpenMP 计算前缀和的方式。您可以将#pragma omp for schedule(static) 应用于在a[] 上运行的两个循环,而不是手动计算开始和结束索引。 @HristoIliev,我认为尽管在实践中 OpenMP 像我一样定义开始和结束,但我不应该假设 OpenMP 会那样做(我想我在你的一篇文章中读过)。代码for(int i=0; i&lt;(ithread+1); i++) 要求在并行循环中,较大的索引值始终对应于较大的线程值。一般情况下是这样吗? schedule(static) 具有标准保证的特殊属性,例如在某些条件下(在您的情况下满足)可重复分布模式。 好的,我想我明白了。我对此提出了一个 SO 问题,因为我认为这是其他人可能想知道的。我有一段时间不确定。 【参考方案1】:

您可以将策略扩展到任意数量的子区域,并使用任务递归地减少它们:

#include<vector>
#include<iostream>

using namespace std;

const int n          = 10000;
const int baseLength = 100;

int f(int ii) 
  return ii;


int recursiveSumBody(int * begin, int * end)

  size_t length  = end - begin;
  size_t mid     = length/2;
  int    sum     = 0;


  if ( length < baseLength ) 
    for(size_t ii = 1; ii < length; ii++ )
        begin[ii] += begin[ii-1];
    
   else 
#pragma omp task shared(sum)
    
      sum = recursiveSumBody(begin    ,begin+mid);
    
#pragma omp task
    
      recursiveSumBody(begin+mid,end      );
    
#pragma omp taskwait

#pragma omp parallel for
    for(size_t ii = mid; ii < length; ii++) 
      begin[ii] += sum;
    

  
  return begin[length-1];


void recursiveSum(int * begin, int * end)

#pragma omp single
  
    recursiveSumBody(begin,end);
      



int main() 

  vector<int> a(n,0);

#pragma omp parallel
  
    #pragma omp for
    for(int ii=0; ii < n; ii++)           
      a[ii] = f(ii);
      

    recursiveSum(&a[0],&a[n]);

  
  cout << n*(n-1)/2 << endl;
  cout << a[n-1] << endl;

  return 0;

【讨论】:

非常感谢您发布一个工作示例!我想我希望得到一个适用于 OpenMP 2.0 的答案(因此它也适用于 MSVC),但这对我来说是使用 OpenMP 任务的好机会。我不得不增加baseLength 以获得n=10000 的正确值。你知道这种方法有多有效吗? 好吧,我认为对于这个特定的示例,任务不会比您编写的代码更快。我更担心的是你必须增加baseLength 才能获得正确的值,这意味着某处存在缺陷。无论如何,我无法在程序中看到数据竞争。 看来baseLength 必须等于n 才能得到正确的结果。 我的机器上的baseLength 得到了正确的结果。使用g++ 4.8.1编译。 奇怪,我不知道,我必须包含 才能编译,但就是这样。我正在使用 G++ 4.7.3。【参考方案2】:

为了完整起见,我在考虑Hristo的评论时添加了OP的MWE代码:

#include <iostream>
#include <omp.h>
using std::cout;
using std::endl;

const int N = 10;
const int Nthr = 4;
float f(int i) return (float)i;

int main(void) 
    omp_set_num_threads(Nthr);
    float* a = new float[N];
    float *suma = new float[Nthr+1];
    suma[0] = 0.0;
    float sum = 0.0;
#pragma omp parallel for schedule(static) firstprivate(sum)
    for (int i=0; i<N; i++) 
        sum += f(i);
        a[i] = sum;
        suma[omp_get_thread_num()+1] = sum;
    

    // this for-loop is also a commulative sum, but it has only Nthr iterations
    for (int i=1; i<Nthr;i++)
        suma[i] += suma[i-1];

#pragma omp parallel for schedule(static)
    for(int i=0; i< N; i++) 
        a[i] += suma[omp_get_thread_num()];
    

    for (int i=0; i<N; i++) 
        cout << a[i] << endl;
    

    delete[] suma;
    int n = 95;
    cout << a[n] << endl << n*(n+1)/2 << endl;
    delete[] a;
    return 0;


【讨论】:

以上是关于OpenMP 中的并行累积(前缀)总和:线程之间的通信值的主要内容,如果未能解决你的问题,请参考以下文章

前缀和的并行化 (Openmp)

c++ openmp中的线程

openmp:线程数的增加会降低性能

OpenMP 并行前缀和加速

循环C ++中的分段错误Openmp

在 OpenMP 中并行化嵌套循环并使用更多线程执行内部循环