创建一个在 MPI 进程之间保持同步的计数器

Posted

技术标签:

【中文标题】创建一个在 MPI 进程之间保持同步的计数器【英文标题】:Creating a counter that stays synchronized across MPI processes 【发布时间】:2011-06-24 08:13:47 【问题描述】:

我在使用基本的 comm 和 group MPI2 方法方面有相当多的经验,并且使用 MPI 进行了相当多令人尴尬的并行模拟工作。到目前为止,我已经将我的代码结构化为有一个调度节点和一堆工作节点。调度节点具有将与模拟器一起运行的参数文件列表。它使用参数文件为每个工作节点播种。工作节点运行它们的模拟,然后请求调度节点提供的另一个参数文件。运行完所有参数文件后,调度节点会先关闭每个工作节点,然后再自行关闭。

参数文件通常命名为“Par_N.txt”,其中 N 是标识整数(例如 - N = 1-1000)。所以我在想,如果我可以创建一个计数器,并且可以让这个计数器在我的所有节点上同步,我就可以消除对调度节点的需求,并使系统更简单一些。这在理论上听起来很简单,但在实践中我怀疑它有点困难,因为我需要确保计数器在更改时被锁定等。并且认为 MPI 可能有一种内置方式处理这个(事情。有什么想法吗?我是不是想多了?

【问题讨论】:

您能否解释一下您希望从取消调度程序中获得什么好处? @aix- 当然。在我们的一些较大的运行中,我注意到调度节点的通信已经饱和(比如运行 np=10k 节点)。为了克服这个问题,我开始允许多个调度节点,其中每个调度节点都有一个子组。然而,这会导致更复杂(即更难维护)的代码。所以这主要是一个试图简化事情的问题(如果它可以简单地完成的话)。 此外,在更频繁的小运行(比如 5-10 个节点)上,最好不要将整个节点交给调度节点。我们的系统管理员非常反对内核过载,并已将作业调度程序设置为不允许进程数 > 请求内核数的作业。 【参考方案1】:

实现共享计数器并非易事,但一旦你完成它并将它放在某个库中的某个地方,你就可以用它很多

在Using MPI-2 这本书中,如果你要实现这些东西,你应该拿到这本书,其中一个例子(代码是available online)是一个共享计数器。 “不可扩展”的应该适用于几十个进程——计数器是一个 0..size-1 的整数数组,每个等级一个,然后“获取下一个工作项#”操作包括锁定窗口,读取其他人对计数器的贡献(在这种情况下,他们已经采取了多少项目),更新您自己的(++),关闭窗口并计算总数。这一切都是通过被动的片面操作来完成的。 (更好的缩放只使用一棵树而不是一维数组)。

所以用途是你说 0 级托管计数器,每个人都继续做工作单元并更新计数器以获得下一个,直到没有更多工作为止;然后你在障碍物或其他地方等待并最终确定。

一旦你有这样的东西 - 使用共享值来获得下一个可用的工作单元 - 工作,那么你可以推广到更复杂的方法。因此,正如 suzterpatt 所建议的那样,每个人在开始时都接受“他们的份额”的工作单元效果很好,但是如果有些人完成得比其他人快怎么办?现在通常的答案是偷工减料。每个人都将他们的工作单元列表保存在一个出队中,然后当一个人用完工作时,它会从其他人的出队的另一端窃取工作单元,直到没有更多工作了。这实际上是完全分布式的 master-worker 版本,不再需要单一的 master 分区工作。一旦你有一个共享计数器工作,你就可以从中创建互斥体,然后你可以实现出队。但如果简单的共享计数器运行良好,您可能不需要去那里。

更新: 好的,所以这里有一个 hacky-attempt 来做共享计数器 - 我在 MPI-2 书中的简单版本:似乎有效,但我不会说什么比这强得多(很久没玩过这个东西了)。有一个简单的计数器实现(对应于 MPI-2 书中的非缩放版本),带有两个简单的测试,一个大致对应于您的工作案例;每个项目都会更新计数器以获取工作项,然后执行“工作”(随机睡眠时间)。在每次测试结束时,都会打印出计数器数据结构,即每个等级所做的增量#。

#include <mpi.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>

struct mpi_counter_t 
    MPI_Win win;
    int  hostrank ;
    int  myval;
    int *data;
    int rank, size;
;

struct mpi_counter_t *create_counter(int hostrank) 
    struct mpi_counter_t *count;

    count = (struct mpi_counter_t *)malloc(sizeof(struct mpi_counter_t));
    count->hostrank = hostrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &(count->rank));
    MPI_Comm_size(MPI_COMM_WORLD, &(count->size));

    if (count->rank == hostrank) 
        MPI_Alloc_mem(count->size * sizeof(int), MPI_INFO_NULL, &(count->data));
        for (int i=0; i<count->size; i++) count->data[i] = 0;
        MPI_Win_create(count->data, count->size * sizeof(int), sizeof(int),
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
     else 
        count->data = NULL;
        MPI_Win_create(count->data, 0, 1,
                       MPI_INFO_NULL, MPI_COMM_WORLD, &(count->win));
    
    count -> myval = 0;

    return count;


int increment_counter(struct mpi_counter_t *count, int increment) 
    int *vals = (int *)malloc( count->size * sizeof(int) );
    int val;

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, count->hostrank, 0, count->win);

    for (int i=0; i<count->size; i++) 

        if (i == count->rank) 
            MPI_Accumulate(&increment, 1, MPI_INT, 0, i, 1, MPI_INT, MPI_SUM,
                           count->win);
         else 
            MPI_Get(&vals[i], 1, MPI_INT, 0, i, 1, MPI_INT, count->win);
        
    

    MPI_Win_unlock(0, count->win);
    count->myval += increment;

    vals[count->rank] = count->myval;
    val = 0;
    for (int i=0; i<count->size; i++)
        val += vals[i];

    free(vals);
    return val;


void delete_counter(struct mpi_counter_t **count) 
    if ((*count)->rank == (*count)->hostrank) 
        MPI_Free_mem((*count)->data);
    
    MPI_Win_free(&((*count)->win));
    free((*count));
    *count = NULL;

    return;


void print_counter(struct mpi_counter_t *count) 
    if (count->rank == count->hostrank) 
        for (int i=0; i<count->size; i++) 
            printf("%2d ", count->data[i]);
        
        puts("");
    


int test1() 
    struct mpi_counter_t *c;
    int rank;
    int result;

    c = create_counter(0);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    result = increment_counter(c, 1);
    printf("%d got counter %d\n", rank, result);

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);



int test2() 
    const int WORKITEMS=50;

    struct mpi_counter_t *c;
    int rank;
    int result = 0;

    c = create_counter(0);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    srandom(rank);

    while (result < WORKITEMS) 
        result = increment_counter(c, 1);
        if (result <= WORKITEMS) 
             printf("%d working on item %d...\n", rank, result);
             sleep(random() % 10);
          else 
             printf("%d done\n", rank);
         
    

    MPI_Barrier(MPI_COMM_WORLD);
    print_counter(c);
    delete_counter(&c);


int main(int argc, char **argv) 

    MPI_Init(&argc, &argv);

    test1();
    test2();

    MPI_Finalize();

【讨论】:

MPI-3 中的 MPI_Fetch_and_op 大大简化了这段代码。【参考方案2】:

我想不出任何内置机制来解决这个问题,你必须手动实现它。从您的 cmets 来看,您希望分散程序,在这种情况下,每个进程(或至少是进程组)都必须保持自己的计数器值并保持同步。这可能可以通过巧妙地使用非阻塞发送/接收来完成,但这些语义并非微不足道。

相反,我会通过简单地一次向工作进程发出多个文件来解决饱和问题。这将减少网络流量并允许您保持简单的单一调度程序设置。

【讨论】:

@suszterpatt-我一直在考虑您提到的内容-给定进程ID和进程总数,我可以轻松获取要完成的工作的“块”每个过程。然而,我在这里担心的是,模拟的计算时间差异很大(2+ 个数量级,取决于收敛速度),并且我可以看到一种情况,其中一个节点被赋予了大量的长时间进程,我的负载平衡将成为一个问题。 @MarkD:理论上,这当然是可能的。但是,听起来您正在处理真正大量的数据,因此它的可能性实际上可能并不那么大。尽管如此,一种可能的解决方法可能是让您的调度程序“取消调度”其工作节点尚未开始处理的文件,并将它们分派给当前空闲的工作人员。我仍然认为这种方法比实现共享变量更简单。【参考方案3】:

您似乎正在使用调度节点进行动态负载平衡(当处理器可用时将工作分配给处理器)。不需要所有处理器停止的共享计数器不会这样做。我建议您保持现有的状态,或者按照 suszterpatt 的建议,一次发送一批文件。

【讨论】:

【参考方案4】:

尚不清楚是否需要严格按顺序浏览文件。如果没有,为什么不让每个节点i 处理N % total_workers == i 所在的所有文件——即循环分配工作?

【讨论】:

以上是关于创建一个在 MPI 进程之间保持同步的计数器的主要内容,如果未能解决你的问题,请参考以下文章

如何强制打开mpi 3使用TCP?

Java并发包中的线程同步器

使用 MPI 的质数计数器(埃拉托色尼筛法),太慢了

进程间的几种通信方式的比较和线程间的几种

在 MPI 进程之间交换数据(晕)

SymmetricDS:保持单独的自动增量计数器