如何避免 writeToBq 步骤中数据流束管道中的重复?

Posted

技术标签:

【中文标题】如何避免 writeToBq 步骤中数据流束管道中的重复?【英文标题】:How to avoid duplication in a dataflow beam pipeline in the writeToBq step? 【发布时间】:2021-11-26 11:13:35 【问题描述】:

我们有一项工作在 Dataflow 上,它从 Pub/Sub 提取数据并将其写入 BigQuery。在有限的数据量上,我们没有任何重复,但在我们当前的 100 evts/s 卷上,我们在 BigQuery 表中有重复。我们这里所说的副本是具有相同事件 uuid 的行。

这是我的代码:


class CustomParse(beam.DoFn):
    """ Custom ParallelDo class to apply a custom transformation """

    def to_runner_api_parameter(self, unused_context):
        return "beam:transforms:custom_parsing:custom_v0", None

    def process(self, message: beam.io.PubsubMessage, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        import uuid
        data_parsed = 
            "data": message.data,
            "dataflow_timestamp": timestamp.to_rfc3339(),
            "uuid": uuid.uuid4().hex
        
        
        yield data_parsed


def run():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_subscription",
        help='Input PubSub subscription of the form "projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'
    )
    parser.add_argument(
        "--output_table", help="Output BigQuery Table"
    )

    known_args, pipeline_args = parser.parse_known_args()
    additional_bq_parameters = 
        'timePartitioning': 'type': 'HOUR'

    # Creating pipeline options
    pipeline_options = PipelineOptions(pipeline_args)

    def get_table_name(x):
        namespace = NAMESPACE_EXTRACTED
        date = x['dataflow_timestamp'][:10].replace('-', '')
        return f"known_args.output_table_namespace_date"

    # Defining our pipeline and its steps
    p = beam.Pipeline(options=pipeline_options)
    (
            p
            | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
                subscription=known_args.input_subscription, timestamp_attribute=None, with_attributes=True
            )
            | "Prevent fusion" >> beam.transforms.util.Reshuffle()
            | "CustomParse" >> beam.ParDo(CustomParse(broker_model))
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                table=get_table_name,
                schema=BIGQUERY_SCHEMA,
                additional_bq_parameters=additional_bq_parameters,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                batch_size=1000
            )
    )
    pipeline_result = p.run()


if __name__ == "__main__":
    run()

我们应该怎么做才能避免这种情况?我们是否缺少组合步骤?作为记录,它没有在错误之后发生。

【问题讨论】:

【参考方案1】:

我缺少一些上下文(例如,您没有包含 BrokerParsing 转换),但根据您在此处包含的内容,问题似乎是您没有包含 @ ReadFromPubSub 转换中的 987654323@ 参数。根据documentation:

id_label – 传入 Pub/Sub 消息的属性,用作唯一记录标识符。指定后,此属性的值(可以是唯一标识记录的任何字符串)将用于消息的重复数据删除。如果未提供,我们无法保证不会在 Pub/Sub 流上传送重复数据。在这种情况下,流的重复数据删除将是严格的尽力而为。

【讨论】:

我已经正确地重命名了这个类,很抱歉。然而,令我感到奇怪的是,重复事件具有相同的 uuid,而 uuid 仅在 customParse 的 process 方法期间设置。因此,如果事件被处理两次,每个重复项应该有 2 个不同的 uuid 吗?非常感谢您的帮助。

以上是关于如何避免 writeToBq 步骤中数据流束管道中的重复?的主要内容,如果未能解决你的问题,请参考以下文章

weblogic束管server报错

在 beam.io.writetobigquery 中使用模式更新选项

《三衢道中》曾几

丑奴儿·书博山道中壁

禅道中怎么向bug状态里多添加几个状态

禅道使用之产品经理篇