逐元素矩阵乘法:R 与 Rcpp(如何加快此代码的速度?)

Posted

技术标签:

【中文标题】逐元素矩阵乘法:R 与 Rcpp(如何加快此代码的速度?)【英文标题】:Elementwise matrix multiplication: R versus Rcpp (How to speed this code up?) 【发布时间】:2014-07-24 12:09:01 【问题描述】:

我是 C++ 编程的新手(使用 Rcpp 无缝集成到 R),我希望能得到一些关于如何加快计算速度的建议。

考虑以下示例:

 testmat <- matrix(1:9, nrow=3)
 testvec <- 1:3

 testmat*testvec
   #      [,1] [,2] [,3]
   #[1,]    1    4    7
   #[2,]    4   10   16
   #[3,]    9   18   27

在这里,R 回收了testvec,因此,粗略地说,testvec“变成”了一个与testmat 相同维度的矩阵,用于此乘法。然后返回 Hadamard 产品。我希望使用Rcpp 来实现此行为,即我希望矩阵testmati-th 行的每个元素都与向量testveci-th 元素相乘。我的基准测试告诉我,我的实现非常慢,我希望能就如何加快速度提出建议。这是我的代码:

首先,使用Eigen

#include <RcppEigen.h>
// [[Rcpp::depends(RcppEigen)]]

using namespace Rcpp;
using namespace Eigen;

// [[Rcpp::export]]
NumericMatrix E_matvecprod_elwise(NumericMatrix Xs, NumericVector ys)
  Map<MatrixXd> X(as<Map<MatrixXd> >(Xs));
  Map<VectorXd> y(as<Map<VectorXd> >(ys));

  int k = X.cols();
  int n = X.rows();
  MatrixXd Y(n,k) ;

  // here, I emulate R's recycling. I did not find an easier way of doing this. Any hint appreciated.      
  for(int i = 0; i < k; ++i) 
   Y.col(i) = y;
  

  MatrixXd out = X.cwiseProduct(Y);
  return wrap(out);

这是我使用 Armadillo 的实现(调整以遵循 Dirk 的示例,请参阅下面的答案):

#include <RcppArmadillo.h>
// [[Rcpp::depends(RcppArmadillo)]]

using namespace Rcpp;
using namespace arma;

// [[Rcpp::export]]
arma::mat A_matvecprod_elwise(const arma::mat & X, const arma::vec & y)
  int k = X.n_cols ;
  arma::mat Y = repmat(y, 1, k) ;  // 
  arma::mat out = X % Y;  
return out;

使用 R、Eigen 或 Armadillo 对这些解决方案进行基准测试表明,Eigen 和 Armadillo 都比 R 慢大约 2 倍。有没有办法加快这些计算速度或至少与 R 一样快?有没有更优雅的设置方法?任何建议表示赞赏和欢迎。 (我也鼓励对一般编程风格进行切题的评论,因为我是 Rcpp / C++ 的新手。)

这里有一些可重现的基准:

 # for comparison, define R function:
 R_matvecprod_elwise <- function(mat, vec) mat*vec

n <- 50000
k <- 50
X <- matrix(rnorm(n*k), nrow=n)
e <- rnorm(n)

benchmark(R_matvecprod_elwise(X, e), A2_matvecprod_elwise(X, e), E_matvecprod_elwise(X,e),
          columns = c("test", "replications", "elapsed", "relative"), order = "relative", replications = 1000)

这会产生

                  test replications elapsed relative
1  R_matvecprod_elwise(X, e)         1000   10.89    1.000
2  A_matvecprod_elwise(X, e)         1000   26.87    2.467
3  E_matvecprod_elwise(X, e)         1000   27.73    2.546

如您所见,我的Rcpp-solutions 表现相当糟糕。有什么更好的办法吗?

【问题讨论】:

您可以在 Eigen 中执行 X = X.cwiseProduct(y.replicate(1, X.cols()));,然后在 return Xs; 中执行,但是对于 1e3*1e3 矩阵,它并没有提高我系统上的 Eigen 函数的基准。 在犰狳中,尝试 .each_row() 函数:arma.sourceforge.net/docs.html#each_colrow 类似以下内容:X.each_row() %= y 仔细查看您的代码,使用.each_col() 可能更正确:X.each_col() %= y 谢谢@mtall!您建议的更改大大加快了arma-函数的速度,因此它现在超过了R,即使Rcpp 版本仍然优于arma。无论如何感谢您的评论。非常感谢您的帮助。 @coffeinjunky - 这是代码可读性、紧凑性和速度之间的权衡。虽然您始终可以编写针对手头特定任务进行优化的低级代码,但它几乎总是以更难以阅读和维护为代价(因此您更有可能产生错误)。通常只使用库提供的工具会更安全、更容易,例如 .each_col() 方法。 【参考方案1】:

如果您想加快计算速度,您必须小心不要复制。这通常意味着牺牲可读性。这是一个不复制并就地修改矩阵 X 的版本。

// [[Rcpp::export]]
NumericMatrix Rcpp_matvecprod_elwise(NumericMatrix & X, NumericVector & y)
  unsigned int ncol = X.ncol();
  unsigned int nrow = X.nrow();
  int counter = 0;
  for (unsigned int j=0; j<ncol; j++) 
    for (unsigned int i=0; i<nrow; i++)  
      X[counter++] *= y[i];
    
  
  return X;

这是我在我的机器上得到的

 > library(microbenchmark)
 > microbenchmark(R=R_matvecprod_elwise(X, e), Arma=A_matvecprod_elwise(X, e),  Rcpp=Rcpp_matvecprod_elwise(X, e))

Unit: milliseconds
 expr       min        lq    median       uq      max neval
    R  8.262845  9.386214 10.542599 11.53498 12.77650   100
 Arma 18.852685 19.872929 22.782958 26.35522 83.93213   100
 Rcpp  6.391219  6.640780  6.940111  7.32773  7.72021   100

> all.equal(R_matvecprod_elwise(X, e), Rcpp_matvecprod_elwise(X, e))
[1] TRUE

【讨论】:

这看起来很不错。这是我运行的第一个代码来击败R!如果您不介意我问,为什么将所有整数声明为 unsigned 有帮助?这排除了负值,但为什么速度会有所提高?另外,我不确定我是否理解X[counter++] *= y[i]; 这一行。 counter++ 在这里做了什么,j 发生了什么?我将非常感谢您的简短评论... 我认为未签名并不重要。如果基准因此而发生重大变化,我会感到惊讶。速度的提高是因为我们不制作向量或矩阵的任何副本。计数器变量是为了避免必须通过利用 R 如何布置矩阵中的元素来计算 (i,j) 索引在矩阵中的位置。 是的,是内存布局问题使它更快。该解决方案是唯一通过矩阵按列操作的解决方案,并且只需查找正确的标量。其他一切都必须组装不那么便宜的行向量。而counter++ 是“简单地”将矩阵作为一个长向量逐列遍历的好技巧——这可以提高速度。【参考方案2】:

对于初学者,我将犰狳版本(接口)写成

#include <RcppArmadillo.h>
// [[Rcpp::depends(RcppArmadillo)]]

using namespace Rcpp;
using namespace arma;

// [[Rcpp::export]]
arama::mat A_matvecprod_elwise(const arma::mat & X, const arma::vec & y)
  int k = X.n_cols ;
  arma::mat Y = repmat(y, 1, k) ;  // 
  arma::mat out = X % Y;  
  return out;

当您进行额外的输入和输出转换时(尽管 wrap() 被胶水代码添加)。 const &amp; 是名义上的(正如您从上一个问题中了解到的,SEXP 是一个易于复制的轻量级指针对象),但样式更好。

您没有显示您的基准测试结果,因此我无法评论矩阵大小等 pp 的影响。我怀疑您可能会在 rcpp-devel 上得到比这里更好的答案。你的选择。

编辑:如果您真的想要便宜又快速的东西,我会这样做:

// [[Rcpp::export]]
mat cheapHadamard(mat X, vec y) 
    // should row dim of X versus length of Y here
    for (unsigned int i=0; i<y.n_elem; i++) X.row(i) *= y(i);
    return X;

它不分配新内存,因此速度更快,并且可能与 R 竞争。

测试输出:

R> cheapHadamard(testmat, testvec)
     [,1] [,2] [,3]
[1,]    1    4    7
[2,]    4   10   16
[3,]    9   18   27
R> 

【讨论】:

再次感谢 Dirk 的意见。我尝试使用您的cheapHadamard-函数,但结果证明它是最慢的,这让我感到惊讶。你知道为什么它会这么慢吗? 可能跨矩阵的逐行访问很慢。不知道。现在你可以了解这个——或者看看(毕竟是开源的)R 代码,它打败了我们所有人。它可能只是调用一个更快的 BLAS 例程......【参考方案3】:

我很抱歉给 C++ 问题提供了一个本质上 C 的答案,但正如已经建议的那样,解决方案通常在于事物的有效 ​​BLAS 实现。不幸的是,BLAS 本身缺少 Hadamard 乘法,因此您必须自己实现。

这里是一个纯 Rcpp 实现,基本上是调用 C 代码。如果你想让它成为正确的 C++,可以对工作函数进行模板化,但对于大多数使用 R 的应用程序来说,这不是问题。请注意,这也是“就地”操作,这意味着它会修改 X 而不复制它。

// it may be necessary on your system to uncomment one of the following
//#define restrict __restrict__ // gcc/clang
//#define restrict __restrict   // MS Visual Studio
//#define restrict              // remove it completely

#include <Rcpp.h>
using namespace Rcpp;

#include <cstdlib>
using std::size_t;

void hadamardMultiplyMatrixByVectorInPlace(double* restrict x,
                                           size_t numRows, size_t numCols,
                                           const double* restrict y)

  if (numRows == 0 || numCols == 0) return;

  for (size_t col = 0; col < numCols; ++col) 
    double* restrict x_col = x + col * numRows;

    for (size_t row = 0; row < numRows; ++row) 
      x_col[row] *= y[row];
    
  


// [[Rcpp::export]]
NumericMatrix C_matvecprod_elwise_inplace(NumericMatrix& X,
                                          const NumericVector& y)

  // do some dimension checking here

  hadamardMultiplyMatrixByVectorInPlace(X.begin(), X.nrow(), X.ncol(),
                                        y.begin());

  return X;

这是一个先复制的版本。我对 Rcpp 的了解不够好,无法在本地执行此操作,并且不会对性能造成重大影响。在堆栈上创建并返回 NumericMatrix(numRows, numCols) 会导致代码运行速度降低约 30%。

#include <Rcpp.h>
using namespace Rcpp;

#include <cstdlib>
using std::size_t;

#include <R.h>
#include <Rdefines.h>

void hadamardMultiplyMatrixByVector(const double* restrict x,
                                    size_t numRows, size_t numCols,
                                    const double* restrict y,
                                    double* restrict z)

  if (numRows == 0 || numCols == 0) return;

  for (size_t col = 0; col < numCols; ++col) 
    const double* restrict x_col = x + col * numRows;
    double* restrict z_col = z + col * numRows;

    for (size_t row = 0; row < numRows; ++row) 
      z_col[row] = x_col[row] * y[row];
    
  


// [[Rcpp::export]]
SEXP C_matvecprod_elwise(const NumericMatrix& X, const NumericVector& y)

  size_t numRows = X.nrow();
  size_t numCols = X.ncol();

  // do some dimension checking here

  SEXP Z = PROTECT(Rf_allocVector(REALSXP, (int) (numRows * numCols)));
  SEXP dimsExpr = PROTECT(Rf_allocVector(INTSXP, 2));
  int* dims = INTEGER(dimsExpr);
  dims[0] = (int) numRows;
  dims[1] = (int) numCols;
  Rf_setAttrib(Z, R_DimSymbol, dimsExpr);

  hadamardMultiplyMatrixByVector(X.begin(), X.nrow(), X.ncol(), y.begin(), REAL(Z));

  UNPROTECT(2);

  return Z;

如果您对restrict 的用法感到好奇,这意味着您作为程序员与编译器签订合同,不同位的内存不会重叠,从而允许编译器进行某些优化。 restrict 关键字是 C++11(和 C99)的一部分,但许多编译器为早期标准添加了 C++ 扩展。

一些 R 代码进行基准测试:

require(rbenchmark)

n <- 50000
k <- 50
X <- matrix(rnorm(n*k), nrow=n)
e <- rnorm(n)

R_matvecprod_elwise <- function(mat, vec) mat*vec

all.equal(R_matvecprod_elwise(X, e), C_matvecprod_elwise(X, e))
X_dup <- X + 0
all.equal(R_matvecprod_elwise(X, e), C_matvecprod_elwise_inplace(X_dup, e))

benchmark(R_matvecprod_elwise(X, e),
          C_matvecprod_elwise(X, e),
          C_matvecprod_elwise_inplace(X, e),
          columns = c("test", "replications", "elapsed", "relative"),
          order = "relative", replications = 1000)

结果:

                               test replications elapsed relative
3 C_matvecprod_elwise_inplace(X, e)         1000   3.317    1.000
2         C_matvecprod_elwise(X, e)         1000   7.174    2.163
1         R_matvecprod_elwise(X, e)         1000  10.670    3.217

最后,就地版本实际上可能更快,因为重复乘法到同一个矩阵会导致一些溢出混乱。

编辑:

删除了循环展开,因为它没有提供任何好处并且会分散注意力。

【讨论】:

非常感谢您的回答!这看起来非常令人印象深刻,虽然我不得不承认我还不明白很多事情,但我希望有一天会。我会努力让它尽快运行! 当我将此与其他解决方案一起进行基准测试时,它并没有变得更快。我使用调用 C 函数的包装器的最后一种方法。 您是否在有和没有显式循环展开的情况下对代码进行计时?许多编译器会自己执行展开,显式展开会使代码更难阅读。除此之外,您可能会调查Duff's Device。 就我个人而言,我对面向高级用户的代码中这种可读性较差的解决方案不太感兴趣(尽管我们确实有一些隐藏在 Rcpp Sugar 中)。 我制作了一个包含 4 个算法版本的包:常规、展开、就地和就地 + 展开。一开始,in-place 总是击败 in-place + unrolled,而 unrolled 的速度几乎和 R 一样慢。我花了一段时间才弄清楚,但是颠倒了 in-place 版本的测试顺序是什么做到了。实现一个纯 C 基准测试,其中可以只对函数调用进行计时,始终产生 IP (2.6s)

以上是关于逐元素矩阵乘法:R 与 Rcpp(如何加快此代码的速度?)的主要内容,如果未能解决你的问题,请参考以下文章

OpenCV逐元素矩阵乘法

如何在 numpy 中获得逐元素矩阵乘法(Hadamard 乘积)?

Scipy CSR 矩阵逐元素加法

加速python中的元素数组乘法

TensorFlow 中矩阵和向量的高效逐元素乘法

使用 AVX 的平铺矩阵乘法