ReadFromPubSub->CloudStorage->BigQuery:订阅的大小永远不会减少,而且似乎只有 0.002% 的订阅到达 BigQuery

Posted

技术标签:

【中文标题】ReadFromPubSub->CloudStorage->BigQuery:订阅的大小永远不会减少,而且似乎只有 0.002% 的订阅到达 BigQuery【英文标题】:ReadFromPubSub->CloudStorage->BigQuery: Subscription never decreases in size and only 0.002% seemingly arrive in BigQuery 【发布时间】:2020-07-03 16:03:03 【问题描述】:

管道正在使用ReadFromPubSub 源来读取指向 Cloud Storage blob 的链接,读取存储在每个文件中的事件,然后将它们插入 BigQuery:

with beam.Pipeline(options=pipeline_options) as pipeline:
    dlq = DeadletterQueue(known_args.output_dlq)

    pipeline = (
        pipeline
        | "Read PubSub Messages"
        >> beam.io.ReadFromPubSub(
            topic=topic,
            id_label="messageId",
        )
        | "Read Records" >> ReadCloudStorageEvents(deadletter_queue=dlq)
        | "Parse Events" >> ParseEventRecords(deadletter_queue=dlq)
        | "window events" >> WindowOnTimeAndSize(60, 10)
        | "Upload To BigQuery" >> BigQuerySink(project, deadletter_queue=dlq)
    )

问题在于,即使从 PubSub 以极好的速率消耗项目,并且以同样好的速率从云存储中读取文件,它们也根本没有在 bigquery 附近的任何位置插入到 BigQuery流媒体限制。

数据新鲜度和系统延迟不断攀升:。

这样做的副作用是队列项不会被删除。

BigQuerySink 本质上是这样的:

class BigQuerySink(beam.PTransform):
    def __init__(self, project: str, deadletter_queue: beam.PTransform):
        self.deadletter_queue = deadletter_queue
        self.project = project

    def expand(self, pcoll):
        def yield_from(events: Iterable[Dict]) -> Iterable[Dict]:
            for event in events:
                yield event

        pcoll = (
            pcoll
            | "flatten events" >> beam.FlatMap(yield_from)
            | f"push events to BigQuery"
            >> beam.io.WriteToBigQuery(
                table=lambda event: f"self.project:events_v2.event['type']",
                schema=lambda table: load_schema(table.split(".")[-1]),
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
                validate=True,
                additional_bq_parameters=
                    "clustering": "fields": ["accountId"],
                    "timePartitioning": "type": "DAY", "field": "receivedAt",
                ,
            )
        )

        # # https://***.com/questions/59102519/monitoring-writetobigquery
        pcoll[
            beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS
        ] | "Map to Error" >> beam.Map(
            lambda x: Error(
                message=f"BigQuery exception",
                record="destination": x[0], "index": x[1],
                data=None,
                stacktrace=None,
            )
            | self.deadletter_queue
        )

使用 200.000 个元素进行测试,引用总共包含大约 10,000,000 个事件的文件导致只有大约 0.002% 的文件进入 BigQuery。我们离配额很近,我没有看到任何错误或任何东西(有时当字段与架构不匹配时,我会看到一些错误,但我什么也没看到)。

非常欢迎任何有关确定问题所在的见解。是否有地方可以查看项目是否在 BigQuery 端未通过某些验证,因此未在订阅中删除?

似乎是瓶颈的步骤是_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/Map(reify_timestamps),如下图所示。接近 1000 MB 进入该步骤,只有 5 MB 退出(如果我正确阅读图表):

更新:WindowOnTimeAndSize

class WindowOnTimeAndSize(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size_seconds, after_count):
        # Convert minutes into seconds.
        self.window_size = int(window_size_seconds)
        self.after_count = after_count

    def expand(self, pcoll):
        return (
            pcoll
            # Assigns window info to each Pub/Sub message based on its
            # publish timestamp.
            | "Window into Fixed Intervals"
            >> beam.WindowInto(
                beam.window.FixedWindows(self.window_size),
                trigger=Repeatedly(
                    AfterAny(
                        AfterCount(self.after_count),
                        AfterProcessingTime(self.window_size),
                    )
                ),
                accumulation_mode=AccumulationMode.DISCARDING,
            )
        )

【问题讨论】:

也许值得重新考虑您的解决方案并简化它。您实际上并不需要 Pub/Sub 或 Dataflow。当您不需要时,您正在创建额外的跃点,例如读取 GCS 中文件的内容,然后插入到 BigQuery。相反,您可以让云函数监听文件到达时触发的存储桶。发生这种情况时,Cloud Function 会将文件直接加载到 BigQuery 中。这符合您的要求吗? 一个很好的观点。之所以如此,是因为在创建时间获取事件对于我们的目的来说是不够的。我们需要不时重播数据(迁移、新字段很有趣等),因此使用我们控制的格式通过 PubSub 提供事件允许我们这样做。即使我们这样做了,但问题仍然存在,因为瓶颈似乎是在读取文件之后和 WriteToBiguery 内部(我认为) 但我确实错过了您的 Cloud Function 参数。我想我担心当我们开始将剩下的数据扔到这个主题上时,我们基本上会有一个永远触发的云功能,尤其是在非常预期的重放情况下。根据您的经验,成本权衡是什么? 另一种选择是将您的 Dataflow 管道直接指向 GCS,它会在文件到达后读取文件并将其写入 BigQuery。 我看到的问题是,如果我们想重播这些事件(在 GCS 文件中,如果不通过 PubSub 或其他一些中间层,你将如何触发它你可以用一些脚本来打吗? 【参考方案1】:

WriteToBigQuery 进行了一些更改,导致转换非常缓慢。在 Beam 2.24.0(几周后即将推出)中,转换应该能够达到更高的性能(我已经测试了每个 CPU 约 500-600 EPS)。

很抱歉给您带来麻烦。您可以立即使用代码,或者等待几周的 Beam 2.24.0。

【讨论】:

这是个好消息@pablo,您之前在数据集上看到的数字是多少?只是想在这里比较数字。加速百分比是多少? 我看到工人的每股收益非常低(约 100 或更少) 好的,我会监控 Beam 版本的更新 - 谢谢。 我可以知道 2.25.0 版本的价格吗?我有 28 名工人(=48cpus),但只是以 500~800 EPS 的速度流式传输到 BQ【参考方案2】:

您最好的资源可能是数据流监控控制台,特别是当viewing the pipeline 时,您可以单击各个步骤来确定导致延迟的步骤。不要忘记,可以通过单击小 V 形符号来展开复合变换,以深入了解有问题的部分。

【讨论】:

是的,抱歉,我应该提到现在似乎出现瓶颈的点是在我几乎无法控制的beam.io.WriteToBigQuery 内部。所以我假设这与我是否在做窗口有关,这就是问题所在。 我用信息和似乎是瓶颈的屏幕截图更新了原始帖子 好像是这个Reshuffle GBK。 WindowOnTimeAndSize 中使用的窗口是什么? 嗨@robertwb,我已经用它更新了原始帖子,但我在 60 秒时开窗,但在吞吐量极高的情况下,我想使用 @987654324 输出每 10 个事件@ 和 AfterAny。主要是因为最初的 60 秒确实没有什么不同。

以上是关于ReadFromPubSub->CloudStorage->BigQuery:订阅的大小永远不会减少,而且似乎只有 0.002% 的订阅到达 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章

Paper Clouds 浮云

HR_Jumping on the Clouds

Associatively Segmenting Instances and Semantics in Point Clouds

Unity3d Realtime Dynamic Volume Clouds Rendering

论文阅读DGCNN:Dynamic Graph CNN for Learning on Point Clouds

自动驾驶 11-2: 激光雷达传感器模型和点云 LIDAR Sensor Models and Point Clouds