PubsubIO , msg 超过最大大小,如何执行错误处理

Posted

技术标签:

【中文标题】PubsubIO , msg 超过最大大小,如何执行错误处理【英文标题】:PubsubIO , msg exceeding max size, how to perform error handling 【发布时间】:2019-06-09 09:21:26 【问题描述】:

我们在 GCP Dataflow 中运行管道,并遇到了 pubsub 消息 [1] 的最大消息大小 当这种情况发生时,管道延迟时间将开始增加,最终停止运行......

此日志消息是在 GCP 堆栈驱动程序中的“dataflow_step”下生成的,

我的问题,有没有办法在管道中定义错误处理...

.apply(PubsubIO.writeMessages()
                        .to("topic")
                        .withTimestampAttribute(Instant.now().toString()));

类似

.onError(...perform error handling ...)

以与 Java8 流 api 类似的流畅方式。这将允许管道继续使用 pubsub 限制内的输出。

非常欢迎使用其他解决方案来处理这种情况。

谢谢你, 克里斯托夫·布希耶

[1] 由于验证错误而无法提交请求:generic::invalid_argument: Pubsub 发布请求限制为 10MB,拒绝超过 7MB 的消息以避免超出 byte64 请求编码的限制。

【问题讨论】:

【参考方案1】:

对于 Dataflow 上的 PubsubIO 的特殊情况,请注意 Dataflow 会覆盖 PubsubIO 并处理对 Pubsub 的读取和写入消息,作为其流式实现的一部分。由于这种替换,我看到您正在讨论的相同错误显示在“shuffler”而不是“worker”下的日志中。

我通过在 PubsubIO.write() 步骤之前实现自定义转换来解决相同的问题。此 LimitPayloadSize 转换仅检查 PubsubMessage 中有多少字节,并且只允许通过有效负载小于 7 MB 的消息。

目前还没有一个流畅的 API 用于转换中的错误处理,尽管这已经讨论过了。目前,公认的模式是定义具有多个输出集合的转换,然后将失败消息的集合写入其他地方(例如通过 FileIO 的 GCS)。您可以将其实现为裸 DoFn,或者您可以查看 Partition:

PCollectionList<PubsubMessage> limitedPayloads = input
        .apply("Limit payload size",
                Partition
                        .of(2, new PartitionFn<PubsubMessage>() 
  public int partitionFor(PubsubMessage message, int numPartitions) 
    return message.getPayload().size < 7 * 1000 * 1000 ? 0 : 1;
  
));
limitedPayloads.get(0).apply(PubsubIO.write()...);
limitedPayloads.get(1).apply(FileIO.write()...);

【讨论】:

以上是关于PubsubIO , msg 超过最大大小,如何执行错误处理的主要内容,如果未能解决你的问题,请参考以下文章

如何在不超过最大文档大小的情况下编写聚合?

超过 (65536) 时如何增加 WCF 服务的传入消息的最大消息大小配额

Laravel - 超过 PHP 最大上传大小限制时验证文件大小

如何更改最大文件大小上传?

如何设置/配置solr索引文件的最大大小?

Dataflow 何时确认来自 PubSubIO 的批处理项目消息?