R中的流处理大型csv文件

Posted

技术标签:

【中文标题】R中的流处理大型csv文件【英文标题】:Stream processing large csv file in R 【发布时间】:2017-07-18 10:56:38 【问题描述】:

我需要对一个非常大的 csv 文件 (c.8.5GB) 进行一些相对简单的更改。我最初尝试使用各种阅读器功能:read.csv、readr::read.csv、data.table::fread。但是:它们都内存不足。

我想我需要改用流处理方法;读取一个块,更新它,写入它,重复。我在右边找到了this answer;但是我不知道如何终止循环(我对 R 比较陌生)。

所以我有两个问题:

    什么是使 while 循环工作的正确方法? 是否有更好的方法(对于“更好”的某些定义)?例如有没有办法使用 dplyr 和管道来做到这一点?

当前代码如下:

src_fname <- "testdata/model_input.csv"
tgt_fname <- "testdata/model_output.csv"

#Changes needed in file: rebase identifiers, set another col to constant value
rebase_data <- function(data, offset) 
  data$'Unique Member ID' <- data$'Unique Member ID' - offset
  data$'Client Name' <- "TestClient2"
  return(data)


CHUNK_SIZE <- 1000
src_conn = file(src_fname, "r")
data <- read.csv(src_conn, nrows = CHUNK_SIZE, check.names=FALSE)
cols <- colnames(data)
offset <- data$'Unique Member ID'[1] - 1

data <- rebase_data(data, offset)
#1st time through, write the headers
tgt_conn = file(tgt_fname, "w")
write.csv(data,tgt_conn, row.names=FALSE)

#loop over remaining data
end = FALSE
while(end == FALSE) 
  data <- read.csv(src_conn, nrows = CHUNK_SIZE, check.names=FALSE, col.names = cols)
  data <- rebase_data(data, offset)
  #write.csv doesn't support col.names=FALSE; so use write.table which does
  write.table(data, tgt_conn, row.names=FALSE, col.names=FALSE, sep=",")
  # ??? How to test for EOF and set end = TRUE if so  ???
  # This doesn't work, presumably because nrow() != CHUNK_SIZE on final loop?
  if (nrow(data) < CHUNK_SIZE) 
    end <- TRUE
  


close(src_conn)
close(tgt_conn)

感谢您的任何指点。

【问题讨论】:

在 CRAN 上查看包 chunked。它允许从文本文件中逐块读取,特别有趣的是,使用 dplyr 进行逐块处理。没有小插图,但在 github.com/edwindj/chunked 上的用法介绍我想自己尝试但没有找到时间! 【参考方案1】:

抱歉戳了一个 2 年前的线程,但现在使用 readr::read_csv_chunked(在加载 tidyverse 时与 dplyr 一起自动加载),我们也可以这样做:

require(tidyverse)

## For non-exploratory code, as @antoine-sac suggested, use:
# require(readr)  # for function `read_csv_chunked` and `read_csv`
# require(dplyr)  # for the pipe `%>%` thus less parentheses

src_fname = "testdata/model_input.csv"
tgt_fname = "testdata/model_output.csv"

CHUNK_SIZE = 1000

offset = read_csv(src_fname, n_max=1)$comm_code %>% as.numeric() - 1 

rebase.chunk = function(df, pos) 
  df$comm_code = df$comm_code %>% as.numeric() - offset
  df$'Client Name' = "TestClient2"
  is.append = ifelse(pos > 1, T, F)
  df %>% write_csv(
    tgt_fname,
    append=is.append
  )


read_csv_chunked(
  src_fname, 
  callback=SideEffectChunkCallback$new(rebase.chunk), 
  chunk_size = chunck.size,
  progress = T    # optional, show progress bar
)

这里棘手的部分是根据参数pos设置is.append,该参数表示原始文件中数据框df的起始行号。在readr::write_csv 内,当append=F 时,标头(列名)将被写入文件,否则不会。

【讨论】:

是的,截至 2019 年,这是 IMO 的最佳解决方案!关键的新功能是来自包readrread_csv_chunked。不建议在非探索性代码中使用tidyverse 包。 @antoine-sac 感谢您的评论,按照您的建议进行了更新。【参考方案2】:

试试这个:

library("chunked")

read_chunkwise(src_fname, chunk_size=CHUNK_SIZE) %>%
rebase_data(offset) %>%
write_chunkwise(tgt_fname)

你可能需要稍微调整一下列名才能得到你想要的。

(免责声明:没有尝试过代码)

请注意,包中没有小插图,但标准用法在 github 上进行了描述:https://github.com/edwindj/chunked/

【讨论】:

非常感谢 - 在我的谷歌搜索中没有发现分块。看起来就是这样。【参考方案3】:

好的我找到了解决办法,如下:

# src_fname <- "testdata/model_input.csv"
# tgt_fname <- "testdata/model_output.csv"

CHUNK_SIZE <- 20000

#Changes needed in file: rebase identifiers, set another col to constant value
rebase_data <- function(data, offset) 
  data$'Unique Member ID' <- data$'Unique Member ID' - offset
  data$'Client Name' <- "TestClient2"
  return(data)


#--------------------------------------------------------
# Get the structure first to speed things up
#--------------------------------------------------------
structure <- read.csv(src_fname, nrows = 2, check.names = FALSE)
cols <- colnames(structure)
offset <- structure$'Unique Member ID'[1] - 1

#Open the input & output files for reading & writing
src_conn = file(src_fname, "r")
tgt_conn = file(tgt_fname, "w")

lines_read <- 0
end <- FALSE
read_header <- TRUE
write_header <- TRUE
while(end == FALSE) 
  data <- read.csv(src_conn, nrows = CHUNK_SIZE, check.names=FALSE, col.names = cols, header = read_header)
  if (nrow(data) > 0) 
    lines_read <- lines_read + nrow(data)
    print(paste0("lines read this chunk: ", nrow(data), ", lines read so far: ", lines_read))
    data <- rebase_data(data, offset)
    #write.csv doesn't support col.names=FALSE; so use write.table which does
    write.table(data, tgt_conn, row.names=FALSE, col.names=write_header, sep = ",")
  
  if (nrow(data) < CHUNK_SIZE) 
    end <- TRUE
  
  read_header <- FALSE
  write_header <- FALSE

close(src_conn)
close(tgt_conn)

【讨论】:

以上是关于R中的流处理大型csv文件的主要内容,如果未能解决你的问题,请参考以下文章

在 R 中处理大型 csv 文件时避免挂断

从 R 中的大型 .CSV 导入和提取随机样本

从R中的大型.CSV导入和提取随机样本

109:大型CSV文件的处理方式

Flink 管理大型状态之增量 Checkpoint

109.大型的csv文件的处理方式