我们如何使用 python sdk 在 Apache Beam 中读取带有附件的 CSV 文件?

Posted

技术标签:

【中文标题】我们如何使用 python sdk 在 Apache Beam 中读取带有附件的 CSV 文件?【英文标题】:How can we read CSV Files with enclosure in Apache Beam using python sdk? 【发布时间】:2020-01-13 08:31:10 【问题描述】:

我正在阅读一个逗号分隔的 CSV 文件,其中的字段用双引号括起来,其中一些字段的值中还包含逗号,例如:"abc","def,ghi","jkl"

有没有办法我们可以使用 Apache Beam 将此文件读入 PCollection?

【问题讨论】:

这个问题似乎根本没有包括任何解决问题的尝试。请编辑问题以显示您尝试过的内容,并使用Minimal, Reproducible Example 显示您遇到的特定障碍。欲了解更多信息,请参阅How to Ask。 【参考方案1】:

数据用双引号括起来的示例 csv 文件。

"AAA", "BBB", "Test, Test", "CCC" 
"111", "222, 333", "XXX", "YYY, ZZZ"

您可以使用标准库中的csv module:

def print_row(element):
  print element

def parse_file(element):
  for line in csv.reader([element], quotechar='"', delimiter=',', quoting=csv.QUOTE_ALL, skipinitialspace=True):
    return line

parsed_csv = (
                p 
                | 'Read input file' >> beam.io.ReadFromText(input_filename)
                | 'Parse file' >> beam.Map(parse_file)
                | 'Print output' >> beam.Map(print_row)
             )

这给出了以下输出

['AAA', 'BBB', 'Test, Test', 'CCC']
['111', '222, 333', 'XXX', 'YYY, ZZZ ']

需要注意的一件事是csv.reader 对象需要一个iterator,它将返回iterator 字符串。这意味着您不能将字符串直接传递给reader(),但您可以将其包含在list 中,如上所述。然后,您将遍历输出以获取最终字符串。

【讨论】:

感谢 Faizan,这就是我想要的。 很棒的答案。我喜欢mycsvreader = csv.reader(...) 后跟return next(mycsvreader) 之类的东西,以在代码中阐明只有一行会被返回,因为通过 Beam 移动行可能会让初学者感到困惑。基本上,mycsvreader 将是一个迭代器,你会得到下一个迭代器,因为这就是全部。

以上是关于我们如何使用 python sdk 在 Apache Beam 中读取带有附件的 CSV 文件?的主要内容,如果未能解决你的问题,请参考以下文章

Tron使用Python玩转SunSwap

Tron使用Python玩转SunSwap

Tron使用Python玩转SunSwap

华为 IoTDA(物联网平台)如何使用Python SDK 实现应用侧连接

Mac下配置apach服务

使用Python SDK时如何防止GCS自动解压对象?