有啥办法可以打破 foreach 循环?

Posted

技术标签:

【中文标题】有啥办法可以打破 foreach 循环?【英文标题】:Is there any way to break out of a foreach loop?有什么办法可以打破 foreach 循环? 【发布时间】:2013-04-11 09:16:46 【问题描述】:

我正在使用 R 包 foreach()%dopar% 并行进行长时间(~天)计算。如果其中一个产生错误,我希望能够停止整个计算集。但是,我还没有找到实现这一点的方法,并且从文档和各种论坛中我没有发现任何迹象表明这是可能的。特别是,break() 不起作用,stop() 仅停止当前计算,而不是整个 foreach 循环。

请注意,我不能使用简单的 for 循环,因为最终我想使用 doRNG 包将其并行化。

这是我正在尝试的一个简化的、可重现的版本(此处与%do% 串联显示,但在使用doRNG%dopar% 时我遇到了同样的问题)。请注意,实际上我想并行运行此循环的所有元素(此处为 10 个)。

library(foreach)
myfunc <- function() 
  x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do% 
    cat("Element ", k, "\n")
    Sys.sleep(0.5) # just to show that stop does not cause exit from foreach
    if(is.element(k, 2:6)) 
      cat("Should stop\n")
      stop("Has stopped")
    
    k
  
  return(x)

x <- myfunc()
# stop() halts the processing of k=2:6, but it does not stop the foreach loop itself.
# x is not returned. The execution produces the error message
# Error in  : task 2 failed - "Has stopped"

我想要实现的是整个 foreach 循环可以在某些条件下立即退出(这里,当遇到 stop() 时)。

我发现没有办法通过foreach 实现这一点。看来我需要一种方法来向所有其他进程发送消息以使它们也停止。

如果不能使用foreach,有没有人知道替代方案?我也尝试使用parallel::mclapply 来实现这一点,但这也不起作用。

> sessionInfo()
R version 3.0.0 (2013-04-03)
Platform: x86_64-apple-darwin10.8.0 (64-bit)

locale:
[1] C/UTF-8/C/C/C/C

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods base

other attached packages:
[1] foreach_1.4.0

loaded via a namespace (and not attached):
[1] codetools_0.2-8 compiler_3.0.0  iterators_1.0.6

【问题讨论】:

不能用for代替吗? 不,因为最终我想使用 doRNG 包并行化它。 (抱歉,我在原始帖子中没有说清楚:我已经对其进行了编辑以使其明确。) 根据您的其他 cmets,您可能希望每个子进程能够在失败时设置一个“标志”对象,并使该对象可供所有子进程读取。它们都必须有一些内部断点或等效的东西来定期检查“标志”的值,这样它们都可以自行终止。 【参考方案1】:

我没有尝试跳出循环,而是在到达终端循环时将一个小文件写入磁盘,然后根据该文件的存在简单地跳过所有剩余的迭代。

检查文件是否存在花费我们不到一毫秒的计算时间。

# 1.4 seconds to check if a file exists a million times
system.time(lapply(1:1e6, function(x) file.exists("checker.txt")))
   user  system elapsed 
  1.204   0.233   1.437 

当您没有固定数量的迭代或您的过程可以在所有迭代完成之前完成(例如收敛)时,这非常有用

library(foreach)

alist <- foreach(i = 1:5000) %dopar%  
  if(file.exists("checker.txt")) 
    return(NULL)
   else 
    if(i = 20) 
      write("", "checker.txt") # write an empty file
    
    return(i)
  


file.remove("checker.txt")

这样做的好处是,即使您的列表非常长,如果您只是 unlist(),您也只能得到值。

> length(alist)
[1] 5000

> unlist(res)
 [1]  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20

不要费心尝试打破,而是“跳过其余部分”!

【讨论】:

太棒了 - 在这个问题上卡了很长时间。我在 try/catch 语句中使用了这个方法。当子进程产生错误时,它会跳过剩余的迭代。并且脚本再次启动 foreach 循环并进行了一些修改(这次没有产生错误)。 这太棒了!谢谢【参考方案2】:

我从 REvolution 技术支持得到的答案是:“不——foreach 目前没有办法停止对任何人的错误的所有并行计算”。

【讨论】:

【参考方案3】:

foreach 做我想做的事我运气不好,所以这里有一个使用parallel 包的解决方案,它似乎可以做我想做的事。我使用mcparallel() 中的intermediate 选项将我的函数do.task() 的结果立即传递给函数check.res()。如果do.task() 抛出错误,则在check.res() 中使用它来触发调用tools::pskill 以显式杀死所有工作人员。这可能不是很优雅,但它的工作原理是它会立即停止所有工作。此外,我可以简单地从当前环境继承do.task() 中处理所需的所有变量。 (实际上do.task() 是一个更复杂的函数,需要传入许多变量。)

library(parallel)

# do.task() and check.res() inherit some variables from enclosing environment

do.task <- function(x) 
  cat("Starting task", x, "\n")
  Sys.sleep(5*x)
  if(x==stopat)  
    stop("Error in job", x) # thrown to mccollect() which sends it to check.res()
  
  cat("  Completed task", x, "\n")
  return(10*x)


check.res <- function(r)  # r is list of results so far
  cat("Called check.res\n")
  sendKill <- FALSE
  for(j in 1:Njob)  # check whether need to kill
    if(inherits(r[[j]], 'try-error')) 
      sendKill <- TRUE
    
  
  if(sendKill)  # then kill all
    for(j in 1:Njob) 
      cat("Killing job", job[[j]]$pid, "\n") 
      tools::pskill(job[[j]]$pid) # mckill not accessible
    
  


Tstart <- Sys.time()
stopat <- 3
Njob <- 4
job <- vector("list", length=Njob)
for(j in 1:Njob) 
  job[[j]]<- mcparallel(do.task(j))

res <- mccollect(job, intermediate=check.res) # res is in order 1:Njob, regardless of how long jobs took
cat("Collected\n")
Tstop <- Sys.time()
print(difftime(Tstop,Tstart))
for(j in 1:Njob) 
  if(inherits(res[[j]], 'try-error')) 
    stop("Parallel part encountered an error")
  

这给出了变量res的以下屏幕转储和结果

> source("exp5.R")
Starting task 1 
Starting task 2 
Starting task 3 
Starting task 4 
  Completed task 1 
Called check.res
Called check.res
  Completed task 2 
Called check.res
Called check.res
Called check.res
Killing job 21423 
Killing job 21424 
Killing job 21425 
Killing job 21426 
Called check.res
Killing job 21423 
Killing job 21424 
Killing job 21425 
Killing job 21426 
Called check.res
Killing job 21423 
Killing job 21424 
Killing job 21425 
Killing job 21426 
Collected
Time difference of 15.03558 secs
Error in eval(expr, envir, enclos) : Parallel part encountered an error
> res
$`21423`
[1] 10

$`21424`
[1] 20

$`21425`
[1] "Error in do.task(j) : Error in job3\n"
attr(,"class")
[1] "try-error"
attr(,"condition")
<simpleError in do.task(j): Error in job3>

$`21426`
NULL

【讨论】:

【参考方案4】:

听起来您想要一个 不耐烦 版本的“停止”错误处理。您可以通过编写自定义组合函数来实现它,并安排foreach 在返回每个结果后立即调用它。为此,您需要:

使用支持即时调用combine 的后端,例如doMPIdoRedis 不要启用.multicombine.inorder 设置为FALSE.init 设置为某事(如NULL

这是一个例子:

library(foreach)
parfun <- function(errval, n) 
  abortable <- function(errfun) 
    comb <- function(x, y) 
      if (inherits(y, 'error')) 
        warning('This will leave your parallel backend in an inconsistent state')
        errfun(y)
      
      c(x, y)
    
    foreach(i=seq_len(n), .errorhandling='pass', .export='errval',
            .combine='comb', .inorder=FALSE, .init=NULL) %dopar% 
      if (i == errval)
        stop('testing abort')
      Sys.sleep(10)
      i
    
  
  callCC(abortable)

请注意,我还将错误处理设置为“通过”,因此foreach 将使用错误对象调用组合函数。 callCC 函数用于从foreach 循环返回,而不管foreach 和后端中使用的错误处理。在这种情况下,callCC 将调用abortable 函数,传递给它一个函数对象,该对象用于强制callCC 立即返回。通过从 combine 函数调用该函数,我们可以在检测到错误对象时从 foreach 循环中逃脱,并让 callCC 返回该对象。请参阅?callCC 了解更多信息。

您实际上可以在没有注册并行后端的情况下使用parfun,并验证foreach 循环在执行引发错误的任务时是否“中断”,但这可能需要一段时间,因为任务是按顺序执行的。例如,如果没有注册后端,则执行需要 20 秒:

print(system.time(parfun(3, 4)))

当并行执行parfun 时,我们需要做的不仅仅是跳出foreach 循环:我们还需要停止工作人员,否则他们将继续计算分配的任务。使用doMPI,可以使用mpi.abort 停止工作人员:

library(doMPI)
cl <- startMPIcluster()
registerDoMPI(cl)
r <- parfun(getDoParWorkers(), getDoParWorkers())
if (inherits(r, 'error')) 
  cat(sprintf('Caught error: %s\n', conditionMessage(r)))
  mpi.abort(cl$comm)

请注意,在循环中止后不能使用集群对象,因为没有正确清理,这就是正常的“停止”错误处理不能以这种方式工作的原因。

【讨论】:

+1 表示评论,以及对我帮助很大的书 :) 如果组合函数comb() 中的stop() 不再是导致提前退出的原因,我有点困惑。我认为 foreach 中的 stop() 触发了 comb() 的调用。那么是 errfun() 导致提前退出吗?但是 errfun() 是什么?它没有明确定义(并且名称是任意的)。此外,当我在 4 个内核上使用 %dopar% 和 doMPI 运行 parfun(6,12) 时,i=5,7,8,9 继续执行(在下面的答案中使用 sink() 方法验证),所以我'不确定并行运行时是否真的提前停止。 foreach 循环中的stop 只会导致错误对象作为任务结果返回给主服务器。由于错误处理是“通过”,foreach 将其传递给 combine 函数,并且由于指定的选项而立即执行此操作。如果 combine 函数调用 errfun,则 combine 函数不会返回到其调用者,而是返回到 callCC。但正如我在修订后的答案中所说,这对工人没有影响,这就是需要 mpi.abort 的原因。 我相信您从工作人员那里看到的额外输出是因为他们继续正常工作,直到调用 mpi.abort 实际上,附加输出是在调用 mpi.abort 之后产生的(在打印 Caught error' 后几秒钟),我在“top”中看到进程仍在运行。因此,尽管我们打破了 foreach 循环(从而阻止了后来的工作人员启动),但现有的工作人员并没有被停止。因此,这种方法似乎不允许我立即停止所有工作人员的错误。我尝试将cl 传递到foreach 循环并在那里使用mpi.abort(cl$comm),但是(不足为奇)这不起作用。【参考方案5】:

史蒂夫韦斯顿的原始答案基本上回答了这个问题。但这是他的答案的一个稍微修改的版本,它还保留了我需要的两个附加功能:(1)随机数生成; (2) 打印运行时诊断。

suppressMessages(library(doMPI))

comb <- function(x, y) 
  if(inherits(y, 'error')) 
    stop(y)
  
  rbind(x, y) # forces the row names to be 'y'


myfunc <- function() 
  writeLines(text="foreach log", con="log.txt")
  foreach(i=1:12, .errorhandling='pass', .combine='comb', .inorder=FALSE, .init=NULL) %dopar% 
    set.seed(100)
    sink("log.txt", append=TRUE)
    if(i==6) 
      stop('testing abort')
    
    Sys.sleep(10)
    cat("Completed task", i, "\n")
    sink(NULL)
    rnorm(5,mean=i)
  


myerr <- function(e) 
  cat(sprintf('Caught error: %s\n', conditionMessage(e)))
  mpi.abort(cl$comm)


cl <- startMPIcluster(4)
registerDoMPI(cl)
r <- tryCatch(myfunc(), error=myerr)
closeCluster(cl)

获取此文件后,它会按预期退出并显示错误消息

> source("exp2.R")
    4 slaves are spawned successfully. 0 failed.
Caught error: testing abort
[ganges.local:16325] MPI_ABORT invoked on rank 0 in communicator  with errorcode 0

“log.txt”文件提供了直到错误点的正确诊断,然后提供了额外的错误信息。至关重要的是,一旦遇到 foreach 循环中的 stop(),所有任务的执行就会停止:它不会等到整个 foreach 循环完成。因此,我最多只能看到 i=4 的“已完成任务”消息。 (请注意,如果 Sys.sleep() 较短,那么后面的任务可能会在处理 mpi.abort() 之前启动。)

如果我将停止条件更改为“i==100”,则不会触发停止,因此不会触发错误。代码成功存在,没有报错,r是12*5维度的二维数组。

顺便说一句,我似乎实际上并不需要 .inorder=FALSE(我认为这只会在发现错误的情况下给我带来一点速度提升)。

【讨论】:

我更改了答案,因为我发现它利用了 doMPI 中不正确的错误处理。在 combine 函数中执行 stop 不应中止 foreach,这在 R-forge 上的 doMPI 开发版本中已修复,因此发布时您的答案将不起作用。 如果不设置.inorder=FALSE,则在之前的所有任务都被combine函数处理完之后才会调用combine函数。因此,如果失败的任务不是第一个任务,则在您的示例中中止至少需要 10 秒。【参考方案6】:

这不是对您问题的直接回答,但使用when() 可以避免在满足条件时进入循环:

x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %:%
  when( !is.element(k, 2:6) ) %do%
  
    cat("Element ", k, "\n")
    Sys.sleep(0.5)
    k
  

编辑:

我忘记了一些事情:我认为这是设计使然,您不能仅仅停止 foreach 循环。如果您并行运行循环,则每个回合都是独立处理的,这意味着当您停止 k=2 的整个循环时,如果 k=1 的进程已经终止或仍在运行,则无法预测。因此,使用when() 条件可以得到确定性结果。

编辑 2:考虑您的评论的另一个解决方案。

shouldStop <- FALSE
x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do%
  
    if( !shouldStop )
      # put your time consuming code here
      cat("Element ", k, "\n")
      Sys.sleep(0.5)
      shouldStop <- shouldStop ||  is.element(k, 2:6)
      k
    
  

使用此解决方案,在停止条件为真时正在运行的进程仍会计算到结束,但您可以避免所有即将到来的进程的时间消耗。

【讨论】:

问题是,只有在循环中完成一些计算后,我才会知道是否要退出循环。然而正是这些计算,我想与这个循环并行化。 (换句话说,条件本身是耗时的计算。) EDIT 2 是一个有用的建议,但我运行它的方式是要处理的事物的数量等于可用的 CPU 内核数(10-50)。因此,所有进程都是同时启动的,并且没有将来需要避免启动的进程。就像现在一样,我必须等待所有这些都完成,然后才能从 stop() 收到错误消息。一个解决方法是让我在看到 cat() 产生的消息后立即手动终止整个程序(在我的帖子中),但这是不切实际的,因为这是一个长期的运行(约 1 天)并且在远程机器上的背景。 这个信息改变了整个事情,应该在原帖中提到。但是,我必须承认,在这种情况下,我的想法是有限的。您可以尝试直接使用 snow 包中的 clusterApply 来控制您的节点,并在第一个具有所需结果的作业完成时调用 stopCluster()。但请注意,从从属进程调用stopCluster() 不仅会产生丑陋的错误。此外,结果不会返回给主服务器。也许其他人知道如何传递结果?

以上是关于有啥办法可以打破 foreach 循环?的主要内容,如果未能解决你的问题,请参考以下文章

如何打破嵌套的foreach循环然后转到c#上的父foreach循环

如何用 PHP 打破外循环?

如何在 TypeScript 中打破 ForEach 循环

详细讲解foreach循环的用法

如何在 laravel 刀片视图中打破 foreach 循环?

打破 foreach C#