Pub/Sub Lite 延迟消费者

Posted

技术标签:

【中文标题】Pub/Sub Lite 延迟消费者【英文标题】:Pub/Sub Lite Delayed Consumer 【发布时间】:2021-12-28 22:21:26 【问题描述】:

我正在使用consumer.pause(<partitions>) 实现 Kafka 延迟主题消费。

Pub/Sub Kafka shim 将暂停变为 NoOp:

https://github.com/googleapis/java-pubsublite-kafka/blob/v0.6.7/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java#L590-L600

是否有任何文档说明如何将 pub sub lite 主题的消费延迟设定的持续时间?

即我想使用来自 Pub/Sub Lite 主题的所有消息,但有 4 分钟的综合延迟。

这是我使用 Kafka 原生的算法:

致电consumer.poll() 恢复所有分配的分区consumer.resume(consumer.assignment()) 将之前的 delayed 记录与最近轮询的记录结合起来 将记录分隔到 足以处理的记录 记录还太年轻,无法处理 为任何太年轻的记录暂停分区consumer.pause(<partitions of too young>) 保留一个太年轻的记录缓冲区,以便在下一次通过时重新考虑,称为delayed 处理足够旧的记录 冲洗,重复

我们只提交足够老的记录的偏移量,如果进程死亡,“太年轻”缓冲区中的任何记录都将保持未提交,并且在随后的重新平衡中接收分区的任何消费者都会重新访问它们。

此算法是否有更通用的形式可以与原生 Kafka 和 Pub/Sub Lite 一起使用?

编辑:CloudTasks 在这里是个坏主意,因为它断开了偏移提交链。我需要确保我只为从下游系统得到确认的记录提交偏移量。

【问题讨论】:

【参考方案1】:

如果您删除 pauseresume 阶段,与上述类似的操作可能会正常工作。我会注意到,对于这两个系统,您不能保证在任何给定的 poll() 调用中直到现在都接收到服务器上存在的所有消息,因此如果您没有为给定分区提供任何记录,您可能会增加额外的延迟投票电话。

如果您在启用自动提交的情况下执行以下操作,您应该有效地将处理延迟严格超过 4 分钟。

    致电consumer.poll() 睡到每记录 4 分钟前 流程记录 转到 1。

如果您使用手动提交,您可以使每条消息的睡眠时间接近 4 分钟,但缺点是需要手动管理偏移量:

    致电consumer.poll() 将记录放入有序的每个分区缓冲区中 睡眠直到任何分区的最早记录是过去 4 分钟 处理过去 4 分钟以上的记录 提交已处理记录的偏移量 去1

【讨论】:

睡眠不是我的选择。只有允许阻塞的调用是对轮询本身的调用。这只会在没有新消息的情况下阻塞。我们已经自己管理偏移量。 "只有被允许阻塞的调用是对自身进行轮询的调用。"这似乎是一个任意的约束;鉴于您已经在 poll() 调用中阻塞了线程,您应该重新考虑它。假设您无法控制此约束的另一种方法是计算轮询时间,因为制作最旧记录所需的时间是过去 4 分钟。当最早的记录是过去 4 分钟时,这将累积新消息或超时。尽管与睡眠方法相比,这具有内存开销。

以上是关于Pub/Sub Lite 延迟消费者的主要内容,如果未能解决你的问题,请参考以下文章

Jms:具有多个消费者的 Pub/Sub

redis pub sub 生产者可以当消费者吗

RabbitMQ:在 pub/sub 中,消费者是轮询队列以获取新消息还是服务器推送消息?

Redis pub/sub

列出 Pub/Sub 订阅使用者

如何通过 Google Cloud Monitoring JAVA 客户端库获取 Pub/Sub 订阅中未确认消息的数量