如何从流中读取 CSV 文件并在写入时处理每一行?

Posted

技术标签:

【中文标题】如何从流中读取 CSV 文件并在写入时处理每一行?【英文标题】:How to read a CSV file from a stream and process each line as it is written? 【发布时间】:2011-09-27 05:13:26 【问题描述】:

我想从标准输入中读取一个 CSV 文件并处理每一行。我的 CSV 输出代码一一写入行,但我的读者在迭代行之前等待流终止。这是csv 模块的限制吗?我是不是做错了什么?

我的阅读器代码:

import csv
import sys
import time


reader = csv.reader(sys.stdin)
for row in reader:
    print "Read: (%s) %r" % (time.time(), row)

我的作者代码:

import csv
import sys
import time


writer = csv.writer(sys.stdout)
for i in range(8):
    writer.writerow(["R%d" % i, "$" * (i+1)])
    sys.stdout.flush()
    time.sleep(0.5)

python test_writer.py | python test_reader.py 的输出:

Read: (1309597426.3) ['R0', '$']
Read: (1309597426.3) ['R1', '$$']
Read: (1309597426.3) ['R2', '$$$']
Read: (1309597426.3) ['R3', '$$$$']
Read: (1309597426.3) ['R4', '$$$$$']
Read: (1309597426.3) ['R5', '$$$$$$']
Read: (1309597426.3) ['R6', '$$$$$$$']
Read: (1309597426.3) ['R7', '$$$$$$$$']

如您所见,所有打印语句都是同时执行的,但我预计会有 500 毫秒的间隔。

【问题讨论】:

如果你只运行python test_writer.py会发生什么? 【参考方案1】:

因为它says in the documentation,

为了使for 循环成为循环文件行的最有效方式(一种非常常见的操作),next() 方法使用隐藏的预读缓冲区。

您可以通过查看 the implementation of the csv module(第 784 行)看到 csv.reader 调用了底层迭代器的 next() 方法(通过 PyIter_Next)。

因此,如果您真的想要无缓冲读取 CSV 文件,则需要将文件对象(此处为 sys.stdin)转换为迭代器,其 next() 方法实际上调用 readline()。这可以使用iter 函数的两个参数形式轻松完成。所以把test_reader.py中的代码改成这样:

for row in csv.reader(iter(sys.stdin.readline, '')):
    print("Read: () !r".format(time.time(), row))

例如,

$ python test_writer.py | python test_reader.py
Read: (1388776652.964925) ['R0', '$']
Read: (1388776653.466134) ['R1', '$$']
Read: (1388776653.967327) ['R2', '$$$']
Read: (1388776654.468532) ['R3', '$$$$']
[etc]

您能解释一下为什么需要无缓冲读取 CSV 文件吗?无论您尝试做什么,都可能有更好的解决方案。

【讨论】:

很好的答案,谢谢。我需要这个的原因是因为处理结果会为我提供速度。第一个操作是从网络读取,第二个操作是写入磁盘,它们都需要一定的 CPU 密集型转换。我还需要它们是可链接的(通过管道)以便能够重用脚本(a la unix)。【参考方案2】:

也许这是一个限制。阅读此http://docs.python.org/using/cmdline.html#cmdoption-unittest-discover-u

注意有内部缓冲 在 file.readlines() 和文件对象中 (对于 sys.stdin 中的行)不是 受此选项影响。去工作 围绕这个,你会想要使用 file.readline() 在 while 1 内: 循环。

我修改了 test_reader.py 如下:

import csv, sys, time

while True:
    print "Read: (%s) %r" % (time.time(), sys.stdin.readline())

输出

python test_writer.py | python  test_reader.py
Read: (1309600865.84) 'R0,$\r\n'
Read: (1309600865.84) 'R1,$$\r\n'
Read: (1309600866.34) 'R2,$$$\r\n'
Read: (1309600866.84) 'R3,$$$$\r\n'
Read: (1309600867.34) 'R4,$$$$$\r\n'
Read: (1309600867.84) 'R5,$$$$$$\r\n'
Read: (1309600868.34) 'R6,$$$$$$$\r\n'
Read: (1309600868.84) 'R7,$$$$$$$$\r\n'

【讨论】:

你是对的。但是我如何让 csv.reader 来利用这个 hack?【参考方案3】:

您正在刷新标准输出,但不是标准输入。

Sys.stdin 也有一个flush() 方法,如果您真的想禁用缓冲,请尝试在读取每一行后使用它。

【讨论】:

如果反对者留下一点解释,那就太酷了。我真的很想知道为什么打电话stdin.flush()没有帮助。

以上是关于如何从流中读取 CSV 文件并在写入时处理每一行?的主要内容,如果未能解决你的问题,请参考以下文章

python如何根据csv中一列的内容对另一列进行写入

NAudio在改变音高而不是文件时寻找一种从流中读取的方法

java如何直接从流中读取excel的文件内容?Stream

使用流异步读取文件时如何同步处理每一行/缓冲区[重复]

StreamReader.ReadLine() 不消耗流

iOS - 如何从流中读取音频并播放音频