使用 MPI_Gather 在 Fortran 中发送二维数组

Posted

技术标签:

【中文标题】使用 MPI_Gather 在 Fortran 中发送二维数组【英文标题】:Sending 2D arrays in Fortran with MPI_Gather 【发布时间】:2013-07-04 17:21:48 【问题描述】:

我想使用MPI_GATHER 发送二维数据块。例如:我在每个节点上有 2x3 数组,如果我有 4 个节点,我想要根上的 8x3 数组。对于 1d 数组,MPI_GATHER 根据 MPI 等级对数据进行排序,但对于 2d 数据,它会造成混乱!

什么是整理块的干净方法?

我期待这段代码的输出:

program testmpi
  use mpi
  implicit none
  integer :: send (2,3)
  integer :: rec (4,3)
  integer :: ierror,my_rank,i,j

  call MPI_Init(ierror)
  MPI_DATA_TYPE type_col
  ! find out process rank
  call MPI_Comm_rank(MPI_COMM_WORLD, my_rank, ierror)
  if (my_rank==0) then
    send=1
    do i=1,2
      print*,(send(i,j),j=1,3)
    enddo
  endif
  if (my_rank==1) then
    send=5
    ! do 1,2
    !   print*,(send(i,j),j=1,3)
    ! enddo
  endif
  call MPI_GATHER(send,6,MPI_INTEGER,rec,6,MPI_INTEGER,0,MPI_COMM_WORLD,ierror)
  if (my_rank==0) then
    print*,'<><><><><>rec'
    do i=1,4
      print*,(rec(i,j),j=1,3)
    enddo
  endif
  call MPI_Finalize(ierror)
end program testmpi

变成这样:

   1           1           1
   1           1           1
   5           5           5
   5           5           5

但它看起来像这样:

   1           1           5
   1           1           5
   1           5           5
   1           5           5

【问题讨论】:

This answer 给出了 C 的代码,但基本思路是一样的。 感谢您的快速回放。我编辑了问题并添加了一个示例来澄清我的问题。答案对我来说有点不清楚。我应该一步一步做什么?我猜该答案中的某些步骤仅适用于 C,因为缺乏本机多维数组支持。 基本思路是一样的。您需要 (a) 了解内存在 multid 数组中的布局方式; (b) 创建一个派生类型来描述您发送的数据块; (c) 使用范围来描述数据的去向。 【参考方案1】:

以下是 this answer 的字面 Fortran 翻译。我原以为这是不必要的,但数组索引和内存布局的多重差异可能意味着值得做一个 Fortran 版本。

首先我要说的是,您通常并不想这样做 - 从某个“主”进程分散并收集大量数据。通常,您希望每个任务都完成自己的难题,并且您的目标应该是永远不要让一个处理器需要整个数据的“全局视图”;只要你需要,你就会限制可伸缩性和问题的大小。如果您正在为 I/O 执行此操作 - 一个进程读取数据,然后将其分散,然后将其收集回来进行写入,您最终会希望查看 MPI-IO。

不过,说到您的问题,MPI 有非常好的方法可以将任意数据从内存中提取出来,并将其分散/收集到一组处理器中。不幸的是,这需要相当多的 MPI 概念——MPI 类型、范围和集体操作。在这个问题的答案中讨论了很多基本思想——MPI_Type_create_subarray and MPI_Gather。

考虑一个 1d 整数全局数组,任务 0 具有您希望将其分配给多个 MPI 任务,以便它们各自在其本地数组中获得一块。假设你有 4 个任务,全局数组是 [0,1,2,3,4,5,6,7]。您可以让任务 0 发送四条消息(包括一条给它自己)来分发它,当需要重新组装时,接收四条消息将它捆绑在一起;但这显然会在大量进程中变得非常耗时。这些类型的操作有优化的例程 - 分散/收集操作。所以在这种 1d 情况下,你会做这样的事情:

integer, dimension(8) :: global      ! only root has this
integer, dimension(2) :: local       ! everyone has this
integer, parameter    :: root = 0
integer :: rank, comsize
integer :: i, ierr

call MPI_Init(ierr)
call MPI_Comm_size(MPI_COMM_WORLD, comsize, ierr)
call MPI_Comm_rank(MPI_COMM_WORLD, rank, ierr)

if (rank == root) then
    global = [ (i, i=1,8) ]
endif

call MPI_Scatter(global, 2, MPI_INTEGER, &    ! send everyone 2 ints from global
                 local,  2, MPI_INTEGER, &    ! each proc recieves 2 into
                 root,                   &    ! sending process is root,
                 MPI_COMM_WORLD, ierr)        ! all procs in COMM_WORLD participate

在此之后,处理器的数据将如下所示

task 0:  local:[1,2]  global: [1,2,3,4,5,6,7,8]
task 1:  local:[3,4]  global: [garbage]
task 2:  local:[5,6]  global: [garbage]
task 3:  local:[7,8]  global: [garbage]

也就是说,分散操作采用全局数组并将连续的 2-int 块发送到所有处理器。

要重新组装数组,我们使用 MPI_Gather() 操作,其工作原理完全相同但相反:

local = local + rank

call MPI_Gather (local,  2, MPI_INTEGER, &    ! everyone sends 2 ints from local
                 global, 2, MPI_INTEGER, &    ! root receives 2 ints each proc into global
                 root,                   &    ! receiving process is root,
                 MPI_COMM_WORLD, ierr)        ! all procs in COMM_WORLD participate

现在数组看起来像:

task 0:  local:[1,2]    global: [1,2,4,5,7,8,10,11]
task 1:  local:[4,5]    global: [garbage-]
task 2:  local:[7,8]    global: [garbage-]
task 3:  local:[10,11]  global: [garbage-]

Gather 带回所有数据。

如果数据点的数量不均分进程数,并且我们需要向每个进程发送不同数量的项目,会发生什么情况?然后,您需要一个通用版本的 scatter,MPI_Scatterv,它允许您指定每个处理器的计数和位移——在全局数组中该数据段的开始位置。因此,假设对于相同的 4 个任务,您有一个字符数组 [a,b,c,d,e,f,g,h,i] 有 9 个字符,并且您将为每个进程分配两个字符,除了最后一个,得到了三个。那么你需要

character, dimension(9) :: global
character, dimension(3) :: local
integer, dimension(4)   :: counts
integer, dimension(4)   :: displs

if (rank == root) then
    global = [ (achar(i+ichar('a')), i=0,8) ]
endif
local = ['-','-','-']

counts = [2,2,2,3]
displs = [0,2,4,6]

mycounts = counts(rank+1)

call MPI_Scatterv(global, counts, displs,         & ! proc i gets counts(i) chars from displs(i)
                  MPI_CHARACTER,                  &
                  local, mycounts, MPI_CHARACTER, & ! I get mycounts chars into
                  root,                           & ! root rank does sending
                  MPI_COMM_WORLD, ierr)             ! all procs in COMM_WORLD participate

现在数据看起来像

task 0:  local:"ab-"  global: "abcdefghi"
task 1:  local:"cd-"  global: *garbage*
task 2:  local:"ef-"  global: *garbage*
task 3:  local:"ghi"  global: *garbage*

您现在已经使用 scatterv 来分发不规则数量的数据。每种情况下的位移都是从数组的开头开始的两个*秩(以字符为单位;位移的单位是为分散发送或为收集接收的类型;通常不是以字节或其他形式),并且计数为 [2,2,2,3]。如果它是我们希望有 3 个字符的第一个处理器,我们将设置 counts=[3,2,2,2] 并且位移将是 [0,3,5,7]。 Gatherv 再次以完全相同的方式工作,但相反; counts 和 displs 数组将保持不变。

现在,对于 2D,这有点棘手。如果我们想发送 2d 数组的 2d 子块,我们现在发送的数据不再是连续的。如果我们向 4 个处理器发送(比如说)一个 6x6 数组的 3x3 子块,我们发送的数据中有漏洞:

2D Array

   ---------
   |000|222|
   |000|222|
   |000|222|
   |---+---|
   |111|333|
   |111|333|
   |111|333|
   ---------

Actual layout in memory

   [000111000111000111222333222333222333]

(请注意,所有高性能计算都归结为了解内存中数据的布局。)

如果我们要将标记为“1”的数据发送给任务1,我们需要跳过三个值,发送三个值,跳过三个值,发送三个值,跳过三个值,发送三个值。第二个复杂因素是子区域停止和开始的地方。请注意,区域“1”不会从区域“0”停止的地方开始;在区域“0”的最后一个元素之后,内存中的下一个位置位于区域“1”的中途。

让我们首先解决第一个布局问题 - 如何仅提取我们想要发送的数据。我们总是可以将所有“0”区域数据复制到另一个连续数组中,然后发送;如果我们足够仔细地计划它,我们甚至可以这样做,我们可以在结果上调用 MPI_Scatter。但我们宁愿不必以这种方式转置整个主要数据结构。

到目前为止,我们使用的所有 MPI 数据类型都是简单的 - MPI_INTEGER 指定(比如说)连续 4 个字节。但是,MPI 允许您创建自己的数据类型来描述内存中任意复杂的数据布局。这种情况——数组的矩形子区域——很常见,there's a specific call for that。对于我们上面描述的二维情况,

integer :: newtype;
integer, dimension(2) :: sizes, subsizes, starts

sizes    = [6,6]     ! size of global array
subsizes = [3,3]     ! size of sub-region 
starts   = [0,0]     ! let's say we're looking at region "0"
                     ! which begins at offset [0,0] 

call MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_FORTRAN, MPI_INTEGER, newtype, ierr)
call MPI_Type_commit(newtype, ierr)

这将创建一个类型,该类型仅从全局数组中挑选出区域“0”。请注意,即使在 Fortran 中,start 参数也是作为距数组开头的偏移量(例如,从 0 开始)而不是索引(例如,从 1 开始)给出的。

我们现在可以将那条数据发送到另一个处理器

call MPI_Send(global, 1, newtype, dest, tag, MPI_COMM_WORLD, ierr)  ! send region "0"

并且接收进程可以将其接收到本地数组中。请注意,接收过程,如果它只是将它接收到一个 3x3 数组中,则不能将它接收的内容描述为一种新类型;这不再描述内存布局,因为在一行的结尾和下一行的开头之间没有大的跳跃。相反,它只是接收一个由 3*3 = 9 个整数组成的块:

call MPI_Recv(local, 3*3, MPI_INTEGER, 0, tag, MPI_COMM_WORLD, ierr)

请注意,我们也可以对其他子区域执行此操作,方法是为其他块创建不同的类型(具有不同的起始数组),或者只是从特定块的第一个位置开始发送:

if (rank == root) then
    call MPI_Send(global(4,1), 1, newtype, 1, tag, MPI_COMM_WORLD, ierr)
    call MPI_Send(global(1,4), 1, newtype, 2, tag, MPI_COMM_WORLD, ierr)
    call MPI_Send(global(4,4), 1, newtype, 3, tag, MPI_COMM_WORLD, ierr)
    local = global(1:3, 1:3)
else
    call MPI_Recv(local, 3*3, MPI_INTEGER, 0, tag, MPI_COMM_WORLD, rstatus, ierr)
endif

既然我们了解了如何指定子区域,那么在使用分散/聚集操作之前只需要讨论一件事,那就是这些类型的“大小”。我们还不能只对这些类型使用 MPI_Scatter()(甚至 scatterv),因为这些类型的范围是 15 个整数;也就是说,它们开始后的结束位置是 15 个整数——它们的结束位置与下一个块开始的位置不一致,所以我们不能只使用 scatter——它会选择错误的位置开始发送数据到下一个处理器。

当然,我们可以使用 MPI_Scatterv() 并自己指定位移,这就是我们要做的——除了位移以发送类型大小为单位,这对我们也没有帮助;这些块从全局数组开始的 (0,3,18,21) 个整数的偏移量开始,并且一个块从它开始的地方结束 15 个整数的事实并不能让我们用整数倍来表达这些位移.

为了解决这个问题,MPI 允许您设置类型的范围以用于这些计算。它不会截断类型;它仅用于在给定最后一个元素的情况下确定下一个元素的开始位置。对于像这样的带有孔的类型,将范围设置为小于内存中到类型实际结尾的距离通常很方便。

我们可以将范围设置为对我们来说方便的任何内容。我们可以将范围 1 设为整数,然后以整数为单位设置位移。不过,在这种情况下,我喜欢将范围设置为 3 个整数 - 子列的大小 - 这样,块“1”在块“0”之后立即开始,块“3”在块“之后立即开始” 2"。不幸的是,当从“2”块跳到“3”块时,它的效果并不好,但这无济于事。

因此,在这种情况下,为了分散子块,我们将执行以下操作:

integer(kind=MPI_ADDRESS_KIND) :: extent

starts   = [0,0]
sizes    = [6, 6]
subsizes = [3, 3]

call MPI_Type_create_subarray(2, sizes, subsizes, starts,        &
                              MPI_ORDER_FORTRAN, MPI_INTEGER,  &
                              newtype, ierr)
call MPI_Type_size(MPI_INTEGER, intsize, ierr)
extent = 3*intsize
call MPI_Type_create_resized(newtype, 0, extent, resizedtype, ierr)
call MPI_Type_commit(resizedtype, ierr)

在这里,我们创建了与以前相同的块类型,但我们调整了它的大小;我们没有改变类型“开始”(0)的位置,但我们改变了它“结束”的位置(3 个整数)。我们之前没有提到这一点,但是MPI_Type_commit 是必须的才能使用该类型;但您只需要提交您实际使用的最终类型,而不需要任何中间步骤。完成后使用MPI_Type_free 释放已提交的类型。

所以现在,最后,我们可以分散块了:上面的数据操作有点复杂,但是一旦完成,scatterv 看起来就像以前一样:

counts = 1          ! we will send one of these new types to everyone
displs = [0,1,6,7]  ! the starting point of everyone's data
                    ! in the global array, in block extents

call MPI_Scatterv(global, counts, displs, & ! proc i gets counts(i) types from displs(i) 
        resizedtype,                      &
        local, 3*3, MPI_INTEGER,          & ! I'm receiving 3*3 int
        root, MPI_COMM_WORLD, ierr)         !... from (root, MPI_COMM_WORLD)

现在我们已经完成了,在对 scatter、gather 和 MPI 派生类型进行了一些了解之后。

下面的示例代码显示了收集和分散操作以及字符数组。运行程序:

$ mpirun -np 4 ./scatter2d
 global array is:
 000222
 000222
 000222
 111333
 111333
 111333
 Rank            0  received:
 000
 000
 000
 Rank            1  received:
 111
 111
 111
 Rank            2  received:
 222
 222
 222
 Rank            3  received:
 333
 333
 333
 Rank            0  sending:
 111
 111
 111
 Rank            1  sending:
 222
 222
 222
 Rank            2  sending:
 333
 333
 333
 Rank            3  sending:
 444
 444
 444
  Root received:
 111333
 111333
 111333
 222444
 222444
 222444

代码如下:

program scatter
    use mpi
    implicit none

    integer, parameter :: gridsize = 6    ! size of array
    integer, parameter :: procgridsize = 2 ! size of process grid
    character, allocatable, dimension (:,:) :: global, local
    integer, dimension(procgridsize**2)   :: counts, displs
    integer, parameter    :: root = 0
    integer :: rank, comsize
    integer :: localsize
    integer :: i, j, row, col, ierr, p, charsize
    integer, dimension(2) :: sizes, subsizes, starts

    integer :: newtype, resizedtype
    integer, parameter :: tag = 1
    integer, dimension(MPI_STATUS_SIZE) :: rstatus
    integer(kind=MPI_ADDRESS_KIND) :: extent, begin

    call MPI_Init(ierr)
    call MPI_Comm_size(MPI_COMM_WORLD, comsize, ierr)
    call MPI_Comm_rank(MPI_COMM_WORLD, rank, ierr)

    if (comsize /= procgridsize**2) then
        if (rank == root) then
            print *, 'Only works with np = ', procgridsize**2, ' for now.'
        endif
        call MPI_Finalize(ierr)
        stop
    endif

    localsize = gridsize/procgridsize
    allocate( local(localsize, localsize) )
    if (rank == root) then
        allocate( global(gridsize, gridsize) )
        forall( col=1:procgridsize, row=1:procgridsize )
            global((row-1)*localsize+1:row*localsize, &
                   (col-1)*localsize+1:col*localsize) = &
                    achar(ichar('0')+(row-1)+(col-1)*procgridsize)
        end forall

        print *, 'global array is: '
        do i=1,gridsize
            print *, global(i,:)
        enddo
    endif
    starts   = [0,0]
    sizes    = [gridsize, gridsize]
    subsizes = [localsize, localsize]

    call MPI_Type_create_subarray(2, sizes, subsizes, starts,        &
                                  MPI_ORDER_FORTRAN, MPI_CHARACTER,  &
                                  newtype, ierr)
    call MPI_Type_size(MPI_CHARACTER, charsize, ierr)
    extent = localsize*charsize
    begin  = 0
    call MPI_Type_create_resized(newtype, begin, extent, resizedtype, ierr)
    call MPI_Type_commit(resizedtype, ierr)

    counts = 1          ! we will send one of these new types to everyone
    forall( col=1:procgridsize, row=1:procgridsize )
       displs(1+(row-1)+procgridsize*(col-1)) = (row-1) + localsize*procgridsize*(col-1)
    endforall

    call MPI_Scatterv(global, counts, displs,   & ! proc i gets counts(i) types from displs(i)
            resizedtype,                        &
            local, localsize**2, MPI_CHARACTER, & ! I'm receiving localsize**2 chars
            root, MPI_COMM_WORLD, ierr)           !... from (root, MPI_COMM_WORLD)

    do p=1, comsize
        if (rank == p-1) then
            print *, 'Rank ', rank, ' received: '
            do i=1, localsize
                print *, local(i,:)
            enddo
        endif
        call MPI_Barrier(MPI_COMM_WORLD, ierr)
    enddo

    local = achar( ichar(local) + 1 )

    do p=1, comsize
        if (rank == p-1) then
            print *, 'Rank ', rank, ' sending: '
            do i=1, localsize
                print *, local(i,:)
            enddo
        endif
        call MPI_Barrier(MPI_COMM_WORLD, ierr)
    enddo

    call MPI_Gatherv( local, localsize**2, MPI_CHARACTER, & ! I'm sending localsize**2 chars
                      global, counts, displs, resizedtype,&
                      root, MPI_COMM_WORLD, ierr)

    if (rank == root) then
        print *, ' Root received: '
        do i=1,gridsize
            print *, global(i,:)
        enddo
    endif

    call MPI_Type_free(newtype,ierr)
    if (rank == root) deallocate(global)
    deallocate(local)
    call MPI_Finalize(ierr)

end program scatter

这就是一般的解决方案。对于您的特定情况,我们只是按行附加,我们不需要 Gatherv,我们可以只使用聚集,因为在这种情况下,所有位移都是相同的 - 之前,在 2d 块情况下,我们有一个位移“向下”,然后在您“越过”到下一列块时跳入该位移。在这里,位移始终是前一个范围的一个范围,因此我们不需要明确给出位移。所以最终的代码如下:

program testmpi
use mpi
    implicit none
    integer, dimension(:,:), allocatable :: send, recv
    integer, parameter :: nsendrows = 2, nsendcols = 3
    integer, parameter :: root = 0
    integer :: ierror, my_rank, comsize, i, j, ierr
    integer :: blocktype, resizedtype
    integer, dimension(2) :: starts, sizes, subsizes
    integer (kind=MPI_Address_kind) :: start, extent
    integer :: intsize

    call MPI_Init(ierror)
    call MPI_Comm_rank(MPI_COMM_WORLD, my_rank, ierror)
    call MPI_Comm_size(MPI_COMM_WORLD, comsize, ierror)

    allocate( send(nsendrows, nsendcols) )

    send = my_rank

    if (my_rank==root) then
        ! we're going to append the local arrays
        ! as groups of send rows
        allocate( recv(nsendrows*comsize, nsendcols) )
    endif

    ! describe what these subblocks look like inside the full concatenated array
    sizes    = [ nsendrows*comsize, nsendcols ]
    subsizes = [ nsendrows, nsendcols ]
    starts   = [ 0, 0 ]

    call MPI_Type_create_subarray( 2, sizes, subsizes, starts,     &
                                   MPI_ORDER_FORTRAN, MPI_INTEGER, &
                                   blocktype, ierr)

    start = 0
    call MPI_Type_size(MPI_INTEGER, intsize, ierr)
    extent = intsize * nsendrows

    call MPI_Type_create_resized(blocktype, start, extent, resizedtype, ierr)
    call MPI_Type_commit(resizedtype, ierr)

    call MPI_Gather( send, nsendrows*nsendcols, MPI_INTEGER, &  ! everyone send 3*2 ints
                     recv, 1, resizedtype,                   &  ! root gets 1 resized type from everyone
                     root, MPI_COMM_WORLD, ierr)

    if (my_rank==0) then
    print*,'<><><><><>recv'
    do i=1,nsendrows*comsize
        print*,(recv(i,j),j=1,nsendcols)
    enddo
    endif
    call MPI_Finalize(ierror)

end program testmpi

用 3 个进程运行它会得到:

$ mpirun -np 3 ./testmpi
 <><><><><>recv
           0           0           0
           0           0           0
           1           1           1
           1           1           1
           2           2           2
           2           2           2

【讨论】:

考虑到重新编写代码示例需要多长时间,也许值得做一个 Fortran 版本。 @JonathanDursi 我认为这个答案应该复制到文档中,因为它绝对很棒。 已更新以包含由鹰眼的@EnricoMariaDeAngelis 提出的编辑(并被审阅者错误但善意地拒绝),以修复一个示例中从 C 示例中过度字面复制索引的问题。 我们当然可以做到,@EnricoMariaDeAngelis 我们?我永远不会窃取您的出色答案;)顺便说一句,我发布了一个请求here。

以上是关于使用 MPI_Gather 在 Fortran 中发送二维数组的主要内容,如果未能解决你的问题,请参考以下文章

如何从 C 中使用 MPI_Scatter 和 MPI_Gather?

如何使用 MPI_Scatter 和 MPI_Gather 计算多个进程的平均值?

MPI_Gather 的问题无法将 8 个 32 个元素的数组收集到一个 256 个元素的大数组中

mpi 进程在信号 11 上退出

为啥在 Fortran 中使用命令 PRINT 会覆盖输入文件?

fortran读取一行数据