OpenMPI Java绑定行为差异锁定,累积,获取

Posted

技术标签:

【中文标题】OpenMPI Java绑定行为差异锁定,累积,获取【英文标题】:OpenMPI Java Bindings Behavior Difference for lock, accumulate, get 【发布时间】:2018-11-09 16:52:19 【问题描述】:

我有一个案例,我需要在我们的研究集群上使用 Java 和 MPI。 this question 很好地介绍了我需要的一个特定功能(链接答案中包含 C++ 代码)。我构建了 C++ 代码,它完全按预期工作。

我尝试构建与此代码等效的 Java,但失败了。即使在功能上我已经复制了 C++ 代码的功能,Java 版本并不能始终如一地返回所需的结果。

mpiexec --oversubscribe -n 4 ./test

0 got counter 1
2 got counter 2
1 got counter 3
3 got counter 4
 1  1  1  1 

(在我的本地笔记本电脑上使用 --oversubscribe 运行。)

当我运行我的 Java 等效项时,我没有得到任何接近相同的结果:

mpirun --oversubscribe -n 4 java -cp .:/usr/local/lib/openmpi/mpi.jar CounterTest

0 got counter 1
3 got counter 1
1 got counter 3
2 got counter 2
 1  1  1  1 

我希望每个等级都有一个且只有一个计数器。这次运行,计数器 1 被使用了两次。一旦在一个蓝色的月亮,我可以让它给我送 1 - 4 个(顺序不重要;唯一计数是)。

我们在集群上运行 2.1.0 版。在我的本地笔记本电脑上,我安装了 OpenMPI 2.1.0 和 3.1.0(当前),我可以在任一版本上重现 C++ 程序的正确行为和 Java 程序的不当行为。

这是我创建的 Counter 类:

import java.nio.ByteBuffer;
import java.util.ArrayList;

import mpi.MPI;
import mpi.MPIException;
import mpi.Win;

public class Counter 
    Win win;
    int hostRank;
    int myVal;
    ByteBuffer data;
    int rank;
    int size;

    public Counter(int hostRank) throws MPIException 
        this.setHostRank(hostRank);
        this.setSize(MPI.COMM_WORLD.getSize());
        this.setRank(MPI.COMM_WORLD.getRank());

        if (this.getRank() == hostRank) 
//          this.setData(MPI.newByteBuffer(this.getSize() * Integer.BYTES));
            this.setData(ByteBuffer.allocateDirect(this.getSize() * Integer.BYTES));
            for (int i = 0; i < this.getData().capacity(); i += Integer.BYTES)
                this.getData().putInt(i, 0);
         else 
//          this.setData(MPI.newByteBuffer(0));
            this.setData(ByteBuffer.allocateDirect(0));
           

        this.setWin(new Win(this.getData(), this.getData().capacity(), Integer.BYTES,
                MPI.INFO_NULL, MPI.COMM_WORLD));

        this.setMyVal(0);
    

    public int increment(int increment) throws MPIException 

        // A list to store all of the values we pull
        ArrayList<Integer> vals = new ArrayList<Integer>();
        for (int i = 0; i < this.getSize(); i++)
            vals.add(i, 0);

        // Need to convert the increment to a buffer
        ByteBuffer incrbuff = ByteBuffer.allocateDirect(Integer.BYTES);
        incrbuff.putInt(increment);

        // Our values are returned to us in a byte buffer
        ByteBuffer valBuff = ByteBuffer.allocateDirect(Integer.BYTES);

//      System.out.printf("Data for RANK %d: ", this.getRank());
        this.getWin().lock(MPI.LOCK_EXCLUSIVE, this.getHostRank(), 0);
        for (int i = 0; i < this.getSize(); i++) 
            // Always ensure that we're at the top of the buffer
            valBuff.position(0);
            if (i == this.getRank()) 
                this.getWin().accumulate(incrbuff, 1, MPI.INT, this.getHostRank(), i, 1, MPI.INT, MPI.SUM);
                // Without this, it comes back all 1s 
                this.getWin().flushLocalAll();
//              System.out.printf(" [%d] ", this.getMyVal() + increment);
             else 
                this.getWin().get(valBuff, 1, MPI.INT, this.getHostRank(), i, 1, MPI.INT);
                vals.set(i, valBuff.getInt(0));
//              System.out.printf("  %d  ", vals.get(i))
            
        
        this.getWin().unlock(this.getHostRank());

        this.setMyVal(this.getMyVal() + increment);
        vals.set(this.getRank(), this.getMyVal());

//      System.out.printf(" <<%d>> \n", vals.stream().mapToInt(Integer::intValue).sum());
//      this.getWin().unlock(this.getHostRank());

        return vals.stream().mapToInt(Integer::intValue).sum();

    

    public void printCounter() 
        if (this.getRank() == this.getHostRank()) 
            for (int i = 0; i < this.getSize(); i++) 
                System.out.printf(" %d ", this.getData().getInt());
            
            System.out.println("");
        
    

    public void delete() throws MPIException 
        this.getWin().detach(this.getData());
        this.getWin().free();

        this.setData(null);
        this.setHostRank(0);
        this.setMyVal(0);
        this.setRank(0);
        this.setSize(0);
        this.setWin(null);

    

    private Win getWin() 
        return win;
    

    private void setWin(Win win) 
        this.win = win;
    

    private int getHostRank() 
        return hostRank;
    

    private void setHostRank(int hostrank) 
        this.hostRank = hostrank;
    

    private int getMyVal() 
        return myVal;
    

    private void setMyVal(int myval) 
        this.myVal = myval;
    

    private ByteBuffer getData() 
        return data;
    

    private void setData(ByteBuffer data) 
        this.data = data;
    

    private int getRank() 
        return rank;
    

    private void setRank(int rank) 
        this.rank = rank;
    

    private int getSize() 
        return size;
    

    private void setSize(int size) 
        this.size = size;
    


还应注意,Java 代码包含 C++ 代码不包含的内容:

this.getWin().flushLocalAll();

没有这个,计数器对于每个等级都会是“1”。

这里也是测试类的第一部分:

import java.util.Random;

import mpi.*;

public class CounterTest 

    public static void main(String[] args) 

        try 
            MPI.Init(args);
         catch (MPIException e1) 
            // TODO Auto-generated catch block
            e1.printStackTrace();
        

        try 
            test1();
//          test2();
         catch (MPIException e) 
            // TODO Auto-generated catch block
            e.printStackTrace();
        

        try 
            MPI.Finalize();
         catch (MPIException e) 
            // TODO Auto-generated catch block
            e.printStackTrace();
        

    

    public static void test1 () throws MPIException 
        Counter c = new Counter(0);
        int rank = MPI.COMM_WORLD.getRank();
        int size = MPI.COMM_WORLD.getSize();

        int result = c.increment(1);
        System.out.printf("%d got counter %d\n", rank, result);

        MPI.COMM_WORLD.barrier();
        c.printCounter();
        c.delete();
        c = null;                       

    

我尝试了各种其他技术,在尝试隔离方面,使用组来使用 MPI_Win_start() 和 MPI_Win_complete(),但无济于事。我觉得这与我可以获得的原始 C++ 代码的真实表示非常接近。

我错过了什么?为什么这与原生 C++ 代码的行为不同?

编辑:我还发现我需要在针对实际集群运行它时添加它(最近两天它因维护而停机):

this.getWin().flush(0);

【问题讨论】:

你也可以发布C++代码吗? 这是我帖子顶部的第一个链接:***.com/a/4961000/516746 我对 C++ 代码所做的唯一更改是注释掉第二个测试。想如果我不能通过 Java 方面的第一个测试,根本不需要浪费时间运行另一个测试...... 感谢您观看@GillesGouaillardet !! 【参考方案1】:

我认为问题在于这些行

this.getWin().get(valBuff, 1, MPI.INT, this.getHostRank(), i, 1, MPI.INT);
vals.set(i, valBuff.getInt(0));

我的理解是你不能假设valBuff 的内容是正确的之前 MPI_Win_unlock() 已被调用。

我通过使用几个缓冲区重写了子程序,并设置vals 之后 MPI_Win_unlock() 并且能够得到正确的输出。

public int increment(int increment) throws MPIException 

    // A list to store all of the values we pull
    ArrayList<Integer> vals = new ArrayList<Integer>();
    for (int i = 0; i < this.getSize(); i++)
        vals.add(i, 0);

    // Need to convert the increment to a buffer
    ByteBuffer incrbuff = ByteBuffer.allocateDirect(Integer.BYTES);
    incrbuff.putInt(increment);

    // Our values are returned to us in several byte buffers
    ByteBuffer valBuff[] = new ByteBuffer[this.getSize()];

    this.getWin().lock(MPI.LOCK_EXCLUSIVE, this.getHostRank(), 0);
    for (int i = 0; i < this.getSize(); i++) 
        // Always ensure that we're at the top of the buffer
        if (i == this.getRank()) 
            this.getWin().accumulate(incrbuff, 1, MPI.INT, this.getHostRank(), i, 1, MPI.INT, MPI.SUM);
         else 
            valBuff[i] = ByteBuffer.allocateDirect(Integer.BYTES);
            valBuff[i].position(0);
            this.getWin().get(valBuff[i], 1, MPI.INT, this.getHostRank(), i, 1, MPI.INT);
        
    
    this.getWin().unlock(this.getHostRank());
    for (int i = 0; i < this.getSize(); i++) 
        if (i != this.getRank()) 
            vals.set(i, valBuff[i].getInt(0));
        
    

    this.setMyVal(this.getMyVal() + increment);
    vals.set(this.getRank(), this.getMyVal());

    return vals.stream().mapToInt(Integer::intValue).sum();


请注意,不再需要

this.getWin().flushLocalAll();
this.getWin().flush(0);

FWIW,我尝试使用 this.getSize() 整数的单个缓冲区,但无法正常工作。

【讨论】:

我改变了每一段代码,但我没有想到。可能是因为我在解释因为 C++ 代码会立即写入 &vals[i],所以我需要立即执行相同的操作。在集群上启动会话以进行验证。 !! :-D 不确定我是否获得了更新的版本。应该:valBuff[i] = ByteBuffer.allocateDirect(Integer.BYTES); ...是 Integer.BYTES * this.getSize()?在循环之外?它不断因 ArrayOutOfBoundException 而爆炸。 其实这个:ByteBuffer valBuff[] = new ByteBuffer[Integer.BYTES * this.getSize()]; 这让我可以超越4级! :-D 对答案进行了更改;等待同行评审。 正确的解决方法是 ByteBuffer valBuff[] = new ByteBuffer[this.getSize()]; 如果你去达拉斯参加 SC'18,你很可能会在 RIST 展位找到我 ;-)

以上是关于OpenMPI Java绑定行为差异锁定,累积,获取的主要内容,如果未能解决你的问题,请参考以下文章

openmpi + java,找不到或加载主类

OpenMPI MPI_Send 与英特尔 MPI MPI_Send

在rman增量备份中,有差异增量和累积增量的概念

Python Numpy累积/差异[重复]

Oracle 差异增量和累积增量的区别

R:如果差异超过阈值,则累积和