Apache Beam: WriteToBigQuery does not work if preceded by a stateful Transform unless re-windowing is applied

Posted: 2021-06-03 06:40:48

【Question】

I have a simple pipeline that does not behave as expected, and none of the documentation I have found explains its behavior. In short, WriteToBigQuery in streaming mode fails if it is preceded by a stateful transform (such as GroupIntoBatches), unless the global window is re-applied before streaming to BQ. Does anyone have a meaningful explanation?

This works:

    result = (
        p
        | "Read Data"
            >> beam.io.ReadFromPubSub(
                subscription="projects/myproject/subscriptions/mysubscription",
                with_attributes=False,
            ).with_output_types(bytes)
        | "Decompose Data" >> beam.ParDo(DecomposeData())
        | "Window into Fixed Intervals"
                >> beam.WindowInto(window.FixedWindows(self.window_size))
        | "Transform Data" >> beam.ParDo(TransformData())
        | "Write to BigQuery Table" >> beam.io.WriteToBigQuery(
            table=_choose_table,
            schema=_choose_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

This fails:

    result = (
        p
        | "Read Data"
            >> beam.io.ReadFromPubSub(
                subscription="projects/myproject/subscriptions/mysubscription",
                with_attributes=False,
            ).with_output_types(bytes)
        | "Decompose Data" >> beam.ParDo(DecomposeData())
        | "Window into Fixed Intervals"
                >> beam.WindowInto(window.FixedWindows(self.window_size))
        | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
        | "Group into Batches"
            >> beam.GroupIntoBatches(
                max_buffering_duration_secs=self.window_size,
                batch_size=self.batch_size,
            )
        | "Discard Dummy Key" >> beam.MapTuple(lambda _, val: val)**
        | "Transform Data" >> beam.ParDo(TransformData())
        | "Write to BigQuery Table" >> beam.io.WriteToBigQuery(
            table=_choose_table,
            schema=_choose_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

This works again:

    result = (
        p
        | "Read Data"
            >> beam.io.ReadFromPubSub(
                subscription="projects/myproject/subscriptions/mysubscription",
                with_attributes=False,
            ).with_output_types(bytes)
        | "Decompose Data" >> beam.ParDo(DecomposeData())
        | "Window into Fixed Intervals"
                >> beam.WindowInto(window.FixedWindows(self.window_size))
        | "Add Dummy Key" >> beam.Map(lambda elem: (None, elem))
        | "Group into Batches"
            >> beam.GroupIntoBatches(
                max_buffering_duration_secs=self.window_size,
                batch_size=self.batch_size,
            )
        | "Discard Dummy Key" >> beam.MapTuple(lambda _, val: val)
        | "Transform Data" >> beam.ParDo(TransformData())
        | "Re-window" >> beam.WindowInto(window.GlobalWindows())
        | "Write to BigQuery Table" >> beam.io.WriteToBigQuery(
            table=_choose_table,
            schema=_choose_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

【Comments】:

How does the pipeline fail? Is there any accompanying error message? I also see that you mention elsewhere that you are using the DirectRunner. Have you checked whether the error also occurs on other runners?

【Answer 1】:

This is strange, because I use the same approach: I batch elements right before WriteToBigQuery. This is what I have:

        tagged_events = (
            pcoll
            | "Key events" >> WithKeys(add_keys)
            | "With timestamps" >> ParDo(AddTimestamp())
            | "Window"
            >> WindowInto(
                windowfn=FixedWindows(self.window_interval_seconds),
                allowed_lateness=self.window_lateness_seconds,
            )
            | "Get latest elements per key" >> combiners.Latest.PerKey()
            | "Reshuffle elements" >> Reshuffle()
            | "Prepare for insertion" >> FlatMapTuple(prepare_for_insert)
            | "Tag Value" >> ParDo(TagTableValue()).with_outputs(*BQ_TABLES)
        )

        for table in BQ_TABLES:
            errors = (
                tagged_events[table]
                | f"Reshuffle to table" >> Reshuffle()
                | f"Batch  table elements"
                >> BatchElements(
                    min_batch_size=500, max_batch_size=500
                )
                | f"Flatten batch of table" >> FlatMap(flatten_list)
                | f"Write to table" >> WriteToBigQuery(
                    table=f"self.project:self.dataset.table",
                    create_disposition=BigQueryDisposition.CREATE_NEVER,
                    write_disposition=BigQueryDisposition.WRITE_APPEND,
                    insert_retry_strategy=RetryStrategy.RETRY_NEVER,
                    ignore_insert_ids=True,
                    batch_size=BIGQUERY_INSERT_MAX_BATCH_SIZE,
                )
            )

One thing to note is that the batching produces a batch, i.e. a list of elements to insert, but the BigQuery sink expects individual elements. That is why I have that Flatten step before the insert.
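
The flatten_list helper is not shown in the answer. A minimal sketch of what it might look like, assuming it only needs to turn one batch back into individual rows (the name and its use inside FlatMap come from the snippet above; the body is an assumption):

    # Hypothetical flatten helper: FlatMap calls it with one batch (the list of
    # rows produced by BatchElements) and emits each yielded row as its own
    # element, so the BigQuery sink receives single rows again.
    def flatten_list(batch):
        for row in batch:
            yield row

Equivalently, FlatMap(lambda batch: batch) would achieve the same, since FlatMap flattens whatever iterable the callable returns.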

【Comments】:

Indeed, I have abbreviated the code, but I do have a step that breaks the lists back into single rows before feeding them to BQ. By the way, it does not work even with BatchElements. For completeness: this pipeline runs on the DirectRunner, and I had to use Apache Beam 2.30rc1 to get it even partially working (on 2.29 a BQ-related type-conversion error pops up).

Hmm, I am running this with 2.28 and have not updated to 2.29 yet...

To add to this: I did try updating to 2.29.0, but I ran into some issues with the BigQuery sink, which for me were performance problems. A Google support engineer recommended skipping 2.29.0 and waiting for the 2.30.0 release. So maybe try downgrading? Not sure whether Dataflow supports 2.30rc1...

Related questions:

Continuous state in an Apache Beam pipeline

What is the Apache Beam way to handle "routing"?

Is there a way to read a multi-line CSV file in Apache Beam using the ReadFromText transform (Python)?

Unable to create a generic date conversion class in Dataflow / Apache Beam

How to convert a SQL table into a list of row sequences using BigQuery and Apache Beam?

Apache Beam - skip pipeline steps