有没有办法在这个 R 代码中进行并行处理?

Posted

技术标签:

【中文标题】有没有办法在这个 R 代码中进行并行处理?【英文标题】:Is there a way to do parallelism in this R Code? 【发布时间】:2021-12-06 12:11:38 【问题描述】:

我正在尝试学习 R 中的并行性。我编写了一个代码,其中我有一个从 1 到 250000 的 50*50 矩阵。对于矩阵中的每个元素,我正在寻找其具有最低值的邻居。邻居也可以处于对角线位置。然后我用最低的邻居替换元素本身。在我的计算机上运行此代码所需的时间约为 4.5 秒。如果有可能,任何人都可以帮助我使 for 循环并行吗?这是代码片段

start_time <- Sys.time()


myMatrix <- matrix(1:250000, nrow=500) # a 500 * 500 matrix from 1 to 250000


indexBound <- function(row,col)  # this function is to check if the indexes are out of bound
  if(row<0 || col <0 || row > 500 || col >500)
    return (FALSE)
  
  else
    return (TRUE)
  



for(row in 1:nrow(myMatrix))
  
  for(col in 1:ncol(myMatrix))
    li <- list()
    if(indexBound(row-1,col-1))
      li <- c(li,myMatrix[row-1,col-1])
     
    
    if(indexBound(row-1,col))
      li <- c(li,myMatrix[row-1,col])
     
    
    if(indexBound(row-1,col+1))
      li <- c(li,myMatrix[row-1,col+1])
      
    
    if(indexBound(row,col-1))
      li <- c(li,myMatrix[row,col-1])
    
    if(indexBound(row-1,col+1))
      li <- c(li,myMatrix[row,col+1])
      
    
    if(indexBound(row+1,col-1))
      li <- c(li,myMatrix[row+1,col-1])
      
    
    if(indexBound(row+1,col))
      li <- c(li,myMatrix[row+1,col])
    
    
    if(indexBound(row+1,col+1))
      li <- c(li, myMatrix[row+1,col+1])
     
    
    min = Reduce(min,li) #find the lowest value from the list
    myMatrix[row,col] = min
  

end_time <- Sys.time()

end_time - start_time



感谢您的回复。

【问题讨论】:

如所写,您的代码将矩阵中的所有值更改为 2,因此它与上面文本中的描述不匹配。你能澄清你需要做什么吗?向量化(消除循环)可能会很好地解决这个问题,而不是并行化。 可能this answer 可能是您需要的90%,只需将rbind(...) 更改为pmin(..., na.rm=TRUE) 【参考方案1】:

您的脚本将生成一个所有元素都等于 2 的矩阵。如果不是这样,您应该创建一个 myMatrix 的副本以在构建 li 时使用(在 if 语句内)。

我意识到这可能是探索并行化的人为示例,但对于 R,通常最好首先关注矢量化。当向量化时,此操作可以足够快,以至于并行化实际上可能由于开销而变慢。例如,这是一个使用填充矩阵的矢量化解决方案(这不会给出全 2,并且它仍然不包括 min 计算中的当前单元格):

library(matrixStats)

system.time(
  idxShift <- expand.grid(rep(list(-1:1), 2))[-5,] # don't include the current cell (0, 0)
  myMatrix <- matrix(nrow = 502, ncol = 502)
  myMatrix[2:501, 2:501] <- matrix(1:250000, nrow = 500)
  myMatrix <- matrix(rowMins(mapply(function(i,j) c(myMatrix[2:501 + i, 2:501 + j]), idxShift$Var1, idxShift$Var2), na.rm = TRUE), nrow = 500)
)

   user  system elapsed 
   0.03    0.00    0.03 

使用future.apply 将其与相同矢量化代码的并行版本进行比较:

library(future.apply)
plan(multisession)

system.time(
  idxShift <- expand.grid(rep(list(-1:1), 2))[-5,]
  myMatrix <- matrix(nrow = 502, ncol = 502)
  myMatrix[2:501, 2:501] <- matrix(1:250000, nrow = 500)
  myMatrix <- matrix(rowMins(future_mapply(function(i,j) c(myMatrix[2:501 + i, 2:501 + j]), idxShift$Var1, idxShift$Var2), na.rm = TRUE), nrow = 500)
)

future:::ClusterRegistry("stop")

   user  system elapsed 
   0.10    0.05    2.11 

如果我没有搞砸什么,并行解决方案会更慢,甚至在计时中不包括plan(multisession)

【讨论】:

以上是关于有没有办法在这个 R 代码中进行并行处理?的主要内容,如果未能解决你的问题,请参考以下文章

R 并行共享内存对象(Windows)

R中的并行foreach共享内存

在 AWS 中使用雪(和降雪)在 R 中进行并行处理

如何使用 rowwise 进行并行处理

R doParallel foreach 中的并行处理

R中的并行处理