如何在 R 中将“foreach”和“%dopar%”与“R6”类一起使用?

Posted

技术标签:

【中文标题】如何在 R 中将“foreach”和“%dopar%”与“R6”类一起使用?【英文标题】:How to use `foreach` and `%dopar%` with an `R6` class in R? 【发布时间】:2019-12-12 11:13:14 【问题描述】:

我在尝试将%dopar%foreach()R6 类一起使用时遇到了问题。环顾四周,我只能找到两个与此相关的资源,一个未答复的 SO question 和一个在 R6 存储库上打开的 GitHub issue。

在一条评论(即 GitHub 问题)中,建议通过将类的 parent_env 重新分配为 SomeClass$parent_env <- environment() 来解决此问题。我想了解在foreach%dopar% 中调用此表达式(即SomeClass$parent_env <- environment())时,environment() 到底指的是什么?

这是一个最小的可重现示例:

Work <- R6::R6Class("Work",

    public = list(
        values = NULL,


        initialize = function() 
            self$values <- "some values"
        
    )
)

现在,以下Task 类在构造函数中使用Work 类。

Task <- R6::R6Class("Task",
    private = list(
        ..work = NULL
    ),


    public = list(
        initialize = function(time) 
            private$..work <- Work$new()
            Sys.sleep(time)
        
    ),


    active = list(
        work = function() 
            return(private$..work)
        
    )
)

Factory类中,Task类被创建,foreach..m.thread()中实现。

Factory<- R6::R6Class("Factory",

    private = list(
        ..warehouse = list(),
        ..amount = NULL,
        ..parallel = NULL,


        ..m.thread = function(object, ...) 
            cluster <- parallel::makeCluster(parallel::detectCores() -  1)
            doParallel::registerDoParallel(cluster)

            private$..warehouse <- foreach::foreach(1:private$..amount, .export = c("Work")) %dopar% 
                # What exactly does `environment()` encapsulate in this context?
                object$parent_env <- environment()
                object$new(...) 
            

            parallel::stopCluster(cluster)
        ,


        ..s.thread = function(object, ...) 
            for (i in 1:private$..amount) 
               private$..warehouse[[i]] <- object$new(...)
            
        ,


        ..run = function(object, ...) 
            if(private$..parallel) 
                private$..m.thread(object, ...)
             else 
                private$..s.thread(object, ...)
            
        
    ),


    public = list(
        initialize = function(object, ..., amount = 10, parallel = FALSE) 
            private$..amount = amount
            private$..parallel = parallel

            private$..run(object, ...)
        
    ),


    active = list(
        warehouse = function() 
            return(private$..warehouse)
        
    )
)

然后,它被称为:

library(foreach)

x = Factory$new(Task, time = 2, amount = 10, parallel = TRUE)

如果没有以下行 object$parent_env &lt;- environment(),则会引发错误(即,如其他两个链接中所述):Error in : task 1 failed - "object 'Work' not found"

我想知道,(1) 在 foreach 中分配 parent_env 时有哪些潜在的陷阱,以及 (2) 为什么它首先会起作用?


更新 1:

我从 foreach() 中返回了 environment(),以便 private$..warehouse 捕获这些环境 在调试会话中使用 rlang::env_print()(即,browser() 语句是在 foreach 结束执行之后立即放置的)它们的组成如下:
Browse[1]> env_print(private$..warehouse[[1]])

# <environment: 000000001A8332F0>
# parent: <environment: global>
# bindings:
#  * Work: <S3: R6ClassGenerator>
#  * ...: <...>

Browse[1]> env_print(environment())

# <environment: 000000001AC0F890>
# parent: <environment: 000000001AC20AF0>
# bindings:
#  * private: <env>
#  * cluster: <S3: SOCKcluster>
#  * ...: <...>

Browse[1]> env_print(parent.env(environment()))

# <environment: 000000001AC20AF0>
# parent: <environment: global>
# bindings:
#  * private: <env>
#  * self: <S3: Factory>

Browse[1]> env_print(parent.env(parent.env(environment())))

# <environment: global>
# parent: <environment: package:rlang>
# bindings:
#  * Work: <S3: R6ClassGenerator>
#  * .Random.seed: <int>
#  * Factory: <S3: R6ClassGenerator>
#  * Task: <S3: R6ClassGenerator>

【问题讨论】:

我运气不好,试图让具有环境的对象可以跨 parallel 集群的节点使用。 R6 对象是inherently environments,通常用于完成pass-by-reference 语义(而不是R 的默认pass-by-value)。为了做到这一点,environment 被就地修改。不幸的是,这个 env 不是在集群节点之间共享的,所以即使一个environment 可以转移到其他节点,也经常会丢失对象的前提。 (我不知道env可以转移,顺便说一句。) 阅读那个 github 问题,我完全有可能遗漏了一些东西...... 在阅读 GitHub 问题之前我也是这么想的!现在我认为这是可能的,至少object$parent_env &lt;- environment() 使它成为可能。不过,我不明白背后的原因...... 推理一下,我预计如果我在 .export 中包含 self 这也会起作用,但它没有:object$parent_env &lt;- parent.env(self$.__enclos_env__) 能否让您的示例更简洁? 【参考方案1】:

免责声明:我在这里所说的很多内容都是基于我所知道的有根据的猜测和推论, 我不能保证一切都是 100% 正确的。

我认为可能有很多陷阱, 哪一个适用真的取决于你做什么。 我认为你的第二个问题更重要, 因为如果你明白这一点, 您将能够自己评估一些陷阱。

题目比较复杂, 但您可能可以从阅读R's lexical scoping 开始。 本质上,R 有一种环境层次结构, 当R代码被执行时, 在当前环境中找不到其值的变量 (这是environment() 返回的内容) 在 parent 环境中寻找 (不要与调用者环境混淆)。

根据您链接的 GitHub 问题, R6 生成器保存对其父环境的“引用”, 他们希望他们的类可能需要的一切都可以在所述父级或环境层次结构中的某个地方找到, 从该父级开始并“向上”。

您使用的解决方法有效的原因是您将生成器的父环境替换为并行工作器中当前 foreach 调用中的父环境 (可能是不同的 R 进程,不一定是不同的线程), 并且,鉴于您的 .export 规范可能会导出必要的值, 然后,R 的词法作用域可以从单独的线程/进程中的 foreach 调用开始搜索缺失值。

对于您链接的具体示例, 我发现了一种更简单的方法来使它工作 (至少在我的 Linux 机器上) 是做到以下几点:

library(doParallel)

cluster <- parallel::makeCluster(parallel::detectCores() -  1)
doParallel::registerDoParallel(cluster)
parallel::clusterExport(cluster, setdiff(ls(), "cluster"))

x = Factory$new(Task, time = 1, amount = 3)

但是..m.thread 函数保留为:

..m.thread = function(object, amount, ...) 
    private$..warehouse <- foreach::foreach(1:amount) %dopar% 
        object$new(...) 
    

(完成后手动调用stopCluster)。

clusterExport 调用的语义应该类似于*: 从主 R 进程的全局环境中获取除cluster 之外的所有内容, 并使其在每个并行工作者的全局环境中可用。 这样,当词法作用域到达它们各自的全局环境时,foreach 调用中的任何代码都可以使用生成器。 foreach 可以很聪明,自动导出一些变量 (如 GitHub 问题所示), 但它有局限性, 并且在词法作用域期间使用的层次结构会变得非常混乱。

*我说“类似于”是因为如果使用分叉,我不知道 R 究竟做了什么来区分(全局)环境, 但既然需要出口, 我认为它们确实是相互独立的。

PS:如果您在函数调用中创建工作线程,我会调用 on.exit(parallel::stopCluster(cluster)), 这样,您就可以避免留下进程,直到它们在发生错误时以某种方式停止。

【讨论】:

嗨@Alexis,感谢您提供如此详尽的答案和有关词法范围的资源。它确实帮助我更好地了解正在发生的事情以及如何利用搜索路径来发挥我的优势。附言on.exit 为集群相关的连接保存了很多警告...我会继续并将其标记为已接受的答案。 @Mihai 没问题。另一个建议:如果您可以将所有生成器放在您自己的 R 包中,然后您可以告诉foreach 将该包加载到每个工人中,这可能会为您节省一些问题。您必须停止将生成器直接传递给它,但您可以传递生成器名称并在 foreach 调用中使用 object &lt;- get(... 或类似名称。 这似乎是一个非常优雅的解决方案!事实上,所有的生成器都生活在同一个包命名空间中,工厂的唯一目的是从它们创建对象。谢谢,我从你那里学到了一些非常有价值的东西!

以上是关于如何在 R 中将“foreach”和“%dopar%”与“R6”类一起使用?的主要内容,如果未能解决你的问题,请参考以下文章

R:如何在 foreach %dopar% 中拆分数据帧

为啥将 %dopar% 与 foreach 一起使用导致 R 无法识别包?

R在HPC MPIcluster上运行foreach dopar循环

如何以编程方式在 foreach 中的 %do% 和 %dopar% 之间切换?

使用 foreach 进行并行处理时出错:“找不到函数“%dopar%””

使用 %dopar% 时如何打印