有没有办法使用 ReadFromText 转换(Python)在 Apache Beam 中读取多行 csv 文件?
Posted
技术标签:
【中文标题】有没有办法使用 ReadFromText 转换(Python)在 Apache Beam 中读取多行 csv 文件?【英文标题】:Is there a way to read a multi-line csv file in Apache Beam using the ReadFromText transform (Python)? 【发布时间】:2018-09-29 11:40:59 【问题描述】:有没有办法在 Python 中使用ReadFromText
转换来读取多行 csv 文件?我有一个文件,其中包含一行我试图让 Apache Beam 将输入读取为一行,但无法让它工作。
def print_each_line(line):
print line
path = './input/testfile.csv'
# Here are the contents of testfile.csv
# foo,bar,"blah blah
# more blah blah",baz
p = apache_beam.Pipeline()
(p
| 'ReadFromFile' >> apache_beam.io.ReadFromText(path)
| 'PrintEachLine' >> apache_beam.FlatMap(lambda line: print_each_line(line))
)
# Here is the output:
# foo,bar,"blah blah
# more blah blah",baz
尽管多行 csv 文件的标准是将多行元素包含在双引号中,但上述代码将输入解析为两行。
【问题讨论】:
你需要一个只有一行的 PCollection。我说的对吗? @ArjunKay 是的,目前我的输入是一行,但 beam 将其视为两行 你们知道新版本是否改进了对多行 CSV 的支持吗?鉴于这是很久以前问的?我找不到很多相关材料。 【参考方案1】:Beam 不支持解析 CSV 文件。但是,您可以使用 Python 的 csv.reader。这是一个例子:
import apache_beam
import csv
def print_each_line(line):
print line
p = apache_beam.Pipeline()
(p
| apache_beam.Create(["test.csv"])
| apache_beam.FlatMap(lambda filename:
csv.reader(apache_beam.io.filesystems.FileSystems.open(filename)))
| apache_beam.FlatMap(print_each_line))
p.run()
输出:
['foo', 'bar', 'blah blah\nmore blah blah', 'baz']
【讨论】:
【参考方案2】:没有一个答案对我有用,但确实如此
(
p
| beam.Create(['data/test.csv'])
| beam.FlatMap(lambda filename:
csv.reader(io.TextIOWrapper(beam.io.filesystems.FileSystems.open(known_args.input)))
| "Take only name" >> beam.Map(lambda x: x[0])
| WriteToText(known_args.output)
)
【讨论】:
【参考方案3】:ReadFromText
将文本文件解析为换行符分隔的元素。所以ReadFromText
将两行视为两个元素。如果您想将文件的内容作为单个元素,您可以执行以下操作:
contents = []
contents.append(open(path).read())
p = apache_beam.Pipeline()
p | beam.Create(contents)
【讨论】:
以上是关于有没有办法使用 ReadFromText 转换(Python)在 Apache Beam 中读取多行 csv 文件?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 python 将字典写入 Dataflow 中的 Bigquery
有没有办法使用 Guava 获取 InputStream 的哈希码?