How to synchronize and sort prints (tasks) with MPI in C++

Posted: 2021-10-17 07:39:57

Question:

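Before the edit

I'm writing a simple code in C++ using MPI for parallel processing. I do simple tasks such as sending and receiving messages from one process to another, exchanging messages with MPI_Sendrecv, saying hello, and printing the execution time of each process. It works, but the output is not sorted in the order I want (process 0: tasks... process 1: tasks...).

I know this is because there is no synchronization between the processes. From my search I know that MPI_Send, MPI_Recv, ... are implicitly blocking functions, but it seems I have not understood how to use this feature. I also tried the MPI_Barrier() function, as it is often recommended, but without success. I run 8 processes.

Maybe someone could help me? Thanks in advance. Here is my code: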


#include <mpi.h>
#include <cstring>   // strlen
#include <iostream>
using namespace std;


int main(int argc, char* argv[])
{
    int rank, nbproc, length;
    char name[80];
    double time;   // MPI_Wtime() returns a double
    double SendData, ReceiveData;
    int tag = 1;
    int NumTo6 = 500;
    int NumTo7 = 300;
    int ReceiveFrom6, ReceiveFrom7;

    char message[] = "pokay";
    char receive[] = "none";
    int longueur = strlen(message);

    SendData = 1254.3356;
    ReceiveData = 0;
    MPI_Init(&argc, &argv);

    time = MPI_Wtime();
    cout << " " << endl;

    MPI_Comm_size(MPI_COMM_WORLD, &nbproc);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    // MPI_Get_processor_name(name, &length);

    cout << "Hello from process\t" << rank << endl;

    if (rank == 1)
    {
        cout << "2*5 = " << 2*5 << endl;
    }

    // Point-to-point: rank 2 sends a double to rank 3
    if (rank == 2)
    {
        MPI_Send(&SendData,1,MPI_DOUBLE,3,tag,MPI_COMM_WORLD);
    }

    if (rank == 3)
    {
        cout << "Data before the reception:\t" << ReceiveData << endl;
        MPI_Recv(&ReceiveData,1,MPI_DOUBLE,2,tag,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
        cout << "Data received is :\t" << ReceiveData << endl;
        tag+=1;
    }

    // Rank 4 sends the 4 characters "okay" (message without its first letter) to rank 5
    if (rank == 4)
    {
        MPI_Send(&message[1],4,MPI_CHAR,5,tag,MPI_COMM_WORLD);
    }

    if (rank == 5)
    {
        cout << "Message before the reception:\t" << receive << endl;
        // The count is the capacity of the receive buffer, not the length of the sender's string
        MPI_Recv(receive,sizeof(receive),MPI_CHAR,4,tag,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
        cout << "Message after reception:\t" << receive << endl;
        tag+=1;
    }

    // Exchange between 2 processes:
    if(rank == 6)
    {
        MPI_Sendrecv(&NumTo7,1,MPI_INT,7,tag,&ReceiveFrom7,1,MPI_INT,7,tag+1,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
        cout << "Num receive from 7:\t" << ReceiveFrom7 << endl;
    }

    if(rank == 7)
    {
        MPI_Sendrecv(&NumTo6,1,MPI_INT,6,tag+1,&ReceiveFrom6,1,MPI_INT,6,tag,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
        cout << "Num receive from 6:\t" << ReceiveFrom6 << endl;
        tag+=2;
    }

    time = MPI_Wtime() - time;
    cout << "Time spend on process " << rank << " is: " << time << " sec" << endl;

    MPI_Finalize();

    return 0;
}


Here is my output:

Hello from process      6
 
Hello from process      7
Num receive from 6:     300
Time spend on process 7 is: 6.0746e-05 sec
 
Hello from process      2
Time spend on process 2 is: 5.0439e-05 sec
 
Hello from process      3
Data before the reception:      0
Data received is:      1254.34
Time spend on process 3 is: 0.000439355 sec
 
Hello from process      4
Time spend on process 4 is: 6.2342e-05 sec
 
Hello from process      5
Message before the reception:   none
Message after reception:        okay
Time spend on process 5 is: 0.000168845 sec
 
Hello from process      1
2*5 = 10
Time spend on process 1 is: 0.000132448 sec
 
Hello from process      0
Time spend on the process 0 is: 3.9762e-05 sec
Num receive from 7:     500
Time spend on process 6 is: 0.00206599 sec

Edit following @VictorEijkhout's comment:

I can now print almost everything I want, the way I want, except for the MPI_Gather() of the char data (see my code).

My new code:

#include <mpi.h>
#include <iostream>
#include <math.h>
#include <string>
#include <cstring>
#include <cstdio>    // snprintf
#include <stdlib.h>
using namespace std;


int main(int argc, char* argv[])
{
    int rang, nbproc, taille;
    char name[MPI_MAX_PROCESSOR_NAME];  // size required by MPI_Get_processor_name
    double time;                        // MPI_Wtime() returns a double
    double SendData, ReceiveData;
    double NumTo6 = 500;
    double NumTo7 = 300;
    double ReceiveFrom6, ReceiveFrom7;

    char message[] = "precu";
    int longueur = strlen(message);

    const int len_buffer = 200;
    char Buffer_Time[len_buffer];
    char Buffer_Hello[len_buffer];
    char Buffer_message[len_buffer];

    char receive[] = "none";

    int mylen = strlen(receive);
    char* Gathered_Char_message = new char[len_buffer];
    double DataMessage = NAN;           // NaN marks "no numeric data to print"
    double* GatheredDataMessage = new double[20];
    double* GateredDataTime = new double[20];
    double DataTime;
    // The initializers of the next two arrays were lost in the post;
    // a size of 20 is assumed here to match the other gather buffers.
    int elements[20] = {};
    int source = 0;
    int GatheredSources[20] = {};
    double NoData = NAN;
    SendData = 1254.3356;
    ReceiveData = 0;

    cout << " " << endl;

    MPI_Init(&argc, &argv);
    time = MPI_Wtime();

    MPI_Comm_size(MPI_COMM_WORLD, &nbproc);
    MPI_Comm_rank(MPI_COMM_WORLD, &rang);
    MPI_Get_processor_name(name, &taille);

    snprintf(Buffer_Hello,len_buffer,"Hello from process %d among %d of the machine %s",rang,nbproc,name);
    snprintf(Buffer_Time,len_buffer,"Time elapsed in process %d on %d is " ,rang,nbproc);
    snprintf(Buffer_message,len_buffer,"Data received from process ");

    // Every rank except 0 sends its three text buffers to rank 0, which prints them later.
    // Rank 0 keeps its own buffers, so it must not send to itself (nobody would receive).
    if (rang != 0)
    {
        MPI_Send(Buffer_Time,len_buffer,MPI_CHAR,0,rang+20,MPI_COMM_WORLD);
        MPI_Send(Buffer_Hello,len_buffer,MPI_CHAR,0,rang+10,MPI_COMM_WORLD);
        MPI_Send(Buffer_message,len_buffer,MPI_CHAR,0,rang+30,MPI_COMM_WORLD);
    }

    if (rang == 1)
    {
        DataMessage = 5*6;
        source = 1;
    }

    if (rang == 2)
    {
        MPI_Send(&SendData,1,MPI_DOUBLE,3,1,MPI_COMM_WORLD);
        DataMessage = NoData;
    }

    if (rang == 3)
    {
        MPI_Recv(&ReceiveData,1,MPI_DOUBLE,2,1,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
        DataMessage = ReceiveData;
        source = 2;
    }

    if (rang == 4)
    {
        // Sends "recu" plus its '\0': longueur chars starting at index 1
        MPI_Send(&message[1],longueur,MPI_CHAR,5,2,MPI_COMM_WORLD);
        DataMessage = NoData;
    }

    if (rang == 5)
    {
        // The count is the capacity of the receive buffer
        MPI_Recv(receive,sizeof(receive),MPI_CHAR,4,2,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
        DataMessage = NoData;
        source = 4;
    }

    // Exchange between 2 processes:
    if(rang == 6)
    {
        MPI_Sendrecv(&NumTo7,1,MPI_DOUBLE,7,3,&ReceiveFrom7,1,MPI_DOUBLE,7,4,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
        DataMessage = ReceiveFrom7;
        elements[rang] = 1;
        source = 7;
    }

    if(rang == 7)
    {
        MPI_Sendrecv(&NumTo6,1,MPI_DOUBLE,6,4,&ReceiveFrom6,1,MPI_DOUBLE,6,3,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
        DataMessage = ReceiveFrom6;
        elements[rang] = 1;
        source = 6;
    }

    DataTime = MPI_Wtime() - time;

    MPI_Gather(&DataTime,1,MPI_DOUBLE,GateredDataTime,1,MPI_DOUBLE,0,MPI_COMM_WORLD);
    MPI_Gather(&DataMessage,1,MPI_DOUBLE,GatheredDataMessage,1,MPI_DOUBLE,0,MPI_COMM_WORLD);
    MPI_Gather(&source,1,MPI_INT,GatheredSources,1,MPI_INT,0,MPI_COMM_WORLD);

    // Attempt to gather the 'receive' strings (commented out: nothing was printed with it)
    // int* recvcounts = new int[nbproc*sizeof(int)];

    // MPI_Gather(&mylen,1,MPI_INT,recvcounts,1,MPI_INT,0,MPI_COMM_WORLD);

    // int totlen = 0;
    // int* displs = new int[nbproc*sizeof(int)];
    // //char* totalstring = new char[totlen*sizeof(char)];

    // if(rang == 0)
    // {
    //     displs[0] = 0;
    //     totlen += recvcounts[0] + 1;

    //     for(int i=1; i< nbproc; i++)
    //     {
    //         totlen += recvcounts[i]+1;
    //         displs[i] = displs[i-1] + recvcounts[i-1] + 1;
    //     }
    // }
    // char* totalstring = new char[totlen*sizeof(char)];

    // if(rang == 0)
    // {
    //     for (int i=0; i<totlen-1; i++)
    //         totalstring[i] = ' ';

    //     totalstring[totlen-1] = '\0';
    // }

    //    MPI_Gatherv(&receive, mylen, MPI_CHAR,
    //             totalstring, recvcounts, displs, MPI_CHAR,
    //             0, MPI_COMM_WORLD);

    if(rang == 0)
    {
        cout << Buffer_Hello << endl;

        for(int i = 1; i < nbproc; i++)
        {
            MPI_Recv(Buffer_Hello,len_buffer,MPI_CHAR,i,i+10,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
            MPI_Recv(Buffer_message,len_buffer,MPI_CHAR,i,i+30,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
            cout << Buffer_Hello << endl;

            // Ranks without numeric data gathered NaN and print nothing here
            if(!isnan(GatheredDataMessage[i]))
            {
                cout << Buffer_message << GatheredSources[i] << ": "<<
                GatheredDataMessage[i] << endl;
            }

            // cout << totalstring[i] << endl;

            MPI_Recv(Buffer_Time,len_buffer,MPI_CHAR,i,i+20,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
            cout << Buffer_Time << GateredDataTime[i] << " sec" << endl;
            cout << " " << endl;
        }
    }

    delete[] Gathered_Char_message;
    delete[] GatheredDataMessage;
    delete[] GateredDataTime;
    MPI_Finalize();
    return 0;
}

And the output:

Hello from process 0 among 8 of the machine jeremy-SATELLITE-P50-C
Hello from process 1 among 8 of the machine jeremy-SATELLITE-P50-C
Data received from process 1: 30
Time elapsed in process 1 on 8 is 0.000248922 sec
 
Hello from process 2 among 8 of the machine jeremy-SATELLITE-P50-C
Time elapsed in process 2 on 8 is 0.00013139 sec
 
Hello from process 3 among 8 of the machine jeremy-SATELLITE-P50-C
Data received from process 2: 1254.34
Time elapsed in process 3 on 8 is 0.000183373 sec
 
Hello from process 4 among 8 of the machine jeremy-SATELLITE-P50-C
Time elapsed in process 4 on 8 is 0.000121771 sec
 
Hello from process 5 among 8 of the machine jeremy-SATELLITE-P50-C
Time elapsed in process 5 on 8 is 0.00027475 sec
 
Hello from process 6 among 8 of the machine jeremy-SATELLITE-P50-C
Data received from process 7: 500
Time elapsed in process 6 on 8 is 0.00330783 sec
 
Hello from process 7 among 8 of the machine jeremy-SATELLITE-P50-C
Data received from process 6: 300
Time elapsed in process 7 on 8 is 0.000215519 sec

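So I am close to what I want. What is still missing is gathering and printing the chars exchanged by processes 4 and 5, and printing the initial string "none" for all the other processes. I tried several things, for example this one: https://***.com/a/31932283/14901229 (you can see it in the commented-out part of my code), but then nothing is printed...

Maybe you could help me with this last thing? Also, if you see ways to optimize my code (I think there are some...), please do not hesitate to tell me!

Thanks in advance!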
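For the missing piece (gathering strings whose lengths differ per process), the bookkeeping around MPI_Gatherv can be sketched without any MPI calls. This is only a sketch under assumptions: `make_displs` and `split_gathered` are hypothetical helper names, not MPI functions. On rank 0, `counts` would come from an MPI_Gather of each rank's `strlen(receive)`, and `gathered` would be the receive buffer passed to MPI_Gatherv together with `counts` and `displs`.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Exclusive prefix sum of the per-rank lengths: displs[i] is the offset
// at which rank i's characters start in the gathered buffer.
std::vector<int> make_displs(const std::vector<int>& counts)
{
    std::vector<int> displs(counts.size(), 0);
    for (std::size_t i = 1; i < counts.size(); ++i)
        displs[i] = displs[i - 1] + counts[i - 1];
    return displs;
}

// Cut the gathered buffer back into one string per rank, using the same
// counts and displacements that would be handed to MPI_Gatherv.
std::vector<std::string> split_gathered(const std::vector<char>& gathered,
                                        const std::vector<int>& counts,
                                        const std::vector<int>& displs)
{
    assert(counts.size() == displs.size());
    std::vector<std::string> pieces;
    for (std::size_t i = 0; i < counts.size(); ++i) {
        assert(displs[i] + counts[i] <= static_cast<int>(gathered.size()));
        pieces.emplace_back(gathered.data() + displs[i], counts[i]);
    }
    return pieces;
}
```

Because every piece is recovered from its own count and displacement, no separator characters or terminating '\0' need to be sent, and rank 0 can print `pieces[i]` next to the other data of process i.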

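Comments:

Output is handled by the operating system; you cannot synchronize it from within MPI. The only MPI-based solution is to send all the text to process zero and let it do the printing. In your case you could also MPI_Gather the numbers and let process zero print them with the accompanying text.

If processes 4 and 5 each have an independent print, then again that cannot be synchronized. Live with it, or send the buffer to process zero. But really, what is the point of the printing? Large-scale applications never print precisely in parallel, because it cannot be synchronized, and if they do print, they provide an explicit mechanism for gathering the data. You can also use a shell mechanism (does your MPI starter define an environment variable with the process number?) to send the output of each process to a different file. Or you could write an MPI_printf function that prefixes each line with the process number.

Answer 1: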

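I finally found a way to do all of this; for those who are interested, the code is below. It works the way I wanted.

However, I don't think it is the best way to do it, so if someone can explain how to optimize it, I would be grateful!

Here is my complete code, with some explanatory comments: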


#include <mpi.h>
#include <iostream>
#include <math.h>
#include <cstring>  // strlen, strtok
#include <cstdio>   // snprintf
using namespace std;


int main(int argc, char* argv[])
{
    int rank, nbproc, taille;
    char name[MPI_MAX_PROCESSOR_NAME];  // size required by MPI_Get_processor_name
    double time;                        // MPI_Wtime() returns a double
    double SendData, ReceiveData;
    double NumTo6 = 500;
    double NumTo7 = 300;
    double ReceiveFrom6, ReceiveFrom7;

    char message[] = "preceived";
    int longueur = strlen(message);
    const int len_buffer = 200;
    char Buffer_Time[len_buffer];
    char Buffer_Hello[len_buffer];
    char Buffer_message[len_buffer];
    char receive[16] = "nothing";       // large enough for "received" plus '\0'

    char* Gathered_Char_message = new char[len_buffer];
    double DataMessage = NAN;           // NaN marks "no numeric data to print"
    double* GatheredDataMessage = new double[20];
    double* GateredDataTime = new double[20];
    double DataTime;
    int source = 0;
    int* GatheredSources = new int[20];
    double NoData = NAN;
    SendData = 1254.3356;
    ReceiveData = 0;

    cout << " " << endl;

    MPI_Init(&argc, &argv);
    time = MPI_Wtime();

    MPI_Comm_size(MPI_COMM_WORLD, &nbproc);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Get_processor_name(name, &taille);

    snprintf(Buffer_Hello,len_buffer,"Hello from process %d among %d of the machine %s",rank,nbproc,name);
    snprintf(Buffer_Time,len_buffer,"Time elapsed in process %d on %d is " ,rank,nbproc);
    snprintf(Buffer_message,len_buffer,"Data received from process ");

    // Every rank except 0 sends its three text buffers to rank 0, which prints them later.
    // Rank 0 keeps its own buffers, so it must not send to itself (nobody would receive).
    if (rank != 0)
    {
        MPI_Send(Buffer_Time,len_buffer,MPI_CHAR,0,rank+20,MPI_COMM_WORLD);
        MPI_Send(Buffer_Hello,len_buffer,MPI_CHAR,0,rank+10,MPI_COMM_WORLD);
        MPI_Send(Buffer_message,len_buffer,MPI_CHAR,0,rank+30,MPI_COMM_WORLD);
    }

    if (rank == 1)
    {
        DataMessage = 5*6;
        source = 1;
    }

    if (rank == 2)
    {
        MPI_Send(&SendData,1,MPI_DOUBLE,3,1,MPI_COMM_WORLD);
        DataMessage = NoData;
    }

    if (rank == 3)
    {
        MPI_Recv(&ReceiveData,1,MPI_DOUBLE,2,1,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
        DataMessage = ReceiveData;
        source = 2;
    }

    if (rank == 4)
    {
        // takes only a part of the message: "received" plus '\0' is longueur chars from index 1
        MPI_Send(&message[1],longueur,MPI_CHAR,5,2,MPI_COMM_WORLD);
        DataMessage = NoData;
    }

    if (rank == 5)
    {
        // The count is the capacity of the receive buffer
        MPI_Recv(receive,sizeof(receive),MPI_CHAR,4,2,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
        DataMessage = NoData;
        source = 4;
    }

    if(rank == 6)
    {
        MPI_Sendrecv(&NumTo7,1,MPI_DOUBLE,7,3,&ReceiveFrom7,1,MPI_DOUBLE,7,4,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
        DataMessage = ReceiveFrom7;
        source = 7;
    }

    if(rank == 7)
    {
        MPI_Sendrecv(&NumTo6,1,MPI_DOUBLE,6,4,&ReceiveFrom6,1,MPI_DOUBLE,6,3,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
        DataMessage = ReceiveFrom6;
        source = 6;
    }

    MPI_Gather(&DataMessage,1,MPI_DOUBLE,GatheredDataMessage,1,MPI_DOUBLE,0,MPI_COMM_WORLD);
    MPI_Gather(&source,1,MPI_INT,GatheredSources,1,MPI_INT,0,MPI_COMM_WORLD);

    int mylen = strlen(receive);            // takes back the length of receive
    int* recvcounts = new int[nbproc];      // stores all the lengths after gathering

    MPI_Gather(&mylen,1,MPI_INT,recvcounts,1,MPI_INT,0,MPI_COMM_WORLD); // Gathering all lengths

    int totlen = 0;                         // Total length of the gathered string, spaces included
    int* displs = new int[nbproc];          // offset of each word in the big string

    if(rank == 0)
    {
        displs[0] = 0;                      // no space first
        totlen += recvcounts[0] + 1;        // first word + space

        for(int i=1; i < nbproc; i++)
        {
            totlen += recvcounts[i]+1;      // word + space
            displs[i] = displs[i-1] + recvcounts[i-1] + 1; // previous offset + length of previous word + 1 space
        }
    }

    // Big string with all the 'receive' words and spaces; +1 so that totalstring[totlen] is a valid index
    char* totalstring = new char[totlen + 1];

    if(rank == 0)
    {
        for (int i=0; i<totlen; i++)        // totlen = 65 -> 7*7 + 8 + 8 (spaces)
            totalstring[i] = ' ';           // pre-fill with the delimiter

        totalstring[totlen] = '\0';         // end of string
    }

    // Gather all the 'receive' words from each process to create a big string with spaces between words
    MPI_Gatherv(receive, mylen, MPI_CHAR,
                totalstring, recvcounts, displs, MPI_CHAR,
                0, MPI_COMM_WORLD);

    // strtok splits a char array on a delimiter (here a space).
    // Only rank 0 holds the gathered string, so only rank 0 may tokenize it.
    char* piece = NULL;
    if(rank == 0)
        piece = strtok(totalstring," ");

    DataTime = MPI_Wtime() - time;
    MPI_Gather(&DataTime,1,MPI_DOUBLE,GateredDataTime,1,MPI_DOUBLE,0,MPI_COMM_WORLD);

    if(rank == 0)
    {
        cout << Buffer_Hello << endl;
        if(piece != NULL)
            cout << "Message received: " << piece << endl;
        cout << " " << endl;

        for(int i = 1; i < nbproc; i++)
        {
            MPI_Recv(Buffer_Hello,len_buffer,MPI_CHAR,i,i+10,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
            MPI_Recv(Buffer_message,len_buffer,MPI_CHAR,i,i+30,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
            cout << Buffer_Hello << endl;

            // Ranks without numeric data gathered NaN and print nothing here
            if(!isnan(GatheredDataMessage[i]))
            {
                cout << Buffer_message << GatheredSources[i] << ": "<<
                GatheredDataMessage[i] << endl;
            }

            // With a null pointer, strtok continues scanning where the previous successful call ended;
            // it returns NULL once the end of the string is reached.
            piece = strtok(NULL," ");
            if(piece != NULL)
                cout << "Message received: " << piece << endl;

            MPI_Recv(Buffer_Time,len_buffer,MPI_CHAR,i,i+20,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
            cout << Buffer_Time << GateredDataTime[i] << " sec" << endl;
            cout << " " << endl;
        }
        cout << "Time elapsed in process 0 on " << nbproc << " is " << MPI_Wtime() - time << "sec " << endl;
    }

    delete[] Gathered_Char_message;
    delete[] GatheredDataMessage;
    delete[] GateredDataTime;
    delete[] GatheredSources;
    delete[] recvcounts;
    delete[] displs;
    delete[] totalstring;
    MPI_Finalize();

    return 0;
}

Here is my output:

Hello from process 0 among 8 of the machine jeremy-SATELLITE-P50-C
Message received: nothing
 
Hello from process 1 among 8 of the machine jeremy-SATELLITE-P50-C
Data received from process 1: 30
Message received: nothing
Time elapsed in process 1 on 8 is 0.000141763 sec
 
Hello from process 2 among 8 of the machine jeremy-SATELLITE-P50-C
Message received: nothing
Time elapsed in process 2 on 8 is 0.000560879 sec
 
Hello from process 3 among 8 of the machine jeremy-SATELLITE-P50-C
Data received from process 2: 1254.34
Message received: nothing
Time elapsed in process 3 on 8 is 0.0037692 sec
 
Hello from process 4 among 8 of the machine jeremy-SATELLITE-P50-C
Message received: nothing
Time elapsed in process 4 on 8 is 0.000128876 sec
 
Hello from process 5 among 8 of the machine jeremy-SATELLITE-P50-C
Message received: received
Time elapsed in process 5 on 8 is 0.00140292 sec
 
Hello from process 6 among 8 of the machine jeremy-SATELLITE-P50-C
Data received from process 7: 500
Message received: nothing
Time elapsed in process 6 on 8 is 0.00226591 sec
 
Hello from process 7 among 8 of the machine jeremy-SATELLITE-P50-C
Data received from process 6: 300
Message received: nothing
Time elapsed in process 7 on 8 is 0.000495762 sec
 
Time elapsed in process 0 on 8 is 0.00330377sec 
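
On the optimization question, one possible simplification is for each process to build its whole report as a single string before anything is sent. The three tagged MPI_Send calls per rank, the separate gathers for data, source and time, and the strtok pass could then shrink to one gather of the string lengths plus one MPI_Gatherv of the strings, with rank 0 printing the pieces in rank order. The following is a minimal sketch of the formatting step only, under assumptions: `format_report` and its parameters are hypothetical names, and the wording of the lines simply mirrors the output above.

```cpp
#include <cassert>
#include <cmath>
#include <sstream>
#include <string>

// Hypothetical helper: builds the complete report of one rank as one string.
// `data` is NaN when the rank has no numeric result to show;
// `message` is empty when it has no text result to show.
std::string format_report(int rank, int nbproc, const std::string& host,
                          int source, double data,
                          const std::string& message, double elapsed)
{
    assert(rank >= 0 && rank < nbproc);
    std::ostringstream out;
    out << "Hello from process " << rank << " among " << nbproc
        << " of the machine " << host << "\n";
    if (!std::isnan(data))
        out << "Data received from process " << source << ": " << data << "\n";
    if (!message.empty())
        out << "Message received: " << message << "\n";
    out << "Time elapsed in process " << rank << " on " << nbproc
        << " is " << elapsed << " sec\n";
    return out.str();
}
```

Each rank would call this once with its own values (for example `DataMessage`, `source`, `receive` and the elapsed time from the code above) and contribute the resulting string to the gather. Since the text is assembled locally, rank 0 no longer needs NaN checks or tag arithmetic when printing.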
