大型数组中元素的并行总和

Posted

技术标签:

【中文标题】大型数组中元素的并行总和【英文标题】:Parallel sum of elements in a large Array 【发布时间】:2013-04-29 06:21:58 【问题描述】:

我有一个程序可以对一个非常大的数组中的元素求和。我想并行化这个总和。

#define N = some_very_large_no; // say 1e12
float x[N]; // read from a file
float sum=0.0;
main()


for (i=0, i<N, i++)

sum=sum+x[i];


如何使用线程并行化这个总和(c/c++/Java 任何代码示例都可以)?如果机器中有 8 个内核,我应该使用多少线程才能获得最佳性能?

编辑:N 可能真的很大(实际上大于 1e6),并且根据我从中读取数据的文件大小而有所不同。该文件的顺序为 GB。

编辑:N 改为较大的值(1e12 到 1e16)

【问题讨论】:

您可以为此目的使用 c++11 线程。您可能需要尝试最佳线程数,但我认为由于缓存约束等原因,少于 8 个线程将提供最佳结果(如 5-6)... 这是一个 Java、C 或 C++ 问题吗? 是否有 原因 导致如此多的反对票?这对我来说是一个非常有效的问题。简单地显示你可以拆分然后添加结果一点都不好。 我敢问,我们在这里谈论多少个花车?因为, 1. 对单次扫描解决方案进行基准测试,因为无论如何您都将它们全部保存在内存中。 2. 除非您有足够的浮点数(取决于您的工作系统),否则我会惊讶地发现,用多线程解决方案击败那个时间所需的数量并不大。您的 100 万个浮动示例很可能不会这样做。将它们分成一系列生成的 FFT,您可能有充分的理由这样做。一个简单的总结?不太可能。 “没有得到你的反对票。”恭喜!你现在确实是通过暗示我得到的。 -1 【参考方案1】:

在Java中你可以写

int cpus = Runtime.getRuntime().availableProcessors();
// would keep this of other tasks as well.
ExecutorService service = Executors.newFixedThreadPool(cpus);

float[] floats = new float[N];

List<Future<Double>> tasks = new ArrayList<>();
int blockSize = (floats.length + cpus - 1) / cpus;
for (int i=0, i < floats.length, i++) 
    final start = blockSize * i;
    final end = Math.min(blockSize * (i+1), floats.length);
    tasks.add(service.submit(new Callable<Double>() 
        public Double call() 
            double d= 0;
            for(int j=start;j<end;j++)
                d += floats[j];
            return d;
        
     );

double sum = 0;
for(Future<Double> task: tasks)
    sum += task.get();

正如 WhozCraig 所提到的,一百万个浮点数可能不足以需要多个线程,或者您会发现您的瓶颈是您可以从主内存(单线程资源)加载数组的速度在任何在这种情况下,您不能假设当您将获取数据的成本包括在内时它会更快。

【讨论】:

考虑到一个基元数组很可能(在 Java 中)是一个连续的内存块,并且一个简单的 for 循环很容易被 CPU“预测”,我认为(我不是甚至可以肯定),更多的CPU会做更少的事情。我们能期待这样的事情吗? 我们可以预期更多的 cpu 可能会更好,但不会更差。好多少取决于你在做什么。对于这么简单的操作,我怀疑不多。【参考方案2】:

你说数组来自一个文件。如果您对程序的不同部分进行计时,您会发现与从磁盘读取数据所需的时间相比,汇总元素所需的时间可以忽略不计。从Amdahl's Law 可以得出结论,并行化求和没有任何好处。

如果您需要提高性能,您应该专注于提高 I/O 吞吐量。

【讨论】:

你是对的。因为我可以并行计算,所以我首先考虑这个。我不确定 io 是否可以并行化,我的意思是从磁盘读取。 @StackUnderflow 保存存储多卷镜像的数据,不太可能。我看不到单轴读取操作从并行化中获得任何好处。但是还有那些可爱的SSD,不是吗=P【参考方案3】:

您可以使用多个线程(多于内核)。但是没有线程及其性能取决于您的算法以及它们的工作方式。 由于数组长度为 100000,因此创建 x 个线程,每个线程将计算 arr[x] 到 arr[x+limit]。您必须在其中设置限制,以免与其他线程重叠并且任何元素都不应保持未使用状态。 线程创建:

   pthread_t tid[COUNT];
    int i = 0;
        int err;
        while (i < COUNT) 
        
            void *arg;
            arg = x; //pass here a no which will tell from where this thread will use arr[x]
            err = pthread_create(&(tid[i]), NULL, &doSomeThing, arg);
            if (err != 0)
                printf("\ncan't create thread :[%s]", strerror(err));
            else
            
                //printf("\n Thread created successfully\n");
            

            i++;
        
       // NOW CALCULATE....
        for (int i = 0; i < COUNT; i++) 
        
            pthread_join(tid[i], NULL);
        


void* doSomeThing(void *arg) 

    int *x;
    x = (int *) (arg);
   // now use this x to start the array sum from arr[x] to ur limit which should not overlap to other thread

【讨论】:

这种优化可能只会造成伤害,至少在Java中是这样。你可能会破坏分支预测,线程甚至可能需要更长的时间来“安排”彼此,做实际的工作。 @Eugene 是的,但它完全取决于线程的使用,如果它们正在为任何共享资源而苦苦挣扎,那么我们必须使用一些锁定,但这会导致问题......!所以线程只有在算法是并行的时候才有用。 @Eugene 我不确定。如果对象不是 volatile,那么 Java 就不会等待同步线程访问。 @Eugene 我没有使用过 java 线程,所以不能说什么......但基础不会错......所以线程可能会提高性能或降低其取决于应用程序算法.. .not on c/c++ or java threads.!!【参考方案4】:

使用分而治之算法

将数组分成 2 份或更多份(继续递归分割,直到得到大小可控的数组) 开始计算子数组(分割数组)的总和(使用单独的线程) 最后将所有子数组(从所有线程)生成的总和相加,以产生最终结果

【讨论】:

您的第一步是不必要的。另外,这并不能真正回答问题 因为它是一个数组,你可以固定求和的最佳长度,然后按索引继续。线程 0:0 到 oLength-1,线程 1:oLength 到 2*oLength - 1.....等等。【参考方案5】:

正如其他人所说,读取文件的时间成本几乎肯定会比计算总和大得多。它是文本文件还是二进制文件?如果将数字存储为文本,则读取它们的成本可能会非常高,具体取决于您的实现。

您还应该小心添加大量浮点数。由于精度有限,数组后面的小值可能对总和没有贡献。考虑至少使用双精度来累积值。

【讨论】:

【参考方案6】:

您可以在 c 中使用 pthreads 来解决您的问题 这是我的 N=4 代码(您可以更改它以满足您的需要) 要运行此代码,请应用以下命令: gcc -pthread test.c -o 测试 ./测试

#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>

#define NUM_THREADS 5
pthread_t threads[NUM_THREADS];
pthread_mutex_t mutexsum;
int  a[2500];
int sum = 0;
void *do_work(void* parms) 

    long tid = (long)parms;
printf("I am thread # %ld\n ", tid);

    int start, end, mysum;

    start = (int)tid * 500;
    end = start + 500;
    int i = 0;
printf("Thread # %ld with start = %d and end = %d \n",tid,start,end);
    for (int i = start; i < end; i++) 
        mysum += a[i];
    
    pthread_mutex_lock(&mutexsum);
printf("Thread # %ld lock and sum = %d\n",tid,sum);
    sum += mysum;
    pthread_mutex_unlock(&mutexsum);

pthread_exit(NULL);



void main(int argv, char* argc) 
    int i = 0; int rc;
pthread_attr_t attr;
         pthread_mutex_init(&mutexsum, NULL);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
    pthread_mutex_init(&mutexsum, NULL);
printf("Initializing array : \n");
for(i=0;i<2500;i++)
a[i]=1;

    for (i = 0; i < NUM_THREADS; i++) 
        printf("Creating thread # %d.\n", i);

        rc = pthread_create(&threads[i], &attr, &do_work, (void *)i);
        if (rc) 
            printf("Error in thread %d with rc  = %d. \n", i, rc);
            exit(-1);
        

    
pthread_attr_destroy(&attr);
printf("Creating threads complete. start ruun " );
    for (i = 0; i < NUM_THREADS; i++) 
        pthread_join(threads[i], NULL);

    
    printf("\n\tSum : %d", sum);
pthread_mutex_destroy(&mutexsum);
    pthread_exit(NULL);

【讨论】:

【参考方案7】:

OpenMP 支持内置缩减。编译时添加标志-fopenmp。

#include <omp.h>
#define N = some_very_large_no; // say 1e12
float x[N]; // read from a file
int main()


float sum = 0.0;
#pragma omp parallel for reduction(+:sum)
for (i=0, i<N, i++)
  sum=sum+x[i];

return 0;


【讨论】:

以上是关于大型数组中元素的并行总和的主要内容,如果未能解决你的问题,请参考以下文章

与金属 swift 并行计算数组值的总和

如何在 cuda 中获得并行数组的“总和”?

减少cython并行中的数组

在不使用推力的情况下,每个线程具有多个元素的并行前缀总和

MPI_Reduce() 到特定的数组元素

最小唯一数组总和