MPI 发送自定义序列化对象(更通用的代码)

Posted

技术标签:

【中文标题】MPI 发送自定义序列化对象(更通用的代码)【英文标题】:MPI send for custom serialized objects (more generic code) 【发布时间】:2018-10-02 03:58:27 【问题描述】:

问题

我正在尝试找到一种通过 MPI 发送自定义序列化对象(不是自定义 MPI 结构 - 参见下面的定义)的正确方法。在阅读了一些材料和 *** 之后,我有一个使用boost::serialization 并将序列化对象作为stringstream 发送的工作 示例。但是,我当前的解决方案看起来有点hackish,请参见下面的快照(完整代码附在末尾部分)。

我的问题:您能否对当前的解决方案发表意见并推荐一些行业接受的方式来发送自定义序列化对象?

.

限制:不幸的是,boost.mpi 不是一个选项,因为它与 openmpi 的依赖关系在我的 ubuntu-xenial 基础架构上有一个与 TCP 相关的错误。我只使用纯mpich

自定义对象定义

自定义对象:在我的示例中,自定义对象序列化其base 类、std::vectorboost::shared_ptr 和其他一些简单变量。

MPI 发送/接收快照

这是我如何发送/接收流的小程序快照。

 if (rank == 1) 
        std::stringstream mystream;
        //...more serialization code here

        int len = mystream.str().size();
        MPI_Send( &len, 1, MPI_INT, 1, lentag, MPI_COMM_WORLD );
        MPI_Send( (void *)mystream.str().c_str(), len, MPI_BYTE, 1, datatag, MPI_COMM_WORLD );

 else if (rank == 1) 
        int len;
        MPI_Recv( &len, 1, MPI_INT, 0, lentag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

        char data[len+1];
        MPI_Recv( data, len, MPI_BYTE, 0, datatag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        data[len] = '\0';

        std::stringstream mystream;
        mystream.write((const char*) data, len);

        //...more deserialization code here

程序输出

这是程序输出。可以看到数据从rank 0转移到rank 1成功了。

$ mpirun.mpich -np 2 ./mpidata 
Rank 0 sum in 6
Rank 0 vsize out 4
Rank 0 ptr out 30

Rank 1 sum in 6
Rank 1 vsize in 4
Rank 1 ptr in 30

MPI 发送/接收完整代码

下面提供了完整的代码。

#include <mpi.h>
#include <iostream>
#include <sstream>
#include <vector>

#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/binary_iarchive.hpp>
#include <boost/serialization/base_object.hpp>
#include <boost/serialization/shared_ptr.hpp>
#include <boost/serialization/vector.hpp>
#include <boost/serialization/string.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/make_shared.hpp>

// Forward declaration of class boost::serialization::access
namespace boost 
namespace serialization 
class access;



class Obj 
public:
    // Serialization expects the object to have a default constructor
    Obj() : d1_(-1), d2_(-2) 
    Obj(int d1, int d2) : d1_(d1), d2_(d2) 
    bool operator==(const Obj& o) const 
        return d1_ == o.d1_ && d2_ == o.d2_;
    

    const int sum() const return d1_+d2_;
private:
    int d1_;
    int d2_;

    // Allow serialization to access non-public data members.
    friend class boost::serialization::access;

    template<typename Archive>
    void serialize(Archive& ar, const unsigned version) 
        ar & d1_ & d2_; // Simply serialize the data members of Obj
    
;

class ObjChild : public Obj 

private:
    typedef Obj _Super;

public:
    ObjChild() : Obj(),d1_(-1),dv_1,2,iptr_(new Obj()) 
    ObjChild(
            int d1,
            int d2,
            int d1new,
            std::vector<int> const& dv,
            boost::shared_ptr<Obj> const& obj
            ) : Obj(d1,d2),d1_(d1new),dv_(dv),iptr_(obj) 

    const int sum2() const return d1_ + sum();
    const int vsize() const return dv_.size();
    const int ptrsum() const return iptr_->sum();

private:
    int d1_; // Another d1_
    std::vector<int> dv_;
    boost::shared_ptr<Obj> iptr_;

    // -------------------------------------------------------------
    friend class boost::serialization::access;

    template <typename Archive>
    void serialize(Archive& ar, const unsigned version) 
        ar & boost::serialization::base_object<_Super>(*this);
        ar & d1_;
        ar & dv_;
        ar & iptr_;
    
    // -------------------------------------------------------------
;

int main(int argc,char** argv) 

    int size, rank;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (size < 2) 
        if (rank == 0)
            std::cerr << "Require at least 2 tasks" << std::endl;
        MPI_Abort(MPI_COMM_WORLD, 1);
    

   const int lentag=0;
   const int datatag=1;
   if (rank == 0) 

        std::stringstream mystream;

        ObjChild obj(1,3,2,1,2,3,4,boost::make_shared<Obj>(10,20));

        boost::archive::binary_oarchive oarchivemystream;
        oarchive << obj;

        std::cout<<"Rank "<< rank << " sum in " << obj.sum2() << std::endl;
        std::cout<<"Rank "<< rank << " vsize out " << obj.vsize() << std::endl;
        std::cout<<"Rank "<< rank << " ptr out " << obj.ptrsum() << std::endl;

        int len = mystream.str().size();
        // Send length, then data
        MPI_Send( &len, 1, MPI_INT, 1, lentag, MPI_COMM_WORLD );
        MPI_Send( (void *)mystream.str().c_str(), len, MPI_BYTE, 1, datatag, MPI_COMM_WORLD );


         else if (rank == 1) 
            int len;
            MPI_Recv( &len, 1, MPI_INT, 0, lentag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

            char data[len+1];
            MPI_Recv( data, len, MPI_BYTE, 0, datatag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            data[len] = '\0';

            std::stringstream mystream;
            mystream.write((const char*) data, len);

            boost::archive::binary_iarchive iarchive(mystream);

            ObjChild obj;

            iarchive >> obj;

            std::cout<<"Rank "<< rank << " sum in "<< obj.sum2() << std::endl;
            std::cout<<"Rank "<< rank << " vsize in " << obj.vsize() << std::endl;
            std::cout<<"Rank "<< rank << " ptr in " << obj.ptrsum() <<         std::endl;

        

        MPI_Finalize();
        return EXIT_SUCCESS;

【问题讨论】:

感谢您提供完整的代码示例。虽然这个问题很好并且可以回答,但请注意不要邀请自以为是的答案(“最佳实践”)。而是专注于切实的目标和具体问题,例如如何使用更简洁的“不那么老套”的抽象 【参考方案1】:

如果您无论如何都在使用 Boost.Serialization,那么绝对明智的做法是使用 Boost.MPI。这将基本上隐藏通信部分的所有序列化样板,如下所示:

  boost::mpi::environment env;
  boost::mpi::communicator world;
  auto rank = world.rank();

  if (world.size() < 2) 
    if (rank == 0)
      std::cerr << "Require at least 2 tasks" << std::endl;
    MPI_Abort(MPI_COMM_WORLD, 1);
  

  const int datatag = 1;
  if (rank == 0) 
    ObjChild obj(1, 3, 2, 1, 2, 3, 4, boost::make_shared<Obj>(10, 20));

    std::cout << "Rank " << rank << " sum in " << obj.sum2() << std::endl;
    std::cout << "Rank " << rank << " vsize out " << obj.vsize() << std::endl;
    std::cout << "Rank " << rank << " ptr out " << obj.ptrsum() << std::endl;

    world.send(1, datatag, obj);
   else if (rank == 1) 
    ObjChild obj;
    world.recv(0, datatag, obj);

    std::cout << "Rank " << rank << " sum in " << obj.sum2() << std::endl;
    std::cout << "Rank " << rank << " vsize in " << obj.vsize() << std::endl;
    std::cout << "Rank " << rank << " ptr in " << obj.ptrsum() << std::endl;
  

POD 等某些类型可能受益于额外指定 is_mpi_datatype,但 ObjChild 由于指针的原因不符合条件。

不幸的是,尽管 boost 享有良好的声誉,但 Boost.MPI 似乎几乎没有得到维护,基本问题没有得到解决甚至讨论。对序列化对象的非阻塞通信要特别小心。因此,如果您不愿意自己投资修复东西,我不一定会推荐 Boost.MPI 用于生产代码。这可能仍然比自己从地面建造要好。另请注意,序列化,尤其是 Boost 的实现相当慢,可能不适合某些 HPC 用例,在这些用例中,最好将内存布局设计为首先不需要任何序列化或复杂打包。

【讨论】:

我绝不会推荐使用 boost.mpi。这个问题比维护问题更糟糕。构建器不支持 boost.mpi ......你肯定想要一个从集群到集群的可移植解决方案......所以硬编码是最好的方法(在我看来)。 您当然可以在各种 HPC 系统上运行依赖于 Boost.MPI 的程序——如果这甚至是一个目标的话。如果已经存在对 Boost.Serialization 的依赖,那么这个论点就更弱了。由 user1221647 权衡使用 Boost.MPI 的利弊。正如我所写,我对 Boost.MPI 有自己的看法。 @Zulan 谢谢你的回答。我很抱歉,更不用说由于一些 openmpi 错误,我无法运行 boost.mpi。我只使用纯 MPICH。我很想尝试修复 openmpi+boost.mpi 并重试您的建议。 如果boost.mpi 不是一个选项,您还有其他选择吗? 重建一个类似的接口,但只关注可序列化类型(而不是原始 MPI 数据类型)并且只进行阻塞通信(而不是非阻塞)。

以上是关于MPI 发送自定义序列化对象(更通用的代码)的主要内容,如果未能解决你的问题,请参考以下文章

使用 MPI 集体通信发送 Struct

MPI自定义数据类型

C中的结构序列化并通过MPI传输

如何使用 MPI 传输带有动态数组的自定义结构?

Java TCP/IP Socket构建和解析自定义协议消息(含代码)

iOS 自定义对象及子类及模型套模型的拷贝归档存储的通用代码