如何初始化worker以并行使用包函数
Posted
技术标签:
【中文标题】如何初始化worker以并行使用包函数【英文标题】:How to initialize workers to use package functions in parallel 【发布时间】:2020-12-11 01:21:30 【问题描述】:我正在开发一个 R 包并尝试在其中使用并行处理来解决令人尴尬的并行问题。我想编写一个循环或函数来使用我包中的其他函数。我在 Windows 中工作,我尝试使用 parallel::parLapply
和 foreach::%dopar%
,但无法让工作人员(核心)访问我的包中的功能。
这是一个包含两个函数的简单包的示例,其中第二个函数使用%dopar%
在并行循环中调用第一个函数:
add10 <- function(x) x + 10
slowadd <- function(m)
cl <- parallel::makeCluster(parallel::detectCores() - 1)
doParallel::registerDoParallel(cl)
`%dopar%` <- foreach::`%dopar%` # so %dopar% doesn't need to be attached
foreach::foreach(i = 1:m) %dopar%
Sys.sleep(1)
add10(i)
stopCluster(cl)
当我使用devtools::load_all()
加载包并调用slowadd
函数时,返回Error in : task 1 failed - "could not find function "add10""
。
我还尝试使用我的包显式初始化工作人员:
add10 <- function(x) x + 10
slowadd <- function(m)
cl <- parallel::makeCluster(parallel::detectCores() - 1)
doParallel::registerDoParallel(cl)
`%dopar%` <- foreach::`%dopar%` # so %dopar% doesn't need to be attached
foreach::foreach(i = 1:m, .packages = 'mypackage') %dopar%
Sys.sleep(1)
add10(i)
stopCluster(cl)
但我收到错误 Error in e$fun(obj, substitute(ex), parent.frame(), e$data) : worker initialization failed: there is no package called 'mypackage'
。
如何让工作人员访问我的包中的功能?使用foreach
的解决方案会很棒,但我完全愿意接受使用parLapply
或其他功能/包的解决方案。
【问题讨论】:
我只熟悉并行而不是 dopar 的东西,但是对于并行,您需要向每个节点添加库调用/函数/数据,例如 parallel::clusterEvalQ、clusterExport 等。有示例在帮助页面中 ?clusterExport @user20650 我试过使用parallel 和clusterExport,但同样的问题是找不到我的包。我不确定它是否在错误的环境中寻找(我已经尝试明确定义环境),或者它是否与包开发的工作方式有关。我可以成功地使用其他包,例如,parallel::clusterEvalQ(cl, library(dplyr))。 id 是这样设置的:chat.***.com/rooms/220232/neal 你的包安装正确了吗? 你在使用 RStudio 吗?安装并重启就足够了。 【参考方案1】:感谢人们提供帮助的 cmets,我能够使用我的包的功能初始化工作人员。通过确保在 NAMESPACE 中导出了所有需要的包函数并使用devtools::install()
安装我的包,foreach
能够找到用于初始化的包。该示例的 R 脚本如下所示:
#' @export
add10 <- function(x) x + 10
#' @export
slowadd <- function(m)
cl <- parallel::makeCluster(parallel::detectCores() - 1)
doParallel::registerDoParallel(cl)
`%dopar%` <- foreach::`%dopar%` # so %dopar% doesn't need to be attached
out <- foreach::foreach(i = 1:m, .packages = 'mypackage') %dopar%
Sys.sleep(1)
add10(i)
stopCluster(cl)
return(out)
这是可行的,但它不是一个理想的解决方案。首先,它使工作流程慢得多。每次我对包进行更改并想对其进行测试(在合并并行性之前)时,我都使用devtools::load_all()
,但是现在我每次都必须重新安装包,当包很大时这很慢。其次,并行循环中需要的每个函数都需要导出,以便foreach
可以找到它。我的实际用例有很多小的实用功能,我宁愿保留在内部。
【讨论】:
以上是关于如何初始化worker以并行使用包函数的主要内容,如果未能解决你的问题,请参考以下文章
Pysyft学习笔记四:MINIST数据集下的联邦学习(并行训练与非并行训练)
Pysyft学习笔记四:MINIST数据集下的联邦学习(并行训练与非并行训练)