Python Apache Beam 侧输入断言错误

Posted

技术标签:

【中文标题】Python Apache Beam 侧输入断言错误【英文标题】:Python Apache Beam Side Input Assertion Error 【发布时间】:2017-07-13 00:04:20 【问题描述】:

我还是 Apache Beam/Cloud Dataflow 的新手,所以如果我的理解不正确,我深表歉意。

我正在尝试通过管道读取约 30,000 行长的数据文件。我的简单管道首先从 GCS 打开 csv,从数据中提取标头,通过 ParDo/DoFn 函数运行数据,然后将所有输出写入 csv 回 GCS。该管道有效,是我的第一个测试。

然后我编辑了管道以读取 csv,提取标题,从数据中删除标题,通过 ParDo/DoFn 函数运行数据,并将标题作为侧输入,然后将所有输出写入一个.csv。唯一的新代码是将标头作为侧面输入传入并从数据中过滤。

ParDo/DoFn 函数 build_rows 只生成 context.element,以便我可以确保我的辅助输入正常工作。

我得到的错误如下: 我不确定问题是什么,但我认为这可能是由于内存限制。我将示例数据从 30,000 行减少到 100 行,我的代码终于可以工作了。

没有侧输入的管道会读/写所有 30,000 行,但最终我需要侧输入来对我的数据进行转换。

如何修复我的管道,以便我可以处理来自 GCS 的大型 csv 文件,并且仍然使用边输入作为文件的伪全局变量?

【问题讨论】:

*注意:这是在本地测试的。我在添加代码时一直在做增量测试。如果它在本地工作,那么我在 Google Cloud Dataflow 上运行它以确保它也在那里运行。如果它在 Cloud Dataflow 中有效,那么我会添加更多代码。 【参考方案1】:

我最近为 Apache Beam 编写了 CSV file source,并将其添加到 beam_utils PiPy 包中。具体可以如下使用:

    安装梁工具:pip install beam_utils 导入:from beam_utils.sources import CsvFileSource。 将其用作来源:beam.io.Read(CsvFileSource(input_file))

在其默认行为中,CsvFileSource 返回按标题索引的字典 - 但您可以查看文档来决定要使用的选项。

另外,如果你想实现自己的自定义CsvFileSource,你需要继承Beam的FileBasedSource

import csv
class CsvFileSource(beam.io.filebasedsource.FileBasedSource):
  def read_records(self, file_name, range_tracker):
    self._file = self.open_file(file_name)
    reader = csv.reader(self._file)
    for i, rec in enumerate(reader):
      yield res

您可以扩展此逻辑以解析标头和其他特殊行为。

另外,作为一个注释,这个源不能被拆分,因为它需要被顺序解析,所以在处理数据时它可能代表一个瓶颈(尽管这可能没问题)。

【讨论】:

嗨,Pablo,感谢您查看我的另一个问题。我已更改我的代码以使用您编写的 beam_utils CsvFileSource 并且事情似乎工作得更好。我知道不再需要使用给我带来麻烦的侧面输入,但你能告诉我我的问题可能是什么吗?只是为了让我明白发生了什么。 给我一点时间来检查断言发生的原因。 您需要添加一个 init 来明确说明它是可拆分的。即 super(CsvFileSource, s).__init__(filename, splittable=False)。如果没有,您可能会冒着多个工作人员一遍又一遍地阅读相同内容的风险,相信 read_records 中的 range_tracker 参数是受尊重的。

以上是关于Python Apache Beam 侧输入断言错误的主要内容,如果未能解决你的问题,请参考以下文章

Python 上的 Apache Beam 将 beam.Map 调用相乘

使用Apache-beam在Python中删除字典中的第一项[重复]

在 python Apache Beam 中打开一个 gzip 文件

无法使用 Apache Beam(Python SDK)读取 Pub/Sub 消息

如何从 PCollection Apache Beam Python 创建 N 个元素组

如何使用 Apache Beam (Python) 将多个嵌套的 JSON 写入 BigQuery 表