从 csv 文件中逐块读取和反转数据并复制到新的 csv 文件
Posted
技术标签:
【中文标题】从 csv 文件中逐块读取和反转数据并复制到新的 csv 文件【英文标题】:Read and reverse data chunk by chunk from a csv file and copy to a new csv file 【发布时间】:2019-04-02 01:21:47 【问题描述】:假设我正在处理一个非常大的 csv 文件。所以,我只能将数据逐块读取到内存中。预期的事件流应该如下:
1) 使用 pandas 从 csv 读取数据块(例如:10 行)。
2) 颠倒数据顺序
3) 将每一行反向复制到新的 csv 文件。所以每个块(10行)是 以相反的顺序从头开始写入 csv。
最后,csv 文件应该是相反的顺序,并且应该在不将整个文件加载到 Windows 操作系统的内存中的情况下完成。
我正在尝试进行时间序列预测,我需要数据从旧到最新(第一行最旧条目)。我无法将整个文件加载到内存中,如果可能的话,我正在寻找一种方法来一次处理每个块。
我在来自 kaggle 的 Rossmann dataset 的 train.csv
上尝试的数据集。你可以从这个github repo
得到它
我的尝试没有正确地将行复制到新的 csv 文件中。
下面是我的代码:
import pandas as pd
import csv
def reverse():
fields = ["Store","DayOfWeek","Date","Sales","Customers","Open","Promo","StateHoliday",
"SchoolHoliday"]
with open('processed_train.csv', mode='a') as stock_file:
writer = csv.writer(stock_file,delimiter=',', quotechar='"',
quoting=csv.QUOTE_MINIMAL)
writer.writerow(fields)
for chunk in pd.read_csv("train.csv", chunksize=10):
store_data = chunk.reindex(index=chunk.index[::-1])
append_data_csv(store_data)
def append_data_csv(store_data):
with open('processed_train.csv', mode='a') as store_file:
writer = csv.writer(store_file,delimiter=',', quotechar='"',
quoting=csv.QUOTE_MINIMAL)
for index, row in store_data.iterrows():
print(row)
writer.writerow([row['Store'],row['DayOfWeek'],row['Date'],row['Sales'],
row['Customers'],row['Open'],row['Promo'],
row['StateHoliday'],row['SchoolHoliday']])
reverse()
提前谢谢你
【问题讨论】:
您希望对整个数据进行时间排序,但您从一开始就一次排序和写入 10 个数据。这就是为什么你没有得到结果。为什么不从最后阅读并反转 train.csv。见,***.com/a/10933932/2895956 我之前尝试过该帖子中给出的代码。但它没有给我一个合适的结果。 您能详细说明一下吗?你到底得到了什么?如果可能的话,将文件上传到保管箱中。过程太长,(下载该文件一个人必须登录,注册然后填写信息......bla..bla..)只是为了下载文件。 注意:不是一个完美的解决方案。但是,您可以使用 cmd 行、tail -r train.csv > reverse.csv 简单地还原文件。并手动删除最后一行并将其添加到 reverse.csv 中的第一行 【参考方案1】:使用 bash,您可以将除第一行之外的整个文件拖尾,然后将其反转并存储:
tail -n +2 train.csv | tac > train_rev.csv
如果要在反转文件中保留header,先写好再追加反转内容
head -1 train.csv > train_rev.csv; tail -n +2 train.csv | tac >> train_rev.csv
【讨论】:
除非您有硬性要求在不同的操作系统上运行它,否则这可能是最快/最好的答案。【参考方案2】:我不建议使用pandas
来解析或流式传输任何文件,因为您只会引入额外的开销。最好的方法是从下往上读取文件。好吧,这段代码的很大一部分实际上来自here,它在其中接收一个文件并在生成器中返回相反的内容,我相信这就是你想要的。
我所做的只是使用您提供的链接中的文件train.csv
对其进行测试,并将结果输出到一个新文件中。
import os
def reverse_readline(filename, buf_size=8192):
"""a generator that returns the lines of a file in reverse order"""
with open(filename) as fh:
segment = None
offset = 0
fh.seek(0, os.SEEK_END)
file_size = remaining_size = fh.tell()
while remaining_size > 0:
offset = min(file_size, offset + buf_size)
fh.seek(file_size - offset)
buffer = fh.read(min(remaining_size, buf_size))
remaining_size -= buf_size
lines = buffer.split('\n')
# the first line of the buffer is probably not a complete line so
# we'll save it and append it to the last line of the next buffer
# we read
if segment is not None:
# if the previous chunk starts right from the beginning of line
# do not concact the segment to the last line of new chunk
# instead, yield the segment first
if buffer[-1] != '\n':
lines[-1] += segment
else:
yield segment
segment = lines[0]
for index in range(len(lines) - 1, 0, -1):
if lines[index]:
yield lines[index]
# Don't yield None if the file was empty
if segment is not None:
yield segment
reverse_gen = reverse_readline('train.csv')
with open('rev_train.csv','w') as f:
for row in reverse_gen:
f.write('\n'.format(row))
它基本上是反向读取它,直到找到一个换行符,然后从文件中从下到上生成一个line
。一种非常有趣的方式。
【讨论】:
如果使用 UTF-8,这实际上会导致文本块重叠和丢失。您不能只在可变宽度编码文件中寻找!接下来,如果 CSV 文件在单元格值中嵌入换行符,则反转不能只是在换行符上任意拆分。 接下来,符合 RFC 的 CSV 文件将使用\r\n
行分隔符,您的代码假定始终使用 \n
,并且单元格值中没有嵌入的换行符。【参考方案3】:
这完全符合您的要求,但没有 Pandas。它逐行读取 intest.csv(而不是将整个文件读入 RAM)。它使用文件系统执行大部分处理,该文件系统使用一系列块文件,这些块文件在最后聚合到 outtest.csv 文件中。如果您更改 maxLines,您可以优化生成的块文件数量与消耗的 RAM(数字越大消耗的 RAM 越多,但生成的块文件越少)。如果要保留 CSV 标题的第一行,请将 keepHeader 设置为 True;如果设置为 False,它将反转整个文件,包括第一行。
为了好玩,我在旧的 Raspberry Pi 上使用 128GB 闪存驱动器在 6MB csv 测试文件上运行了这个程序,我认为出了点问题,因为它几乎立即返回,所以即使在较慢的硬件上它也很快。它只导入一个标准的python库函数(删除),所以它非常便携。此代码的一个优点是它不会重新定位任何文件指针。一个限制是它不适用于数据中有换行符的 CSV 文件。对于该用例,pandas 将是读取块的最佳解决方案。
from os import remove
def writechunk(fileCounter, reverseString):
outFile = 'tmpfile' + str(fileCounter) + '.csv'
with open(outFile, 'w') as outfp:
outfp.write(reverseString)
return
def main():
inFile = 'intest.csv'
outFile = 'outtest.csv'
# This is our chunk expressed in lines
maxLines = 10
# Is there a header line we want to keep at the top of the output file?
keepHeader = True
fileCounter = 0
lineCounter = 0
with open(inFile) as infp:
reverseString = ''
line = infp.readline()
if (line and keepHeader):
headerLine = line
line = infp.readline()
while (line):
lineCounter += 1
reverseString = line + reverseString
if (lineCounter == maxLines):
fileCounter += 1
lineCounter = 0
writechunk(fileCounter, reverseString)
reverseString = ''
line = infp.readline()
# Write any leftovers to a chunk file
if (lineCounter != 0):
fileCounter += 1
writechunk(fileCounter,reverseString)
# Read the chunk files backwards and append each to the outFile
with open(outFile, 'w') as outfp:
if (keepHeader):
outfp.write(headerLine)
while (fileCounter > 0):
chunkFile = 'tmpfile' + str(fileCounter) + '.csv'
with open(chunkFile, 'r') as infp:
outfp.write(infp.read())
remove(chunkFile)
fileCounter -= 1
if __name__ == '__main__':
main()
【讨论】:
这确实有效,但假设 CSV 单元格值中没有换行符。 谢谢,@MartijnPieters。你是对的。但是,这是原始问题中表达的用例。我将更新答案以指出此限制。这是为了尽可能减少占用空间而设计的,但我们可能会使用 pandas 的内置块函数来读取文件的位,这会以牺牲效率(以及适度增加内存需求)为代价来克服这一限制。 绝对是用例表示的,它要求能够处理任何个巨大的CSV文件。【参考方案4】:如果你有足够的硬盘空间,你可以分块读取,反向存储。然后以相反的顺序取出存储的块并写入一个新的 csv 文件。
以下是 Pandas 的示例,它还使用了 pickle(提高性能)和 gzip(提高存储效率)。
import pandas as pd, numpy as np
# create a dataframe for demonstration purposes
df = pd.DataFrame(np.arange(5*9).reshape((-1, 5)))
df.to_csv('file.csv', index=False)
# number of rows we want to chunk by
n = 3
# iterate chunks, output to pickle files
for idx, chunk in enumerate(pd.read_csv('file.csv', chunksize=n)):
chunk.iloc[::-1].to_pickle(f'file_pkl_idx:03.pkl.gzip', compression='gzip')
# open file in amend mode and write chunks in reverse
# idx stores the index of the last pickle file written
with open('out.csv', 'a') as fout:
for i in range(idx, -1, -1):
chunk_pkl = pd.read_pickle(f'file_pkl_i:03.pkl.gzip', compression='gzip')
chunk_pkl.to_csv(fout, index=False, header=False if i!=idx else True)
# read new file to check results
df_new = pd.read_csv('out.csv')
print(df_new)
0 1 2 3 4
0 40 41 42 43 44
1 35 36 37 38 39
2 30 31 32 33 34
3 25 26 27 28 29
4 20 21 22 23 24
5 15 16 17 18 19
6 10 11 12 13 14
7 5 6 7 8 9
8 0 1 2 3 4
【讨论】:
实际上,每个文件块都应该被反转,接下来每个反转的行块应该被前置而不是附加到 csv 文件。因此,整个文件将在最后反转,但不会将整个文件加载到内存中。 @SaiKumar,知道了,已经更新了一个完全反转的解决方案。【参考方案5】:你有重复的代码块,你根本没有利用 pandas。
@sujay kumar 指出的非常正确,我会更仔细地阅读。
文件一点也不大。我使用以 GB 为单位的 OHLCV 刻度数据,没有问题。如果您使用pandas.read_csv()
,则不必进行分块传输。当然这需要时间,但它会正常工作。除非你要进入太字节。我没有测试过。
当您read_csv()
时,您没有指定任何索引。如果您这样做了,您可以根据订单在有或没有ascending=False
的情况下致电sort_index()
。
Pandas 也可以编写 CSV,请改用它。我正在粘贴一些示例代码供您整理。
df_temp = pd.read_csv(file_path, parse_dates=True, index_col="Date",
usecols=["Date", "Adj Close"], na_values=["nan"])
对系列进行排序
s = pd.Series(list('abcde'), index=[0,3,2,5,4])
s.sort_index()
注意:如果您坚持使用 Pandas 及其功能,您将运行已经优化的代码,不需要将整个文件加载到内存中。这太容易了,几乎就像作弊:)
【讨论】:
以上是关于从 csv 文件中逐块读取和反转数据并复制到新的 csv 文件的主要内容,如果未能解决你的问题,请参考以下文章