R中的并行计算:如何使用内核

Posted

技术标签:

【中文标题】R中的并行计算:如何使用内核【英文标题】:Parallel Computing in R : how to use the cores 【发布时间】:2016-02-10 03:52:21 【问题描述】:

我目前正在 R 中尝试并行计算。 我正在尝试训练一个逻辑脊模型,我的计算机上目前有 4 个核心。我想将我的数据集平均分成 4 部分,并使用每个核心来训练模型(在训练数据上)并将每个核心的结果保存到单个向量中。问题是我不知道该怎么做,现在我尝试与 foreach 包并行,但问题是每个核心看到相同的训练数据。这是带有 foreach 包的代码(不拆分数据):

library(ridge)
library(parallel)
library(foreach)

num_of_cores <- detectCores()
mydata <- read.csv("http://www.ats.ucla.edu/stat/data/binary.csv")
data_per_core <- floor(nrow(mydata)/num_of_cores)
result <- data.frame()

r <- foreach(icount(4), .combine = cbind) %dopar% 
      result <- logisticRidge(admit~ gre + gpa + rank,data = mydata)
      coefficients(result)

知道如何同时将数据拆分为 x 个块并并行训练模型吗?

【问题讨论】:

你绑定到parallelforeach吗?或者你对snowfall-solution 满意吗? 【参考方案1】:

这样的事情怎么样?它使用snowfall 而不是foreach-library,但应该给出相同的结果。

library(snowfall)
library(ridge)

# for reproducability
set.seed(123)
num_of_cores <- parallel::detectCores()
mydata <- read.csv("http://www.ats.ucla.edu/stat/data/binary.csv")
data_per_core <- floor(nrow(mydata)/num_of_cores)

# we take random rows to each cluster, by sampleid
mydata$sampleid <- sample(1:num_of_cores, nrow(mydata), replace = T)

# create a small function that calculates the coefficients
regfun <- function(dat) 
  library(ridge) # this has to be in the function, otherwise snowfall doesnt know the logistic ridge function
  result <- logisticRidge(admit~ gre + gpa + rank, data = dat)
  coefs <- as.numeric(coefficients(result))
  return(coefs)


# prepare the data
datlist <- lapply(1:num_of_cores, function(i)
  dat <- mydata[mydata$sampleid == i, ]
)

# initiate the clusters
sfInit(parallel = T, cpus = num_of_cores)

# export the function and the data to the cluster
sfExport("regfun")

# calculate, (sfClusterApply is very similar to sapply)
res <- sfClusterApply(datlist, function(datlist.element) 
  regfun(dat = datlist.element)
)

#stop the cluster
sfStop()

# convert the list to a data.frame. data.table::rbindlist(list(res)) does the same job
res <- data.frame(t(matrix(unlist(res), ncol = num_of_cores)))
names(res) <- c("intercept", "gre", "gpa", "rank")
res
# res
# intercept          gre
# 1 -3.002592 1.558363e-03
# 2 -4.142939 1.060692e-03
# 3 -2.967130 2.315487e-03
# 4 -1.176943 4.786894e-05
# gpa         rank
# 1  0.7048146997 -0.382462408
# 2  0.9978841880 -0.314589628
# 3  0.6797382218 -0.464219036
# 4 -0.0004576679 -0.007618317

【讨论】:

感谢您的回答!但是我试图模拟数据位于不同机器上的情况,因此我想问是否可以只将部分数据导出到每个集群而不是整个数据集?另一个问题是,是否有人知道如何使用并行包实现解决方案(我不受 foreach 约束,但不能使用降雪)? 我编辑了答案,现在不要将整个数据集发送给每个从站,而只发送子集。尽管如此,它还是一个降雪解决方案。请问为什么不能使用降雪功能? 现在运行您的示例时出现错误,因为尽管您没有向工作人员发送mydata,但您仍然在工作人员函数中引用mydata 你说得对,我一定是忘记检查功能了,现在已经更正了!给您带来的不便深表歉意! 首先,非常感谢大卫的帮助!我误以为降雪不能在 OSX 上运行,所以我实际上可以使用你的代码。再次感谢您的帮助。【参考方案2】:

itertools 包提供了许多函数,用于使用 foreach 循环遍历各种数据结构。在这种情况下,您可以使用isplitRows 函数将数据帧逐行拆分为每个工作人员的一个

library(ridge)
library(doParallel)
library(itertools)

num_of_cores <- detectCores()
cl <- makePSOCKcluster(num_of_cores)
registerDoParallel(cl)
mydata <- read.csv("http://www.ats.ucla.edu/stat/data/binary.csv")

r <- foreach(d=isplitRows(mydata, chunks=num_of_cores),
             .combine = cbind, .packages="ridge") %dopar% 
  result <- logisticRidge(admit~ gre + gpa + rank, data = d)
  coefficients(result)

如果你想控制每个块的最大大小,isplitRows 也需要一个chunkSize 参数。

请注意,使用这种技术,每个工人只能收到mydata 的适当部分。这对于具有PSOCK 集群的较大数据帧尤其重要。

【讨论】:

史蒂夫,也非常感谢您,您的代码让一切变得简单! 谢谢!这有帮助! foreach 嵌套小插图 (cran.r-project.org/web/packages/foreach/vignettes/nested.pdf) 仍然提到 doNWS 作为进行分块的唯一方法,但该包似乎不再存在。也许您可以在下一个 foreach 版本中修改小插图?

以上是关于R中的并行计算:如何使用内核的主要内容,如果未能解决你的问题,请参考以下文章

如何在R中的并行任务中删除临时文件

matlab中如何使用多GPU并行计算?

R doParallel foreach 中的并行处理

doMPI 和节点、处理器和内核

如何使用 sfInit 和 makeCluster 类型“MPI”/R 中的消息传递/集群上的并行化

并行处理中的最佳内核数是多少?