python 3中beam.io FileBasedSource中的open_file问题
Posted
技术标签:
【中文标题】python 3中beam.io FileBasedSource中的open_file问题【英文标题】:open_file in beam.io FileBasedSource issue with python 3 【发布时间】:2020-02-18 09:17:17 【问题描述】:我正在使用 CSVRecordSource 读取 Apache Beam 管道中的 CSV,该管道在 read_records 函数中使用 open_file。
使用 python 2 一切正常,但是当我迁移到 python 3 时,它在下面抱怨
next(csv_reader)
_csv.Error: iterator should return strings, not bytes (did you open the file in text mode?)
默认情况下 open_file 方法以二进制模式打开文件。
所以我把它改成使用
with open(filename, "rt") as f:
但是当我在谷歌云中运行数据流时它失败了,因为它无法找到文件并给出错误
FileNotFoundError: [Errno 2] No such file or directory
下面是我的代码
with self.open_file(filename) as f:
csv_reader = csv.reader(f, delimiter=self.delimiter, quotechar=self.quote_character)
header = next(csv_reader)
如何在 python 3 中使用 CSVRecordSource?
【问题讨论】:
请告诉我你在哪里使用这个功能?在 DoFn 中? 我在 Beam 管道的 Read(CSVRecordSource(input)) 中使用它。 【参考方案1】:您是否使用此处定义的 open_file 方法:https://github.com/apache/beam/blob/6f6feaaeebfc82302ba83c52d087b06a12a5b119/sdks/python/apache_beam/io/filebasedsource.py#L166?
如果是这样,我认为您可以调用底层的FileSystems.open()
,并将'application/octet-stream'
替换为'text/plain'
。
【讨论】:
【参考方案2】:我通过使用迭代解码迭代器提供的输入(字节)的 iterdecode 解决了这个问题
csv.reader(codecs.iterdecode(f, "utf-8"), delimiter=self.delimiter, quotechar=self.quote_character)
【讨论】:
以上是关于python 3中beam.io FileBasedSource中的open_file问题的主要内容,如果未能解决你的问题,请参考以下文章
使用 Apache Beam 以 CSV 格式将 BigQuery 结果写入 GCS
在 beam.io.writetobigquery 中使用模式更新选项
使用 apache beam 中的 beam.io.gcp.bigquery.WriteToBigQuery 模块写入日期分区的 Bigquery 表
如何使用 Apache Beam (Python) 将多个嵌套的 JSON 写入 BigQuery 表