来自多个线程的 MPI RMA

Posted

技术标签:

【中文标题】来自多个线程的 MPI RMA【英文标题】:MPI RMA from multiple threads 【发布时间】:2022-01-01 23:23:35 【问题描述】:

在我的应用程序中,与 MPI 单向通信 (RMA) 相比,我通过 TBB 流程图实现了我所谓的“稀疏向量减少”。该算法的核心部分如下所示:

auto &reduce = m_g_R.add<function_node<ReductionJob, ReductionJob>>(
  serial,
  [=, &reduced_bi](ReductionJob rj) noexcept
  
    const auto r = std::get<0>(rj);

    auto *buffer = std::get<1>(rj)->data.data();
    auto &mask = std::get<1>(rj)->mask;

    if (m_R_comms[r] != MPI_COMM_NULL)
    
      const size_t n = reduced_bi.dim(r);

      MPI_Win win;

      MPI_Win_create(
        buffer,
        r == mr ? n * sizeof(T) : 0,
        sizeof(T),
        MPI_INFO_NULL,
        m_R_comms[r],
        &win
      );

      if (n > 0 && r != mr)
      
        MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, win);

        size_t i = 0;

        do
        
          while (i < n && !mask[i]) ++i;

          size_t base = i;

          while (i < n && mask[i]) ++i;

          if (i > base) MPI_Accumulate(
            buffer + base, i - base, MpiType<T>::type,
            0,
            base, i - base, MpiType<T>::type,
            MPI_SUM,
            win
          );
        
        while (i < n);

        MPI_Win_unlock(0, win);
      

      MPI_Win_free(&win);
    

    return rj;
  
);

这是针对参与计算的每个等级 r 执行的,reduced_bi.dim(r) 指定每个等级拥有多少元素。 mr 是当前等级,并且以这样一种方式创建通信器,即目标进程是它们每个的根。 bufferT = double 的数组(通常),maskstd::vector&lt;bool&gt; 标识哪些元素非零。循环的组合将通信拆分为非零元素块。

这通常可以正常工作并且结果是正确的,与我之前使用MPI_Reduce 的实现相同。但是,将此节点的并发级别设置为serial 似乎很重要,这表明最多有一个并行 TBB 任务(因此最多有一个线程)执行此代码。

我想将其设置为 unlimited 以提高性能,而且确实在我的笔记本电脑上运行 MPICH 3.4.1 的小型作业可以正常工作。然而,在我真正想要运行计算的集群上,使用 OpenMPI 4.1.1,它运行了一段时间,然后因段错误和涉及一堆 UCX 函数的回溯而崩溃。

我现在想知道,是否不允许有多个线程并行调用这样的 RMA 操作(在我的笔记本电脑上它只是偶然工作),还是我遇到了集群上的错误/限制?从文档中我没有直接看到我想做的事情不受支持。

当然,MPI 是用MPI_THREAD_MULTIPLE 初始化的,我再次重复上面发布的 sn-p 工作正常,只有当我更改 serial --> unlimited 以启用并发执行时,我才会遇到问题在集群上。


在回复下面的 Victor Eijkhout 评论时,这里有一个完整的示例程序,可以重现该问题。这在我的笔记本电脑上运行良好(专门使用 mpirun -n 16 进行了测试),但是当我以 16 个等级(分布在 4 个集群节点上)运行它时,它在集群上崩溃。

#include <iostream>
#include <vector>
#include <thread>

#include <mpi.h>

int main(void)

  int requested = MPI_THREAD_MULTIPLE, provided;

  MPI_Init_thread(nullptr, nullptr, requested, &provided);

  if (provided != requested)
  
    std::cerr << "Failed to initialize MPI with full thread support!"
              << std::endl;
    exit(1);
  

  int mr, nr;

  MPI_Comm_rank(MPI_COMM_WORLD, &mr);
  MPI_Comm_size(MPI_COMM_WORLD, &nr);

  const size_t dim = 1024;
  const size_t repeat = 100;

  std::vector<double> send(dim, static_cast<double>(mr) + 1.0);
  std::vector<double> recv(dim, 0.0);

  MPI_Win win;

  MPI_Win_create(
    recv.data(),
    recv.size() * sizeof(double),
    sizeof(double),
    MPI_INFO_NULL,
    MPI_COMM_WORLD,
    &win
  );

  std::vector<std::thread> threads;

  for (size_t i = 0; i < repeat; ++i)
  
    threads.clear();
    threads.reserve(nr);

    for (int r = 0; r < nr; ++r) if (r != mr)
    
      threads.emplace_back([r, &send, &win]
      
        MPI_Win_lock(MPI_LOCK_SHARED, r, 0, win);

        for (size_t i = 0; i < dim; ++i) MPI_Accumulate(
          send.data() + i, 1, MPI_DOUBLE,
          r,
          i, 1, MPI_DOUBLE,
          MPI_SUM,
          win
        );

        MPI_Win_unlock(r, win);
      );
    

    for (auto &t : threads) t.join();

    MPI_Barrier(MPI_COMM_WORLD);

    if (mr == 0) std::cout << recv.front() << std::endl;
  

  MPI_Win_free(&win);
  MPI_Finalize();

注意:我在这里故意使用纯线程来避免不必要的依赖。它应该与-lpthread 链接。

我在集群上遇到的具体错误是这样的,使用 OpenMPI 4.1.1:

*** An error occurred in MPI_Accumulate
*** reported by process [1829189442,11]
*** on win ucx window 3
*** MPI_ERR_RMA_SYNC: error executing rma sync
*** MPI_ERRORS_ARE_FATAL (processes in this win will now abort,
***    and potentially your MPI job)

来自ompi_info的可能相关部分:

Open MPI: 4.1.1
Open MPI repo revision: v4.1.1
Open MPI release date: Apr 24, 2021
Thread support: posix (MPI_THREAD_MULTIPLE: yes, OPAL support: yes, OMPI progress: no, Event lib: yes)

已用 UCX/1.10.1 编译。

【问题讨论】:

不知道是不是每个线程都创建了窗口内存的问题。那不应该是每个进程吗? 哦,这是在任何地方明确说明的吗? MPI_Win_create 的 OpenMPI 文档说“每个进程都指定一个现有内存的窗口,它向 comm 组中的进程公开给 RMA 访问。”我认为这可以理解为“每个进程一个窗口”,也可以理解为“每个通信器一个窗口”。 MPI_Win_create 的 MPICH 文档说“这个例程是线程安全的。这意味着这个例程可以被多个线程安全地使用,而不需要任何用户提供的线程锁。” Windows 在一个通信器上是集体的,因此通信器中的每个进程都需要进行一些创建调用。您似乎让每个 thread 进行创建调用。我会将 create 调用移到您的 lambda 之外。无论如何,您可能想多次使用 Windows,对吧?线程安全仅指使用窗口,而不是创建。 在这种情况下,流程图拓扑(未在 sn-p 中显示)实际上确保每个通信器只有一个窗口,并且所有进程以相同的顺序执行窗口创建。但是,是的,理想情况下,我想在前面创建所有窗口。 事实上,我有一个以这种方式实现的版本,但它也与 OpenMPI 崩溃,给了我 MPI_ERR_WIN。我的理论是,可能存在一个约束,要求创建和使用窗口仅限于同一个线程,但当时可能还有其他问题。如果我稍微改变我的算法并将 MPI_Accumulate 也用于 rank-local 更新,我现在意识到实际上有一种方法只使用一个通信器和窗口每个进程,所以我会尝试那个版本。 【参考方案1】:

C++ 中的风格是将*&amp;类型,而不是标识符。这是在 Stroustrup 的第一本书的开头特别提到的,这是与 C 风格的有意区别。


创建 -- 锁定 -- 解锁 -- 免费

⧺R.1 使用资源句柄和 RAII(资源获取即初始化)自动管理资源

使用包装类,要么为此目的而编写,为此 C API 设计,通用资源管理器模板,要么使用带有自定义删除器的 unique_ptr,而不是必须的显式调用匹配正确的行为。

RAII/RFID 是 C++ 的基本优势之一,使用它可以大大减少代码错误并及时维护。


使用解构语法。

const auto r = std::get<0>(rj);
auto *buffer = std::get<1>(rj)->data.data();
auto &mask = std::get<1>(rj)->mask;

您可以直接命名组件,而不是引用get&lt;0&gt;get&lt;1&gt;

const auto& [r, fred] = rj;
auto* buffer = fred->data.data();
auto& mask = fred->mask;

【讨论】:

我同意结构化绑定可以提高可读性,但在我的代码中,我大多避免使用它们,因为目前它们无法被捕获到 lambdas 中。 (我相信,标准中发现了一些导致这种情况的缺陷,但我现在不记得细节了。)无论如何,我很抱歉我看不出这与我的 MPI 问题有什么关系。 这个答案是由在 C++ 代码中寻找“锁”的机器人生成的吗?这无关紧要。 @VictorEijkhout 一点也不。我不小心从Code Review 上网,因为它出现在右侧,并没有注意到它切换到主 S.O.网站。 @JDługosz 感谢您的澄清。感谢您进行代码审查。这是一项有用的服务。那里有很多糟糕的 C++ 代码。

以上是关于来自多个线程的 MPI RMA的主要内容,如果未能解决你的问题,请参考以下文章

具有多个 Pthread 的 MPI

多进程 MPI 与多线程 std::thread 性能

InfiniBand:传输速率取决于 MPI_Test* 频率

MPI + 线程并行化与仅 MPI 相比有啥优势(如果有)?

MPI +线程并行化与仅MPI的优势(如果有的话)是什么?

这可以是多线程 MPI_Irecv 中最原子的“如果未收到则取消”