数据流作业 GCS 到 Pub/sub 最大批量大小
Posted
技术标签:
【中文标题】数据流作业 GCS 到 Pub/sub 最大批量大小【英文标题】:Dataflow Job GCS to Pub/sub Maximum batch size 【发布时间】:2020-12-15 01:34:27 【问题描述】:我正在使用默认数据流模板 GCS 来发布/订阅。云存储中的输入文件,大小为 300MB,每个文件有 2-3 百万行。
启动数据流批处理作业时,会引发以下错误
来自工作人员的错误消息:javax.naming.SizeLimitExceededException:Pub/Sub 消息大小 (1089680070) 超过了最大批处理大小 (7500000) org.apache.beam.sdk.io.gcp.pubsub.PubsubIO$Write$PubsubBoundedWriter.processElement(PubsubIO.java:1160)
来自文档:Pub/Sub 一个批次最多接受 1,000 条消息,并且一个批次的大小不能超过 10 兆字节。
这是否意味着我必须将输入文件拆分为 10MB 块或 1000 条消息才能发布?
将如此大的文件(每个 300MB)加载到 pubsub 的推荐方法是什么?
提前感谢您的帮助。
【问题讨论】:
您是否尝试过创建自定义模板并将“MaxBatchBytesSize”增加到更大的值? 【参考方案1】:这是 Dataflow 端的一个已知限制,此时存在一个feature request 以增加批量大小的大小。使用 +1 按钮并为问题加注星标以跟踪它的进展。
我建议您查看此post,其中建议了解决方法。重要的是要考虑到此解决方法意味着修改 Cloud Storage Text to Pub/Sub 模板以实现其中提到的自定义转换。
另一方面,您可以尝试创建云功能来拆分您的文件,然后由 Dataflow 处理,我想是这样的:
-
创建一个“暂存”存储桶来上传您的大文件。
写一个Cloud Function 来拆分您的文件并将小块写入另一个存储桶。您可以尝试使用filesplit Python 包来执行此操作。
每次使用Google Cloud Storage Triggers在“暂存”存储桶中上传新文件时,触发云函数运行。
将文件拆分成小块后,使用相同的 Cloud Function 从“暂存”存储桶中删除大文件以避免额外费用。
使用 Dataflow 模板 Cloud Storage Text to Pub/Sub 处理第二个存储桶的小块。
【讨论】:
以上是关于数据流作业 GCS 到 Pub/sub 最大批量大小的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Python 中创建从 Pub/Sub 到 GCS 的数据流管道
如何保护由 GCS 触发的 App Engine Pub/Sub Push Endpoint?
如何从 Dataflow 批量(有效)发布到 Pub/Sub?