对于 R 中的大迭代,foreach 循环变得不活动
Posted
技术标签:
【中文标题】对于 R 中的大迭代,foreach 循环变得不活动【英文标题】:foreach loop becomes inactive for large iterations in R 【发布时间】:2020-11-18 06:29:48 【问题描述】:我有一个 4500 行的输入 csv 文件。每行都有一个唯一的 ID,对于每一行,我必须读取一些数据,进行一些计算,然后将输出写入 csv 文件,这样我的输出目录中就有 4500 个 csv 文件。单个输出 csv 文件包含 8 列的单行数据
由于我必须对输入 csv 的每一行执行相同的计算,我想我可以使用 foreach
并行化这个任务。以下是整体逻辑结构
library(doSNOW)
library(foreach)
library(data.table)
input_csv <- fread('inputFile.csv'))
# to track the progres of the loop
iterations <- nrow(input_csv)
pb <- txtProgressBar(max = iterations, style = 3)
progress <- function(n) setTxtProgressBar(pb, n)
opts <- list(progress = progress)
myClusters <- makeCluster(6)
registerDoSNOW(myClusters)
results <-
foreach(i = 1:nrow(input_csv),
.packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr"),
.errorhandling = 'remove',
.options.snow = opts) %dopar%
rowRef <- input_csv[i, ]
# read data for the unique location in `rowRef`
weather.path <- arrow(paste0(rowRef$locationID'_weather.parquet')))
# do some calculations
# save the results as csv
fwrite(temp_result, file.path(paste0('output_iter_',i,'.csv')))
return(temp_result)
上述代码运行良好,但在完成input_csv
中 25% 或 30% 的行后总是卡住/不活动/不执行任何操作。我一直在查看我的输出目录,在 N% 的迭代之后,没有文件被写入。我怀疑 foreach 循环是否进入某种睡眠模式?我发现更令人困惑的是,如果我终止工作,重新运行上述代码,它确实会显示 16% 或 30%,然后再次进入非活动状态,即每次重新运行时,它都会以不同的进度级别“休眠”。
在这种情况下,我不知道如何给出一个最小的可重现示例,但我想如果有人知道我应该检查的任何清单或导致这种情况的潜在问题,那将非常有帮助。谢谢
编辑我仍在努力解决这个问题。如果我可以提供更多信息,请告诉我。
EDIT2
我原来的 inputFile
包含 213164 行。所以我拆分了我的大文件
分成 46 个小文件,每个文件有 4634 行
library(foreach)
library(data.table)
library(doParallel)
myLs <- split(mydat, (as.numeric(rownames(mydat))-1) %/% 46))
然后我这样做了:
for(pr in 1:46)
input_csv <- myLs[[pr]]
myClusters <- parallel::makeCluster(6)
doParallel::registerDoParallel(myClusters)
results <-
foreach(i = 1:nrow(input_csv),
.packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr"),
.errorhandling = 'remove',
.verbose = TRUE) %dopar%
rowRef <- input_csv[i, ]
# read data for the unique location in `rowRef`
weather.path <- arrow(paste0(rowRef$locationID'_weather.parquet')))
# do some calculations
# save the results as csv
fwrite(temp_result, file.path(paste0('output_iter_',i,'_',pr,'.csv')))
gc()
parallel::stopCluster(myClusters)
gc()
这也有效,直到说 pr = 7 或 pr = 8 迭代然后不继续 也不会生成任何错误消息。我很困惑。
编辑 这就是我的 CPU 使用率。我只使用了 4 个核心来生成这个图像。谁能解释这张图片中是否有任何东西可以解决我的问题。
【问题讨论】:
好像你正在返回temp_result
。是内存问题吗?
是的,我正在返回 temp_result。有什么方法可以检查它是否确实是由内存问题引起的,因为没有产生错误。脚本仅在 25% 或 30% 或 10% 处停止并且不会移动。如果我终止作业,仍然不会产生错误。
你应该打开某种系统监视器。
几个月前,有人在导出大量文件时遇到问题,他们也使用了fwrite()
,但看起来他们删除了这个问题。如果我没记错的话,例如 50 个文件的速度更快,但例如 500 个文件的速度较慢。我无法记住差异的大小。综上所述,可能值得尝试将fwrite()
换成readr::write_csv()
。另一种可能性是,考虑到将文件全部保存到results
,您可以尝试在另一个步骤中写入文件
好的。感谢您的评论。我将阅读 readr 函数并检查它是否有帮助
【参考方案1】:
您可以使用progres-s-r 包以交互方式跟踪内存使用情况。
例如furrr
包:
library(furrr)
library(pryr)
plan(multisession,workers=6)
library(progres-s-r)
handlers("progress")
#input_csv <- fread('inputFile.csv'))
#filesID <- as.list(1:nrow(input_csv))
filesID <- as.list(1:12)
with_progress(
p <- progressor(along = filesID)
result <- future_map(filesID, function(fileID)
#rowRef <- input_csv[fileID, ]
# read data for the unique location in `rowRef`
#weather.path <- arrow(paste0(rowRef$locationID'_weather.parquet')))
# do some calculations : simulate memory increase
temp_result <- rnorm(2e7)
# save the results as csv
#fwrite(temp_result, file.path(paste0('output_iter_',fileID,'.csv')))
Sys.sleep(2)
p(sprintf("memory used=%g", pryr::mem_used()))
return(object.size(temp_result))
,.options=future_options(packages=c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr")))
)
[====================================================>-------] 90% memory used=6.75075e+08
同样的方法也适用于 foreach。
另一个建议是不要将结果返回到主进程,因为您已经将它们存储在文件中。除了return(temp_result)
,您还可以输出摘要,例如object.size
,因为您知道可以在相关文件中找到完整的结果。
【讨论】:
快速提问:Sys.sleep(3)
在您的代码中的用途是什么?
只是为了有足够的时间看到进度条,因为我的代码不处理数据
@89_Simple,这是否有助于获取有关意外冻结原因的更多信息? furrr
可以吗?还是你更愿意留在 foreach/doSNOW?
我现在正在测试这个。很抱歉这次延误。我会尽快回复您并提供更多信息
您也可以尝试在每次计算结束时运行gc()
,以检查垃圾回收是否有助于减少内存增加。【参考方案2】:
从您的代码中无法完全看出它为什么会停止。也许您的foreach
循环的某些部分不是线程安全的(例如data.table
使用多线程进行子集化)?
就目前而言,几乎没有什么可以改变的,@Waldi 的回答很可能擅长诊断实际问题。唯一似乎明显要改变的是,通过利用 foreach
的底层功能来避免迭代 data.frame
的单行。
foreach
执行并行编程的方式是在对象上创建一个迭代器。对于并行编程,每次迭代之间会有一些开销,因为线程/核心需要请求新信息。因此,通过最小化迭代次数来最小化这种开销时间是有益的。我们可以通过将数据集拆分成块或通过 iterators
包手动创建迭代器来做到这一点。
我无权访问您的数据,因此下面是使用 mtcars
数据集的可重现示例。为了便于阅读,我将其拆分为 setup 和 foreach 块。请注意,我的示例中的files
是一个简单的向量,因此需要对问题中显示的实际代码进行一些最小的更改,因为foreach
循环中的files
现在变成了data.frame
,而不是向量。
设置
library(iterators)
library(foreach)
library(data.table)
library(arrow)
library(doParallel)
# Set up reproducible example:
data(mtcars)
files <- replicate(100, tempfile())
lapply(files, function(x)write_parquet(mtcars, x))
# Split the files into chunks for the iterator
nc <- parallel::detectCores()
sfiles <- split(files, seq_len(length(files)) %% nc + 1)
# Set up backend
th <- parallel::makeCluster(nc)
registerDoParallel(th)
Foreach
foreach(files = sfiles, #Note the iterator will name each chunk 'files' within the loop.
.packages = c('data.table', 'arrow', 'dplyr'),
.combine = c, # Because I return the resulting file names
.multicombine = TRUE) %dopar%
# Iterate over each chunk within foreach
# Reduces loop overhead
outF <- character(length(files))
for(i in seq_along(files))
tib <- arrow::read_parquet(files[i])
# Do some stuff
tib <- tib %>% select(mpg, hp)
# Save output
outF[i] <- tempfile(fileext = '.csv')
fwrite(tib, outF[i])
# Return list of output files
return(outF)
现在我不相信这会解决问题,但它可以稍微减少您的开销。
【讨论】:
【参考方案3】:您需要将注意力从每个文件循环上移开,因为这不是问题。问题在于处理文件中的内容。问题是,当您尝试每行创建一个文件时,您并没有在每一行之后提交写入,因此一个文件和逐行的整个过程会堆积在内存中。您需要在写入文件并关闭连接时刷新内存。
如果可能,请尝试按照以下示例使用 apply
For each row in an R dataframe
尝试在写入文件时关闭与文件的连接 参考如下:
https://stat.ethz.ch/R-manual/R-devel/library/base/html/connections.html
【讨论】:
以上是关于对于 R 中的大迭代,foreach 循环变得不活动的主要内容,如果未能解决你的问题,请参考以下文章