R中的并行foreach共享内存

Posted

技术标签:

【中文标题】R中的并行foreach共享内存【英文标题】:Shared memory in parallel foreach in R 【发布时间】:2015-10-13 01:31:31 【问题描述】:

问题描述:

我有一个大矩阵c,加载到 RAM 内存中。我的目标是通过并行处理对其进行只读访问。但是,当我创建连接时,我使用 doSNOWdoMPIbig.matrix 等,使用的 ram 数量急剧增加。

有没有办法在不创建所有数据的本地副本的情况下正确创建所有进程都可以读取的共享内存?

例子:

libs<-function(libraries)# Installs missing libraries and then load them
  for (lib in libraries)
    if( !is.element(lib, .packages(all.available = TRUE)) ) 
      install.packages(lib)
    
    library(lib,character.only = TRUE)
  


libra<-list("foreach","parallel","doSNOW","bigmemory")
libs(libra)

#create a matrix of size 1GB aproximatelly
c<-matrix(runif(10000^2),10000,10000)
#convert it to bigmatrix
x<-as.big.matrix(c)
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections    
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
out<-foreach(linID = 1:10, .combine=c) %dopar% 
  #load bigmemory
  require(bigmemory)
  # attach the matrix via shared memory??
  m <- attach.big.matrix(mdesc)
  #dummy expression to test data aquisition
  c<-m[1,1]

closeAllConnections()

内存: 在上图中,您可能会发现内存增加了很多,直到foreach 结束并被释放。

【问题讨论】:

我现在遇到了完全相同的问题,我对解决方案非常感兴趣。我还观察到复制而不是共享内存。 【参考方案1】:

我认为问题的解决方案可以从foreach 包的作者here 的作者Steve Weston 的帖子中看出。他在那里说:

doParallel 包会自动将变量导出到 foreach 循环中引用的工作线程。

所以我认为问题是在你的代码中你的大矩阵c 在赋值c&lt;-m[1,1] 中被引用。试试xyz &lt;- m[1,1] 看看会发生什么。

这是一个带有文件支持 big.matrix 的示例:

#create a matrix of size 1GB aproximatelly
n <- 10000
m <- 10000
c <- matrix(runif(n*m),n,m)
#convert it to bigmatrix
x <- as.big.matrix(x = c, type = "double", 
                 separated = FALSE, 
                 backingfile = "example.bin", 
                 descriptorfile = "example.desc")
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections    
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
## 1) No referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% 
  t <- attach.big.matrix("example.desc")
  for (i in seq_len(30L)) 
    for (j in seq_len(m)) 
      y <- t[i,j]
    
  
  return(0L)

## 2) Referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% 
  invisible(c) ## c is referenced and thus exported to workers
  t <- attach.big.matrix("example.desc")
  for (i in seq_len(30L)) 
    for (j in seq_len(m)) 
      y <- t[i,j]
    
  
  return(0L)

closeAllConnections()

【讨论】:

我看不到 c&lt;-m[1,1] 实际上加载了 c,因为我预计这会生成一个新变量而不是 ,很好阅读它。这意味着实际上内存是共享的,由于c,我一直在浪费时间探索不同的选项。十分感谢你的帮助! PS:我不认为下面不可见的代码曾经被执行过。 @Stanislav 我同意这有点出乎意料的行为。如果我的回答解决了你的问题,如果你能考虑接受,我会很高兴。 @Stanislav 这个答案是正确的,你需要确定你实际出口给工人的是什么。除非您实际修改同一个对象,否则循环内部和外部的变量名称通常不一样是一种好习惯。【参考方案2】:

或者,如果您在 Linux/Mac 上并且想要 CoW 共享内存,请使用分叉。首先将所有数据加载到主线程中,然后从parallel 包中使用通用函数mcparallel 启动工作线程(分叉)。

您可以使用mccollect 或使用真正的共享内存使用Rdsm 库收集他们的结果,如下所示:

library(parallel)
library(bigmemory) #for shared variables
shared<-bigmemory::big.matrix(nrow = size, ncol = 1, type = 'double')
shared[1]<-1 #Init shared memory with some number

job<-mcparallel(shared[1]<-23) #...change it in another forked thread
shared[1,1] #...and confirm that it gets changed
# [1] 23

您可以确认,如果您延迟写入,该值确实会在背景中更新:

fn<-function()

  Sys.sleep(1) #One second delay
  shared[1]<-11


job<-mcparallel(fn())
shared[1] #Execute immediately after last command
# [1] 23
aaa[1,1] #Execute after one second
# [1] 11
mccollect() #To destroy all forked processes (and possibly collect their output)

要控制并发性并避免竞争条件,请使用锁:

library(synchronicity) #for locks
m<-boost.mutex() #Lets create a mutex "m"

bad.incr<-function() #This function doesn't protect the shared resource with locks:

  a<-shared[1]
  Sys.sleep(1)
  shared[1]<-a+1


good.incr<-function()

  lock(m)
  a<-shared[1]
  Sys.sleep(1)
  shared[1]<-a+1
  unlock(m)


shared[1]<-1
for (i in 1:5) job<-mcparallel(bad.incr())
shared[1] #You can verify, that the value didn't get increased 5 times due to race conditions

mccollect() #To clear all threads, not to get the values
shared[1]<-1
for (i in 1:5) job<-mcparallel(good.incr())
shared[1] #As expected, eventualy after 5 seconds of waiting you get the 6
#[1] 6 

mccollect()

编辑:

我通过将Rdsm::mgrmakevar 交换为bigmemory::big.matrix 来稍微简化依赖关系。 mgrmakevar 内部调用 big.matrix 无论如何,我们不再需要任何东西。

【讨论】:

以上是关于R中的并行foreach共享内存的主要内容,如果未能解决你的问题,请参考以下文章

R语言并行计算中的内存控制

与 foreach 并行预测 nnet 输出时 R 内存爆炸

CUDA 并行扫描算法共享内存竞争条件

使用 numpy 数组和共享内存并行化 python 循环

多处理中的共享内存对象

我可以将 MPI 与共享内存一起使用吗