使用 MPI 在 C 中发送二维数组块

Posted

技术标签:

【中文标题】使用 MPI 在 C 中发送二维数组块【英文标题】:sending blocks of 2D array in C using MPI 【发布时间】:2012-03-05 09:05:43 【问题描述】:

如何将二维数组块发送到不同的处理器?假设二维数组大小为 400x400,我想将大小为 100X100 的块发送到不同的处理器。这个想法是每个处理器将在其单独的块上执行计算并将其结果发送回第一个处理器以获得最终结果。 我在 C 程序中使用 MPI。

【问题讨论】:

你应该接受答案... 【参考方案1】:

首先我要说的是,您通常并不想这样做 - 从某个“主”进程分散并收集大量数据。通常,您希望每个任务都完成自己的难题,并且您的目标应该是永远不要让一个处理器需要整个数据的“全局视图”;只要你需要,你就会限制可伸缩性和问题的大小。如果您正在为 I/O 执行此操作 - 一个进程读取数据,然后将其分散,然后将其收集回来进行写入,您最终会希望查看 MPI-IO。

不过,说到您的问题,MPI 有非常好的方法可以将任意数据从内存中提取出来,并将其分散/收集到一组处理器中。不幸的是,这需要相当多的 MPI 概念——MPI 类型、范围和集体操作。在这个问题的答案中讨论了很多基本思想——MPI_Type_create_subarray and MPI_Gather。

更新 - 在寒冷的日子里,这是很多代码而不是很多解释。所以让我稍微扩展一下。

考虑一个 1d 整数全局数组,任务 0 具有您希望将其分配给多个 MPI 任务,以便它们各自在其本地数组中获得一块。假设你有 4 个任务,全局数组是[01234567]。您可以让任务 0 发送四条消息(包括一条给它自己)来分发它,当需要重新组装时,接收四条消息将它捆绑在一起;但这显然会在大量进程中变得非常耗时。这些类型的操作有优化的例程 - 分散/收集操作。所以在这种 1d 情况下,你会做这样的事情:

int global[8];   /* only task 0 has this */
int local[2];    /* everyone has this */
const int root = 0;   /* the processor with the initial global data */

if (rank == root) 
   for (int i=0; i<7; i++) global[i] = i;


MPI_Scatter(global, 2, MPI_INT,      /* send everyone 2 ints from global */
            local,  2, MPI_INT,      /* each proc receives 2 ints into local */
            root, MPI_COMM_WORLD);   /* sending process is root, all procs in */
                                     /* MPI_COMM_WORLD participate */

在此之后,处理器的数据将如下所示

task 0:  local:[01]  global: [01234567]
task 1:  local:[23]  global: [garbage-]
task 2:  local:[45]  global: [garbage-]
task 3:  local:[67]  global: [garbage-]

也就是说,分散操作采用全局数组并将连续的 2-int 块发送到所有处理器。

要重新组装数组,我们使用MPI_Gather() 操作,其工作原理完全相同但相反:

for (int i=0; i<2; i++) 
   local[i] = local[i] + rank;

MPI_Gather(local,  2, MPI_INT,      /* everyone sends 2 ints from local */
           global, 2, MPI_INT,      /* root receives 2 ints each proc into global */
           root, MPI_COMM_WORLD);   /* recv'ing process is root, all procs in */
                                    /* MPI_COMM_WORLD participate */

现在数据看起来像

task 0:  local:[01]  global: [0134679a]
task 1:  local:[34]  global: [garbage-]
task 2:  local:[67]  global: [garbage-]
task 3:  local:[9a]  global: [garbage-]

Gather 带回所有数据,这里 a 是 10,因为在开始这个示例时我没有仔细考虑我的格式。

如果数据点的数量不均分进程数,并且我们需要向每个进程发送不同数量的项目,会发生什么情况?然后,您需要一个通用版本的 scatter,MPI_Scatterv(),它可以让您指定每个 处理器和位移——在全局数组中数据开始的位置。因此,假设您有一个字符数组[abcdefghi],其中包含 9 个字符,并且您将为每个进程分配两个字符,除了最后一个字符,即三个字符。那么你需要

char global[9];   /* only task 0 has this */
char local[3]='-','-','-';    /* everyone has this */
int  mynum;                     /* how many items */
const int root = 0;   /* the processor with the initial global data */

if (rank == 0) 
   for (int i=0; i<8; i++) global[i] = 'a'+i;


int counts[4] = 2,2,2,3;   /* how many pieces of data everyone has */
mynum = counts[rank];
int displs[4] = 0,2,4,6;   /* the starting point of everyone's data */
                             /* in the global array */

MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] pts from displs[i] */
            MPI_INT,      
            local, mynum, MPI_INT;   /* I'm receiving mynum MPI_INTs into local */
            root, MPI_COMM_WORLD);

现在数据看起来像

task 0:  local:[ab-]  global: [abcdefghi]
task 1:  local:[cd-]  global: [garbage--]
task 2:  local:[ef-]  global: [garbage--]
task 3:  local:[ghi]  global: [garbage--]

您现在已经使用 scatterv 来分发不规则数量的数据。每种情况下的位移都是从数组的开头开始的两个*秩(以字符为单位;位移的单位是为分散发送或为收集接收的类型;通常不是以字节或其他形式),并且计数是 2,2,2,3。如果它是我们希望有 3 个字符的第一个处理器,我们将设置 counts=3,2,2,2 并且位移将是 0,3,5,7。 Gatherv 再次以完全相同的方式工作,但相反; counts 和 displs 数组将保持不变。

现在,对于 2D,这有点棘手。如果我们想发送 2d 数组的 2d 子块,我们现在发送的数据不再是连续的。如果我们向 4 个处理器发送(比如说)一个 6x6 数组的 3x3 子块,我们发送的数据中有漏洞:

2D Array

   ---------
   |000|111|
   |000|111|
   |000|111|
   |---+---|
   |222|333|
   |222|333|
   |222|333|
   ---------

Actual layout in memory

   [000111000111000111222333222333222333]

(请注意,所有高性能计算都归结为了解内存中数据的布局。)

如果我们要将标记为“1”的数据发送给任务1,我们需要跳过三个值,发送三个值,跳过三个值,发送三个值,跳过三个值,发送三个值。第二个复杂因素是子区域停止和开始的地方。请注意,区域“1”不会从区域“0”停止的地方开始;在区域“0”的最后一个元素之后,内存中的下一个位置位于区域“1”的中途。

让我们首先解决第一个布局问题 - 如何仅提取我们想要发送的数据。我们总是可以将所有“0”区域数据复制到另一个连续数组中,然后发送;如果我们足够仔细地计划它,我们甚至可以这样做,我们可以在结果上调用MPI_Scatter。但我们宁愿不必以这种方式转置整个主要数据结构。

到目前为止,我们使用的所有 MPI 数据类型都是简单的 - MPI_INT 指定(例如)连续 4 个字节。但是,MPI 允许您创建自己的数据类型来描述内存中任意复杂的数据布局。这种情况——数组的矩形子区域——很常见,以至于有一个特定的要求。对于二维 我们上面描述的情况,

    MPI_Datatype newtype;
    int sizes[2]    = 6,6;  /* size of global array */
    int subsizes[2] = 3,3;  /* size of sub-region */
    int starts[2]   = 0,0;  /* let's say we're looking at region "0",
                                 which begins at index [0,0] */

    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &newtype);
    MPI_Type_commit(&newtype);

这创建了一个类型,它只从全局数组中挑选出区域“0”;我们可以 现在只将那条数据发送到另一个处理器

    MPI_Send(&(global[0][0]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "0" */

并且接收进程可以将其接收到本地数组中。请注意,接收过程,如果它只是将它接收到一个 3x3 数组中,则不能将它接收的内容描述为newtype 的类型;不再描述内存布局。相反,它只是接收一个由 3*3 = 9 个整数组成的块:

    MPI_Recv(&(local[0][0]), 3*3, MPI_INT, 0, tag, MPI_COMM_WORLD);

请注意,我们也可以对其他子区域执行此操作,方法是为其他块创建不同的类型(具有不同的start 数组),或者仅通过在特定块的起点发送:

    MPI_Send(&(global[0][3]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "1" */
    MPI_Send(&(global[3][0]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "2" */
    MPI_Send(&(global[3][3]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "3" */

最后,请注意,这里我们要求全局和局部是连续的内存块;也就是说,&amp;(global[0][0])&amp;(local[0][0])(或者,等效地,*global*local 指向连续的 6*6 和 3*3 内存块;这不能通过分配动态多线程的常用方法来保证) d 数组。下面显示了如何执行此操作。

既然我们了解了如何指定子区域,那么在使用分散/聚集操作之前只需要讨论一件事,那就是这些类型的“大小”。我们还不能只对这些类型使用MPI_Scatter()(甚至是scatterv),因为这些类型的范围是16 个整数;也就是说,它们开始后的结束位置是 16 个整数——它们的结束位置与下一个块开始的位置不一致,所以我们不能只使用 scatter——它会选择错误的位置开始发送数据到下一个处理器。

当然,我们可以使用MPI_Scatterv() 并自己指定位移,这就是我们要做的——除了位移以发送类型大小为单位,这对我们也没有帮助;块从全局数组开始的 (0,3,18,21) 个整数的偏移量开始,并且块从它开始的位置结束 16 个整数的事实根本不允许我们以整数倍数表达这些位移.

为了解决这个问题,MPI 允许您设置类型的范围以用于这些计算。它不会截断类型;它仅用于在给定最后一个元素的情况下确定下一个元素的开始位置。对于像这样的带有孔的类型,将范围设置为小于内存中到类型实际结尾的距离通常很方便。

我们可以将范围设置为对我们来说方便的任何内容。我们可以将范围 1 设为整数,然后以整数为单位设置位移。不过,在这种情况下,我喜欢将范围设置为 3 个整数 - 子行的大小 - 这样,块“1”在块“0”之后立即开始,块“3”在块“之后立即开始2"。不幸的是,当从“2”块跳到“3”块时,它的效果并不好,但这无济于事。

因此,在这种情况下,为了分散子块,我们将执行以下操作:

    MPI_Datatype type, resizedtype;
    int sizes[2]    = 6,6;  /* size of global array */
    int subsizes[2] = 3,3;  /* size of sub-region */
    int starts[2]   = 0,0;  /* let's say we're looking at region "0",
                                 which begins at index [0,0] */

    /* as before */
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);  
    /* change the extent of the type */
    MPI_Type_create_resized(type, 0, 3*sizeof(int), &resizedtype);
    MPI_Type_commit(&resizedtype);

在这里,我们创建了与以前相同的块类型,但我们调整了它的大小;我们没有改变类型“开始”的位置(0),但我们改变了它“结束”的位置(3 个整数)。我们之前没有提到这一点,但是MPI_Type_commit 是必须的才能使用该类型;但您只需要提交您实际使用的最终类型,而不需要任何中间步骤。完成后使用MPI_Type_free 释放类型。

所以现在,最后,我们可以分散块了:上面的数据操作有点复杂,但是一旦完成,scatterv 看起来就像以前一样:

int counts[4] = 1,1,1,1;   /* how many pieces of data everyone has, in units of blocks */
int displs[4] = 0,1,6,7;   /* the starting point of everyone's data */
                             /* in the global array, in block extents */

MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] types from displs[i] */
            resizedtype,      
            local, 3*3, MPI_INT;   /* I'm receiving 3*3 MPI_INTs into local */
            root, MPI_COMM_WORLD);

现在我们已经完成了,在对 scatter、gather 和 MPI 派生类型进行了一些了解之后。

下面的示例代码显示了收集和分散操作以及字符数组。运行程序:

$ mpirun -n 4 ./gathervarray
Global array is:
0123456789
3456789012
6789012345
9012345678
2345678901
5678901234
8901234567
1234567890
4567890123
7890123456
Local process on rank 0 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Local process on rank 1 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 2 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 3 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Processed grid:
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD

代码如下。

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"

int malloc2dchar(char ***array, int n, int m) 

    /* allocate the n*m contiguous items */
    char *p = (char *)malloc(n*m*sizeof(char));
    if (!p) return -1;

    /* allocate the row pointers into the memory */
    (*array) = (char **)malloc(n*sizeof(char*));
    if (!(*array)) 
       free(p);
       return -1;
    

    /* set up the pointers into the contiguous memory */
    for (int i=0; i<n; i++)
       (*array)[i] = &(p[i*m]);

    return 0;


int free2dchar(char ***array) 
    /* free the memory - the first element of the array is at the start */
    free(&((*array)[0][0]));

    /* free the pointers into the memory */
    free(*array);

    return 0;


int main(int argc, char **argv) 
    char **global, **local;
    const int gridsize=10; // size of grid
    const int procgridsize=2;  // size of process grid
    int rank, size;        // rank of current process and no. of processes

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);


    if (size != procgridsize*procgridsize) 
        fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize);
        MPI_Abort(MPI_COMM_WORLD,1);
    


    if (rank == 0) 
        /* fill in the array, and print it */
        malloc2dchar(&global, gridsize, gridsize);
        for (int i=0; i<gridsize; i++) 
            for (int j=0; j<gridsize; j++)
                global[i][j] = '0'+(3*i+j)%10;
        


        printf("Global array is:\n");
        for (int i=0; i<gridsize; i++) 
            for (int j=0; j<gridsize; j++)
                putchar(global[i][j]);

            printf("\n");
        
    

    /* create the local array which we'll process */
    malloc2dchar(&local, gridsize/procgridsize, gridsize/procgridsize);

    /* create a datatype to describe the subarrays of the global array */

    int sizes[2]    = gridsize, gridsize;         /* global size */
    int subsizes[2] = gridsize/procgridsize, gridsize/procgridsize;     /* local size */
    int starts[2]   = 0,0;                        /* where this one starts */
    MPI_Datatype type, subarrtype;
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &type);
    MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(char), &subarrtype);
    MPI_Type_commit(&subarrtype);

    char *globalptr=NULL;
    if (rank == 0) globalptr = &(global[0][0]);

    /* scatter the array to all processors */
    int sendcounts[procgridsize*procgridsize];
    int displs[procgridsize*procgridsize];

    if (rank == 0) 
        for (int i=0; i<procgridsize*procgridsize; i++) sendcounts[i] = 1;
        int disp = 0;
        for (int i=0; i<procgridsize; i++) 
            for (int j=0; j<procgridsize; j++) 
                displs[i*procgridsize+j] = disp;
                disp += 1;
            
            disp += ((gridsize/procgridsize)-1)*procgridsize;
        
    


    MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
                 gridsize*gridsize/(procgridsize*procgridsize), MPI_CHAR,
                 0, MPI_COMM_WORLD);

    /* now all processors print their local data: */

    for (int p=0; p<size; p++) 
        if (rank == p) 
            printf("Local process on rank %d is:\n", rank);
            for (int i=0; i<gridsize/procgridsize; i++) 
                putchar('|');
                for (int j=0; j<gridsize/procgridsize; j++) 
                    putchar(local[i][j]);
                
                printf("|\n");
            
        
        MPI_Barrier(MPI_COMM_WORLD);
    

    /* now each processor has its local array, and can process it */
    for (int i=0; i<gridsize/procgridsize; i++) 
        for (int j=0; j<gridsize/procgridsize; j++) 
            local[i][j] = 'A' + rank;
        
    

    /* it all goes back to process 0 */
    MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize),  MPI_CHAR,
                 globalptr, sendcounts, displs, subarrtype,
                 0, MPI_COMM_WORLD);

    /* don't need the local data anymore */
    free2dchar(&local);

    /* or the MPI data type */
    MPI_Type_free(&subarrtype);

    if (rank == 0) 
        printf("Processed grid:\n");
        for (int i=0; i<gridsize; i++) 
            for (int j=0; j<gridsize; j++) 
                putchar(global[i][j]);
            
            printf("\n");
        

        free2dchar(&global);
    


    MPI_Finalize();

    return 0;

【讨论】:

这在某些版本中一次又一次地出现;我希望写一个我们可以一直指向人们的答案。但是谢谢:) 我非常精通 Fortran MPI,但我收藏了它以备将来参考。另外,我同意 mort 的评论。 整个过程在 Fortran 中更容易,它在语言中内置了多维数组; C 一直选择不包括的东西。而且你们俩都已经给出了非常有力的答案... 实际上问题是这样的,在一个 400x400 的网格上,一个 5x5 的光源网格被映射。光源在一个方向上的高度 (h) 和在另一方向上的功率 (p) 变化。我们必须在网格中找到最小照明点。由于 (xl, yl) 处的光源,任何点 (x,y) 的照明由 I = h*p/(r^3) 给出,其中 r^2 = (x-xl)^2 + (y-yl )^2 + h^2 . 我想知道使用 MPI 解决这个问题的最佳方法是什么。【参考方案2】:

我只是发现这样更容易检查。

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"

/*
 This is a version with integers, rather than char arrays, presented in this
 very good answer: http://***.com/a/9271753/2411320
 It will initialize the 2D array, scatter it, increase every value by 1 and then gather it back.
*/

int malloc2D(int ***array, int n, int m) 
    int i;
    /* allocate the n*m contiguous items */
    int *p = malloc(n*m*sizeof(int));
    if (!p) return -1;

    /* allocate the row pointers into the memory */
    (*array) = malloc(n*sizeof(int*));
    if (!(*array)) 
       free(p);
       return -1;
    

    /* set up the pointers into the contiguous memory */
    for (i=0; i<n; i++)
       (*array)[i] = &(p[i*m]);

    return 0;


int free2D(int ***array) 
    /* free the memory - the first element of the array is at the start */
    free(&((*array)[0][0]));

    /* free the pointers into the memory */
    free(*array);

    return 0;


int main(int argc, char **argv) 
    int **global, **local;
    const int gridsize=4; // size of grid
    const int procgridsize=2;  // size of process grid
    int rank, size;        // rank of current process and no. of processes
    int i, j, p;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);


    if (size != procgridsize*procgridsize) 
        fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize);
        MPI_Abort(MPI_COMM_WORLD,1);
    


    if (rank == 0) 
        /* fill in the array, and print it */
        malloc2D(&global, gridsize, gridsize);
        int counter = 0;
        for (i=0; i<gridsize; i++) 
            for (j=0; j<gridsize; j++)
                global[i][j] = ++counter;
        


        printf("Global array is:\n");
        for (i=0; i<gridsize; i++) 
            for (j=0; j<gridsize; j++) 
                printf("%2d ", global[i][j]);
            
            printf("\n");
        
    
    //return;

    /* create the local array which we'll process */
    malloc2D(&local, gridsize/procgridsize, gridsize/procgridsize);

    /* create a datatype to describe the subarrays of the global array */
    int sizes[2]    = gridsize, gridsize;         /* global size */
    int subsizes[2] = gridsize/procgridsize, gridsize/procgridsize;     /* local size */
    int starts[2]   = 0,0;                        /* where this one starts */
    MPI_Datatype type, subarrtype;
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);
    MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(int), &subarrtype);
    MPI_Type_commit(&subarrtype);

    int *globalptr=NULL;
    if (rank == 0)
        globalptr = &(global[0][0]);

    /* scatter the array to all processors */
    int sendcounts[procgridsize*procgridsize];
    int displs[procgridsize*procgridsize];

    if (rank == 0) 
        for (i=0; i<procgridsize*procgridsize; i++)
            sendcounts[i] = 1;
        int disp = 0;
        for (i=0; i<procgridsize; i++) 
            for (j=0; j<procgridsize; j++) 
                displs[i*procgridsize+j] = disp;
                disp += 1;
            
            disp += ((gridsize/procgridsize)-1)*procgridsize;
        
    


    MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
                 gridsize*gridsize/(procgridsize*procgridsize), MPI_INT,
                 0, MPI_COMM_WORLD);

    /* now all processors print their local data: */

    for (p=0; p<size; p++) 
        if (rank == p) 
            printf("Local process on rank %d is:\n", rank);
            for (i=0; i<gridsize/procgridsize; i++) 
                putchar('|');
                for (j=0; j<gridsize/procgridsize; j++) 
                    printf("%2d ", local[i][j]);
                
                printf("|\n");
            
        
        MPI_Barrier(MPI_COMM_WORLD);
    

    /* now each processor has its local array, and can process it */
    for (i=0; i<gridsize/procgridsize; i++) 
        for (j=0; j<gridsize/procgridsize; j++) 
            local[i][j] += 1; // increase by one the value
        
    

    /* it all goes back to process 0 */
    MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize),  MPI_INT,
                 globalptr, sendcounts, displs, subarrtype,
                 0, MPI_COMM_WORLD);

    /* don't need the local data anymore */
    free2D(&local);

    /* or the MPI data type */
    MPI_Type_free(&subarrtype);

    if (rank == 0) 
        printf("Processed grid:\n");
        for (i=0; i<gridsize; i++) 
            for (j=0; j<gridsize; j++) 
                printf("%2d ", global[i][j]);
            
            printf("\n");
        

        free2D(&global);
    


    MPI_Finalize();

    return 0;

输出:

linux16:>mpicc -o main main.c
linux16:>mpiexec -n 4 main Global array is:
 1  2  3  4
 5  6  7  8
 9 10 11 12
13 14 15 16
Local process on rank 0 is:
| 1  2 |
| 5  6 |
Local process on rank 1 is:
| 3  4 |
| 7  8 |
Local process on rank 2 is:
| 9 10 |
|13 14 |
Local process on rank 3 is:
|11 12 |
|15 16 |
Processed grid:
 2  3  4  5
 6  7  8  9
10 11 12 13
14 15 16 17

【讨论】:

以上是关于使用 MPI 在 C 中发送二维数组块的主要内容,如果未能解决你的问题,请参考以下文章

使用 MPI 发送二维数组

通过 MPI 发送和接收二维数组

在 MPI C++ 中传递大型二维数组

C 语言二级指针作为输入 ( 二维数组 | 二维数组遍历 | 二维数组排序 )

MPI_Bcast 动态二维数组

如何确保二维数组在内存中连续分配