Parallel 和 Rcpp Armadillo 的问题:集群工作人员之间可能存在变量损坏

Posted

技术标签:

【中文标题】Parallel 和 Rcpp Armadillo 的问题:集群工作人员之间可能存在变量损坏【英文标题】:Problems with Parallel and Rcpp Armadillo: Possible variable corruption between cluster workers 【发布时间】:2019-07-13 20:02:43 【问题描述】:

我在将 Rcpp(和 Rcpp Armadillo)与并行包一起使用时遇到了一些问题,并且我得到的结果不正确,具体取决于我用于计算的内核数量。

我有一个函数compute_indices,它为我的数据中的每个观察结果计算 3 组索引。它首先使用parallel::makeCluster 创建一个(FORK)集群,具体取决于我指定的核心数量。然后它将我的数据分成相等的部分,并使用我之前创建的集群对象在每个部分上应用(使用parallel::parLapply)一个函数meancorlsm。现在meancorlsm 基本上是我在 Rcpp 和 Rcpp Armadillo 中编写的函数(称为covArmadilloMclsmNormal)的包装,因为我试图加快计算速度。但是,我有另一个完全用 R 编写的函数版本(称为meancorlsR),我用它来测试 RccpArmadillo 版本的正确性。

现在,如果我使用meancorlsm 运行compute_indices(我首先使用sourceCpp() 使covArmadilloMclsmNormal 在全局环境中可用),我会得到部分正确的答案,具体取决于我告诉的核心数量compute_indices 使用。具体来说,如果我使用 4 个核心,计算索引的前 1/4 是正确的,如果我使用 2 个核心,我的结果的前 1/2 是正确的,如果我使用单个核心,我的所有结果都是正确的.我使用meancorlsm 的R 版本(如前所述的meancorlsR)生成的答案来检查结果的正确性。由于我在使用单核时得到了正确的结果,所以我觉得 RcppArmadillo 函数是正确的,并且集群的不同线程/工作者可能在计算过程中相互干扰,因此我得到了这种奇怪的行为。

下面是compute_indices

compute_indices <- function(dt, nr = ncol(dt), n_core = 4) 
  par_cluster <- makeCluster(n_core, type = "FORK")
  # compute the column splits/partition for parallel processing
  splits <- parallel:::splitList(1:nr, n_core)
  # compute auxiliar support data
  data_means <- colMeans(dt, na.rm = T)
  data_vars <- apply(dt, MARGIN =  2, var)
  data_sds <- apply(dt, 2, sd)
  # compute Outliers using parapply
  vectors <- do.call(rbind, parLapply(par_cluster, splits,
                                      meancorlsm, dt, data_means,
                                      data_vars, data_sds))
  stopCluster(par_cluster)
  vectors

meancorlsm

meancorlsm<- function(i, mtx, means, vars, sds)
  pre_outl <- covArmadilloMclsmNormal(dti = mtx[,i], dt = mtx,
                                      dtmeans = means, dtvars = vars,
                                      dtsds = sds)
  colnames(pre_outl) <- c("sh", "mg", "ap")
  pre_outl

使用covArmadilloMclsmNormal Rcpp 函数:


#include <RcppArmadillo.h>
using namespace Rcpp;


//[[Rcpp::depends(RcppArmadillo)]]
// [[Rcpp::export]]

arma::mat covArmadilloMclsmNormal(arma::mat dti, arma::mat dt,
                     arma::vec dtmeans, arma::vec dtvars,
                     arma::vec dtsds)
  arma::mat out(dt.n_cols, dti.n_cols);
  out = arma::cov(dt, dti);
  int n = out.n_rows;
  int p = out.n_cols;
  //declare matrices to hold result
  arma::vec temp(n);
  arma::mat preout(3,p);
  for(int i = 0; i<p ; ++i)
    temp = out.col(i)/dtvars;
    preout(0,i) = arma::mean((out.col(i)/dtsds))/dtsds(i); 
    preout(1, i) = dtmeans(i) - arma::mean(temp % dtmeans);
    preout(2, i) = arma::mean(temp); 
  

  return preout.t();


现在这里是我用于测试的meancorlsm 的 R 版本:

meancorlsR <- function(i, mtx, means, vars, sds)
  pre_outl <- apply(cov(mtx, mtx[,i], use = "pairwise.complete.obs"), 2,
                    function(col)
                      tmp <- col/vars
                      c("sh" = mean(col/sds, na.rm = T), 
                        "mg" = mean(tmp * means, na.rm = T), 
                        "ap" = mean(tmp, na.rm = T)) 
                    )
  pre_outl[1,] <- pre_outl[1,]/sds[i] 
  pre_outl[2,] <- means[i] - pre_outl[2,] 
  t(pre_outl)

您可以在compute_indices 函数中将meancorlsm 函数替换为meancorlsR,它将起作用(用于测试)。然而,为了立即重现,我在这里提供compute_indicesR

compute_indicesR <- function(dt, nr = ncol(dt), n_core = 4) 
  par_cluster <- makeCluster(n_core, type = "FORK")
  # compute the column splits/partition for parallel processing
  splits <- parallel:::splitList(1:nr, n_core)
  # compute auxiliar support data
  data_means <- colMeans(dt, na.rm = T)
  data_vars <- apply(dt, MARGIN =  2, var)
  data_sds <- apply(dt, 2, sd)
  # compute using parapply
  vectors <- do.call(rbind, parLapply(par_cluster, splits,
                                      meancorlsR, dt, data_means, 
                                      data_vars, data_sds))
  stopCluster(par_cluster)
  vectors

最后,这是一个运行的最小示例:

library(Rcpp)
library(parallel)
# use this to source the Rcpp function from a file
# to make the covArmadilloMclsmNormal function available
sourceCpp("covArmadilloMclsmNormal.cpp")

data("attitude") # available in datasets in base R
dt <- t(as.matrix(attitude[1:10,])) #select first 10 row 

indices_rcpp4 <- compute_indices(dt) # using 4 cores
indices_rcpp2 <- compute_indices(dt, n_core = 2) # using 2 cores
indices_rcpp1 <- compute_indices(dt, n_core = 1) # 1 core
# using the R version
# already replaced the meancorlsm function to meancorlsR here  
indices_R <- compute_indicesR(dt) # R version

无论我指定的内核数量如何,我都希望所有输出都与 R 版本生成的输出相匹配。但是,它们是不同的。这是我使用 R 版本得到的结果:

"           sh               mg               ap 
1   0.634272567307155 -7.09315427645087   0.992492531586726
2   0.868144125333511  22.3206363514708   0.622504642756242
3   0.819231480417289  24.8027625928423   0.756088388472384
4   0.830462006956641 -6.26663378557611   1.03847748215856
5   0.836182582923674  15.0558414918816   0.901413435882058
6   0.648813304451793  23.4689784056255   0.40175151333289
7   0.839409670144446  3.73900558549848   0.883655665107456
8   0.781070895796188  13.1775081516076   0.810306856575333
9   0.772967959938828  2.24023877077873   1.1146249477264
10  0.826849986442202  3.31330282673472   0.910527502047015"

我使用 2 核的 Rcpp 版本得到的结果是

"           sh               mg               ap 
1   0.634272567307155 -7.09315427645086   0.992492531586726
2   0.868144125333511  22.3206363514708   0.622504642756242
3   0.819231480417289  24.8027625928424   0.756088388472384
4   0.830462006956641 -6.26663378557612   1.03847748215856
5   0.836182582923674  15.0558414918816   0.901413435882058
6   0.231943043232274  28.1832641199112   0.40175151333289
7   1.20839881621289   7.02471987121276   0.883655665107456
8   0.865212462148289  21.7489367230362   0.810306856575333
9   0.853048693647409  -10.474046943507   1.1146249477264
10  0.857055188335614  14.599017112449    0.910527502047015"

而对于使用 4 核的 Rccp 版本:

"           sh               mg               ap 
1   0.634272567307155 -7.09315427645086   0.992492531586726
2   0.868144125333511  22.3206363514708   0.622504642756242
3   0.819231480417289  24.8027625928424   0.756088388472384
4   0.648794650804865 -10.2666337855761   1.03847748215856
5   1.25119408317776   5.48441292045304   0.901413435882058
6   0.231943043232274  28.1832641199112   0.40175151333289
7   1.20839881621289   7.02471987121276   0.883655665107456
8   0.487272877566209  3.60607958017906   0.810306856575333
9   1.50139103326263  -6.75976122922128   1.1146249477264
10  1.01076542369015   15.4561599695919   0.910527502047015"

使用单核的 Rccp 版本产生了与 R 版本相同的答案,这是正确的结果。同样有趣的是,无论我使用的核心数量是多少,答案的ap 列保持不变,而shmg 列发生变化。

最后,我的平台是 Ubuntu 16.04。似乎FORK clusters 在 Windows 上不起作用,因此您可能无法重现此结果。但是,即使使用 PSOCK 集群,我也得到了相同的行为(在这种情况下,我使用 clusterEvalQ() 来获取必要的 Cpp 函数以使它们可供工作人员使用)。非常感谢任何关于我做错了什么的帮助或见解。

【问题讨论】:

嗨 Segun - 这是一个很长的问题,但通常要注意并行工作最好在独立数据块上完成,因为 R 显然不是多线程安全的。也许看看 RcppParallel 文档及其示例——以及它们对 RMatrixRVector 的使用。 我现在还没有所有的数学知识,但是根据arma::cov 的文档,您不应该按行拆分吗?所以使用nr = nrow(dt) 然后mtx[i,] 肯定是mtx[i, , drop = FALSE] 嗨,Alexis,整个函数的编写方式使得数据采用长格式,即我在解析到我的函数之前转置了数据。因此,数据的行实际上是通过引用函数中的列得到的。 @DirkEddelbuettel 谢谢,我会调查 RcppParallel。你认为如果我建立一个包,事情可能会更好吗?也许我使用 sourceCpp 来养活工人这一事实可能是个问题。 【参考方案1】:

别管我的 cmets,我误解了犰狳的文档。

您的 C++ 代码正在使用 i 索引帮助器 dtmeansdtsds 向量, 但是i 对于每个并行实例总是从零开始, 所以你需要传递一个偏移量来指示跳过了多少列:

//[[Rcpp::depends(RcppArmadillo)]]

#include <RcppArmadillo.h>

//[[Rcpp::export]]
arma::mat covArmadilloMclsmNormal(arma::mat dti, arma::mat dt, int offset,
                                  arma::vec dtmeans, arma::vec dtvars, arma::vec dtsds)

  arma::mat out = arma::cov(dt, dti);
  size_t p = out.n_cols;

  arma::mat preout(p,3);
  for (int i = 0; i < p; ++i) 
    arma::vec temp = out.col(i) / dtvars;
    preout(i,0) = arma::mean((out.col(i) / dtsds)) / dtsds(i + offset); 
    preout(i,1) = dtmeans(i + offset) - arma::mean(temp % dtmeans);
    preout(i,2) = arma::mean(temp); 
  

  return preout;

因此:

meancorlsm <- function(i, mtx, means, vars, sds)
  pre_outl <- covArmadilloMclsmNormal(dti = mtx[,i, drop = FALSE], dt = mtx, offset = min(i) - 1L,
                                      dtmeans = means, dtvars = vars,
                                      dtsds = sds)
  colnames(pre_outl) <- c("sh", "mg", "ap")
  pre_outl

您甚至可以按顺序证实它:

data("attitude") # available in datasets in base R
dt <- t(as.matrix(attitude[1:10,])) #select first 10 row 

# compute the column splits/partition for parallel processing
splits <- parallel:::splitList(1:ncol(dt), 2)

# compute auxiliary support data
data_means <- colMeans(dt, na.rm = T)
data_vars <- apply(dt, MARGIN =  2, var)
data_sds <- apply(dt, 2, sd)

do.call(rbind, lapply(splits,
                      meancorlsm, dt, data_means,
                      data_vars, data_sds))
             sh        mg        ap
 [1,] 0.6342726 -7.093154 0.9924925
 [2,] 0.8681441 22.320636 0.6225046
 [3,] 0.8192315 24.802763 0.7560884
 [4,] 0.8304620 -6.266634 1.0384775
 [5,] 0.8361826 15.055841 0.9014134
 [6,] 0.6488133 23.468978 0.4017515
 [7,] 0.8394097  3.739006 0.8836557
 [8,] 0.7810709 13.177508 0.8103069
 [9,] 0.7729680  2.240239 1.1146249
[10,] 0.8268500  3.313303 0.9105275

顺便说一句,如果你只是用= 覆盖它们,我认为在 C++ 代码中预分配矩阵是没有用的。

【讨论】:

非常感谢。您能否澄清关于预分配矩阵的最后一个建议。抱歉,这是我第一次使用 Rcpp 和犰狳,所以我不熟悉内存管理。我怎样才能让它更有效率?我在哪里预分配一个不必要的矩阵。谢谢 @SegunOjo 就像你第一次定义arma::mat out(dt.n_cols, dti.n_cols) 但立即用out = arma::cov(dt, dti) 覆盖它;前者分配从未使用过的内存,因为out 最终在内部持有arma::cov 分配的内存。这更像是 C++ 的东西,而不是 Rcpp;搜索复制和移动语义。

以上是关于Parallel 和 Rcpp Armadillo 的问题:集群工作人员之间可能存在变量损坏的主要内容,如果未能解决你的问题,请参考以下文章

(Rcpp, armadillo) 将 arma::vec 转换为 arma::mat

在 Rcpp (Armadillo) 函数中使用数字序列作为默认参数

Rcpp Armadillo:RStudio 说“exp”不明确

从 Rcpp Armadillo 中的 sp_mat 访问维度名称

如何通过在 Rcpp 或 Armadillo 中将矩阵乘以向量元素来复制 R 的功能?

RcppArmadillo 和 Armadillo 之间的性能差异