有没有办法使用 ReadFromText 转换(Python)在 Apache Beam 中读取多行 csv 文件?

Posted

技术标签:

【中文标题】有没有办法使用 ReadFromText 转换(Python)在 Apache Beam 中读取多行 csv 文件?【英文标题】:Is there a way to read a multi-line csv file in Apache Beam using the ReadFromText transform (Python)? 【发布时间】:2018-09-29 11:40:59 【问题描述】:

有没有办法在 Python 中使用ReadFromText 转换来读取多行 csv 文件?我有一个文件,其中包含一行我试图让 Apache Beam 将输入读取为一行,但无法让它工作。

def print_each_line(line):
    print line

path = './input/testfile.csv'
# Here are the contents of testfile.csv
# foo,bar,"blah blah
# more blah blah",baz

p = apache_beam.Pipeline()

(p
 | 'ReadFromFile' >> apache_beam.io.ReadFromText(path)
 | 'PrintEachLine' >> apache_beam.FlatMap(lambda line: print_each_line(line))
 )

# Here is the output:
# foo,bar,"blah blah
# more blah blah",baz

尽管多行 csv 文件的标准是将多行元素包含在双引号中,但上述代码将输入解析为两行。

【问题讨论】:

你需要一个只有一行的 PCollection。我说的对吗? @ArjunKay 是的,目前我的输入是一行,但 beam 将其视为两行 你们知道新版本是否改进了对多行 CSV 的支持吗?鉴于这是很久以前问的?我找不到很多相关材料。 【参考方案1】:

Beam 不支持解析 CSV 文件。但是,您可以使用 Python 的 csv.reader。这是一个例子:

import apache_beam
import csv

def print_each_line(line):
  print line

p = apache_beam.Pipeline()

(p 
 | apache_beam.Create(["test.csv"])
 | apache_beam.FlatMap(lambda filename:
     csv.reader(apache_beam.io.filesystems.FileSystems.open(filename)))
 | apache_beam.FlatMap(print_each_line))

p.run()

输出:

['foo', 'bar', 'blah blah\nmore blah blah', 'baz']

【讨论】:

【参考方案2】:

没有一个答案对我有用,但确实如此

(
  p
  | beam.Create(['data/test.csv'])
  | beam.FlatMap(lambda filename:
    csv.reader(io.TextIOWrapper(beam.io.filesystems.FileSystems.open(known_args.input)))
  | "Take only name" >> beam.Map(lambda x: x[0])
  | WriteToText(known_args.output)
)

【讨论】:

【参考方案3】:

ReadFromText 将文本文件解析为换行符分隔的元素。所以ReadFromText 将两行视为两个元素。如果您想将文件的内容作为单个元素,您可以执行以下操作:

contents = []
contents.append(open(path).read()) 
p = apache_beam.Pipeline()
p | beam.Create(contents)

【讨论】:

以上是关于有没有办法使用 ReadFromText 转换(Python)在 Apache Beam 中读取多行 csv 文件?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 python 将字典写入 Dataflow 中的 Bigquery

Apache Beam 处理文件

如何在 Linux 中转换 SSL 证书

有没有办法使用 Guava 获取 InputStream 的哈希码?

需要 DocuSign JWT 身份验证 .p12 而不是 PEM

Lodash:有没有办法用 lodash 将单词保持重音大写?