python 3中beam.io FileBasedSource中的open_file问题

Posted

技术标签:

【中文标题】python 3中beam.io FileBasedSource中的open_file问题【英文标题】:open_file in beam.io FileBasedSource issue with python 3 【发布时间】:2020-02-18 09:17:17 【问题描述】:

我正在使用 CSVRecordSource 读取 Apache Beam 管道中的 CSV,该管道在 read_records 函数中使用 open_file。

使用 python 2 一切正常,但是当我迁移到 python 3 时,它在下面抱怨

next(csv_reader)
_csv.Error: iterator should return strings, not bytes (did you open the file in text mode?)

默认情况下 open_file 方法以二进制模式打开文件。

所以我把它改成使用

with open(filename, "rt") as f:

但是当我在谷歌云中运行数据流时它失败了,因为它无法找到文件并给出错误

FileNotFoundError: [Errno 2] No such file or directory

下面是我的代码

 with self.open_file(filename) as f:
      csv_reader = csv.reader(f, delimiter=self.delimiter, quotechar=self.quote_character)
      header = next(csv_reader)

如何在 python 3 中使用 CSVRecordSource?

【问题讨论】:

请告诉我你在哪里使用这个功能?在 DoFn 中? 我在 Beam 管道的 Read(CSVRecordSource(input)) 中使用它。 【参考方案1】:

您是否使用此处定义的 open_file 方法:https://github.com/apache/beam/blob/6f6feaaeebfc82302ba83c52d087b06a12a5b119/sdks/python/apache_beam/io/filebasedsource.py#L166?

如果是这样,我认为您可以调用底层的FileSystems.open(),并将'application/octet-stream' 替换为'text/plain'

【讨论】:

【参考方案2】:

我通过使用迭代解码迭代器提供的输入(字节)的 iterdecode 解决了这个问题

csv.reader(codecs.iterdecode(f, "utf-8"), delimiter=self.delimiter, quotechar=self.quote_character)

【讨论】:

以上是关于python 3中beam.io FileBasedSource中的open_file问题的主要内容,如果未能解决你的问题,请参考以下文章

使用 Apache Beam 以 CSV 格式将 BigQuery 结果写入 GCS

在 beam.io.writetobigquery 中使用模式更新选项

使用 apache beam 中的 beam.io.gcp.bigquery.WriteToBigQuery 模块写入日期分区的 Bigquery 表

如何使用 Apache Beam (Python) 将多个嵌套的 JSON 写入 BigQuery 表

apache beam ElasticSearchIO 遇到异常后job中断执行 自己定制beam IO

如何解决这个 MERN 堆栈 filebase64 错误?