数据流作业 GCS 到 Pub/sub 最大批量大小

Posted

技术标签:

【中文标题】数据流作业 GCS 到 Pub/sub 最大批量大小【英文标题】:Dataflow Job GCS to Pub/sub Maximum batch size 【发布时间】:2020-12-15 01:34:27 【问题描述】:

我正在使用默认数据流模板 GCS 来发布/订阅。云存储中的输入文件,大小为 300MB,每个文件有 2-3 百万行。

启动数据流批处理作业时,会引发以下错误

来自工作人员的错误消息:javax.naming.SizeLimitExceededException:Pub/Sub 消息大小 (1089680070) 超过了最大批处理大小 (7500000) org.apache.beam.sdk.io.gcp.pubsub.PubsubIO$Write$PubsubBoundedWriter.processElement(PubsubIO.java:1160)

来自文档:Pub/Sub 一个批次最多接受 1,000 条消息,并且一个批次的大小不能超过 10 兆字节。

这是否意味着我必须将输入文件拆分为 10MB 块或 1000 条消息才能发布?

将如此大的文件(每个 300MB)加载到 pubsub 的推荐方法是什么?

提前感谢您的帮助。

【问题讨论】:

您是否尝试过创建自定义模板并将“MaxBatchBytesSize”增加到更大的值? 【参考方案1】:

这是 Dataflow 端的一个已知限制,此时存在一个feature request 以增加批量大小的大小。使用 +1 按钮并为问题加注星标以跟踪它的进展。

我建议您查看此post,其中建议了解决方法。重要的是要考虑到此解决方法意味着修改 Cloud Storage Text to Pub/Sub 模板以实现其中提到的自定义转换。

另一方面,您可以尝试创建云功能来拆分您的文件,然后由 Dataflow 处理,我想是这样的:

    创建一个“暂存”存储桶来上传您的大文件。 写一个Cloud Function 来拆分您的文件并将小块写入另一个存储桶。您可以尝试使用filesplit Python 包来执行此操作。 每次使用Google Cloud Storage Triggers在“暂存”存储桶中上传新文件时,触发云函数运行。 将文件拆分成小块后,使用相同的 Cloud Function 从“暂存”存储桶中删除大文件以避免额外费用。 使用 Dataflow 模板 Cloud Storage Text to Pub/Sub 处理第二个存储桶的小块。

【讨论】:

以上是关于数据流作业 GCS 到 Pub/sub 最大批量大小的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Python 中创建从 Pub/Sub 到 GCS 的数据流管道

如何保护由 GCS 触发的 App Engine Pub/Sub Push Endpoint?

如何从 Dataflow 批量(有效)发布到 Pub/Sub?

将流转换为小批量以加载到 bigquery

如何从 Google Pub/Sub 获取 objectId、bucketId 等

使用数据流的 GCS 文件流式传输(apachebeam python)