R foreach:并行读取和操作多个文件

Posted

技术标签:

【中文标题】R foreach:并行读取和操作多个文件【英文标题】:R foreach: Read and manipulate multiple files in parallel 【发布时间】:2021-03-18 00:05:51 【问题描述】:

我有 500 个 tar.xz 文件,其中包含 2000 个 csv 文件。我需要一次解压几个 tar 文件(因为磁盘空间),将它们处理成 data.table,从磁盘中删除 csv 文件,然后将结果保存为 RDS,然后再继续下几个 tar 文件。

我的函数在串行中运行良好,但在并行时它会在内核之间混淆文件。这是为什么呢?

一些样本数据:

    for(j in 1:5)
     for(i in 1:5)
      a<-df[sample(x = 1:nrow(df), size = 50, replace = TRUE),]
      write.csv(a,paste0("seed_",i,".csv"))
      lf<-list.files(pattern=".csv")
                  
     tar(tarfile = paste0("seed_",j,".tar"),files = lf,compression = c("xz"), tar="tar")
                 

foreach 示例代码

require(dplyr)
require(tidyr)
require(foreach)
require(doParallel)
require(magrittr)

#List all tar files in directory
list_of_files<-list.files(pattern = ".tar")

  packsINeed<-c("vroom","magrittr","dplyr","tidyr","doParallel")
    
  #Start for loop

myCluster<-makeCluster(6,type="PSOCK")
registerDoParallel(myCluster) 

  foreach(i= 1:NROW(list_of_files),.packages = packsINeed)%dopar%

print(paste(list_of_files[i], "which is", i, "of", NROW(list_of_files) ))

print("2. Untar .csv files inside")
 untar(tarfile = list_of_files[i], exdir = "tempOutputFiles")



 print("#3. Read in files and add up two columns")
df<-vroom::vroom(list.files("tempOutputFiles/$.csv"), id="path")

df$A<-df$B+df$C

    print("#4. save RDS")

saveRDS(object = df, file = paste0(tools::file_path_sans_ext(list_of_files[i], compression = TRUE),".rds"))

 print("#5. Clean up files")

.files<-list.files("tempOutputFiles",pattern=".csv")

    file.remove(basename(.files))

使用 mclapply - 行为相同

require(dplyr)
require(tidyr)
require(foreach)
require(doParallel)
require(magrittr)

#List all tar files in directory
list_of_files<-list.files(pattern = ".tar")

myParFun

print(paste(filename))

print("2. Untar all .csv files inside")
 untar(tarfile = filename, exdir = "tempOutputFiles")



 print("#3. Read in files and add up two columns")
df<-vroom::vroom(list.files("tempOutputFiles/$.csv"), id="path")

df$A<-df$B+df$C

    print("#4. save RDS")

saveRDS(object = df, file = paste0(tools::file_path_sans_ext(filename, compression = TRUE),".rds"))

 print("#5. Clean up files")

   .files<-list.files("tempOutputFiles",pattern=".csv")

    file.remove(.files)


mclapply(FUN=myParFun, list_of_files, mc.cores=4)

根据 Waldi 的评论,我为 list_of_files 中的每个文件创建了一个目录,现在它可以正常工作了。但是有打鼾的方法吗?以 tempdir 为例?

【问题讨论】:

您能否提供一些代码,至少了解一下您是如何设计代码的? 感谢您查看此内容。我已经添加了循环的基本框架。我对文件的实际处理需要一段时间,但这只是一个示例。 这看起来很奇怪:file.remove(basename(.files))。这些文件已经是基本名称,我认为它们不应该。 啊,那是因为我认为 untar 提供了完整的路径。我发现文件在整个循环完成之前就被删除了,这很奇怪。 这里 (files&lt;-list.files("tempOutputFiles",pattern=".csv")) 你从这个目录中得到所有的 csv 文件(但只有基本名称,如果你想得到完整的路径,有一个参数)。 【参考方案1】:

按照 cmets 中的建议,下面的代码为每个进程 / tar 文件创建一个目录,解压缩,将 CSV 合并到 .rds 文件中并删除它们。 请注意,vroom 似乎需要 altrep = FALSE 参数来避免在删除时出现 permission denied error。

# Generate sample tars for test
write.csv(mtcars,'file1.csv')
write.csv(mtcars,'file2.csv')
write.csv(iris,'file3.csv')
write.csv(iris,'file4.csv')
tar('tar1.tar',files=c('file1.csv','file2.csv'),tar="tar")
tar('tar2.tar',files=c('file3.csv','file4.csv'),tar="tar")

require(dplyr)
require(tidyr)
require(foreach)
require(doParallel)
require(magrittr)

#List all tar files in directory
list_of_files<-list.files(pattern = "\\.tar")

packsINeed<-c("vroom","magrittr","dplyr","tidyr","doParallel")

#Start for loop

myCluster<-makeCluster(2,type="PSOCK")
registerDoParallel(myCluster) 

foreach(i= 1:NROW(list_of_files),.packages = packsINeed)%dopar%
  print(paste(list_of_files[i], "which is", i, "of", NROW(list_of_files) ))
  
  print("2. Untar .csv files inside")
  fileout <- tools::file_path_sans_ext(list_of_files[i], compression = TRUE)
  exdir <- paste0("temp",fileout)
  untar(tarfile = list_of_files[i], exdir = exdir)
  
  print("#3. Read in files and add up two columns")
  df<-vroom::vroom(file.path(exdir,dir(exdir,"*.csv")),altrep = FALSE)
  
  # df$A<-df$B+df$C   # These columns don't exist in mtcars used as example
  
  print("#4. save RDS")
  
  saveRDS(object = df, file = file.path(exdir,paste0(fileout,".rds")))
  
  print("#5. Clean up files")
  
  .files<-list.files(exdir,pattern="\\.csv")
  
  file.remove(file.path(exdir,.files))

不确定 .rds 应该放在哪里,所以暂时留在临时文件夹中。

【讨论】:

以上是关于R foreach:并行读取和操作多个文件的主要内容,如果未能解决你的问题,请参考以下文章

使用Java 8 Parallel Stream在并行读取多个文件时排除某些文件

R中的并行foreach共享内存

使用 R doParallel 或 foreach 从 mysql 并行获取数据

R 并行共享内存对象(Windows)

在foreach循环内没有收到标准输出[重复]

与 foreach 并行预测 nnet 输出时 R 内存爆炸