如何避免 writeToBq 步骤中数据流束管道中的重复?
Posted
技术标签:
【中文标题】如何避免 writeToBq 步骤中数据流束管道中的重复?【英文标题】:How to avoid duplication in a dataflow beam pipeline in the writeToBq step? 【发布时间】:2021-11-26 11:13:35 【问题描述】:我们有一项工作在 Dataflow 上,它从 Pub/Sub 提取数据并将其写入 BigQuery。在有限的数据量上,我们没有任何重复,但在我们当前的 100 evts/s 卷上,我们在 BigQuery 表中有重复。我们这里所说的副本是具有相同事件 uuid 的行。
这是我的代码:
class CustomParse(beam.DoFn):
""" Custom ParallelDo class to apply a custom transformation """
def to_runner_api_parameter(self, unused_context):
return "beam:transforms:custom_parsing:custom_v0", None
def process(self, message: beam.io.PubsubMessage, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
import uuid
data_parsed =
"data": message.data,
"dataflow_timestamp": timestamp.to_rfc3339(),
"uuid": uuid.uuid4().hex
yield data_parsed
def run():
parser = argparse.ArgumentParser()
parser.add_argument(
"--input_subscription",
help='Input PubSub subscription of the form "projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'
)
parser.add_argument(
"--output_table", help="Output BigQuery Table"
)
known_args, pipeline_args = parser.parse_known_args()
additional_bq_parameters =
'timePartitioning': 'type': 'HOUR'
# Creating pipeline options
pipeline_options = PipelineOptions(pipeline_args)
def get_table_name(x):
namespace = NAMESPACE_EXTRACTED
date = x['dataflow_timestamp'][:10].replace('-', '')
return f"known_args.output_table_namespace_date"
# Defining our pipeline and its steps
p = beam.Pipeline(options=pipeline_options)
(
p
| "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
subscription=known_args.input_subscription, timestamp_attribute=None, with_attributes=True
)
| "Prevent fusion" >> beam.transforms.util.Reshuffle()
| "CustomParse" >> beam.ParDo(CustomParse(broker_model))
| "WriteToBigQuery" >> beam.io.WriteToBigQuery(
table=get_table_name,
schema=BIGQUERY_SCHEMA,
additional_bq_parameters=additional_bq_parameters,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
batch_size=1000
)
)
pipeline_result = p.run()
if __name__ == "__main__":
run()
我们应该怎么做才能避免这种情况?我们是否缺少组合步骤?作为记录,它没有在错误之后发生。
【问题讨论】:
【参考方案1】:我缺少一些上下文(例如,您没有包含 BrokerParsing
转换),但根据您在此处包含的内容,问题似乎是您没有包含 @ ReadFromPubSub
转换中的 987654323@ 参数。根据documentation:
id_label – 传入 Pub/Sub 消息的属性,用作唯一记录标识符。指定后,此属性的值(可以是唯一标识记录的任何字符串)将用于消息的重复数据删除。如果未提供,我们无法保证不会在 Pub/Sub 流上传送重复数据。在这种情况下,流的重复数据删除将是严格的尽力而为。
【讨论】:
我已经正确地重命名了这个类,很抱歉。然而,令我感到奇怪的是,重复事件具有相同的 uuid,而 uuid 仅在 customParse 的 process 方法期间设置。因此,如果事件被处理两次,每个重复项应该有 2 个不同的 uuid 吗?非常感谢您的帮助。以上是关于如何避免 writeToBq 步骤中数据流束管道中的重复?的主要内容,如果未能解决你的问题,请参考以下文章