Pub/Sub Lite 延迟消费者
Posted
技术标签:
【中文标题】Pub/Sub Lite 延迟消费者【英文标题】:Pub/Sub Lite Delayed Consumer 【发布时间】:2021-12-28 22:21:26 【问题描述】:我正在使用consumer.pause(<partitions>)
实现 Kafka 延迟主题消费。
Pub/Sub Kafka shim 将暂停变为 NoOp:
https://github.com/googleapis/java-pubsublite-kafka/blob/v0.6.7/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java#L590-L600
是否有任何文档说明如何将 pub sub lite 主题的消费延迟设定的持续时间?
即我想使用来自 Pub/Sub Lite 主题的所有消息,但有 4 分钟的综合延迟。
这是我使用 Kafka 原生的算法:
致电consumer.poll()
恢复所有分配的分区consumer.resume(consumer.assignment())
将之前的 delayed
记录与最近轮询的记录结合起来
将记录分隔到
足以处理的记录
记录还太年轻,无法处理
为任何太年轻的记录暂停分区consumer.pause(<partitions of too young>)
保留一个太年轻的记录缓冲区,以便在下一次通过时重新考虑,称为delayed
处理足够旧的记录
冲洗,重复
我们只提交足够老的记录的偏移量,如果进程死亡,“太年轻”缓冲区中的任何记录都将保持未提交,并且在随后的重新平衡中接收分区的任何消费者都会重新访问它们。
此算法是否有更通用的形式可以与原生 Kafka 和 Pub/Sub Lite 一起使用?
编辑:CloudTasks 在这里是个坏主意,因为它断开了偏移提交链。我需要确保我只为从下游系统得到确认的记录提交偏移量。
【问题讨论】:
【参考方案1】:如果您删除 pause
和 resume
阶段,与上述类似的操作可能会正常工作。我会注意到,对于这两个系统,您不能保证在任何给定的 poll() 调用中直到现在都接收到服务器上存在的所有消息,因此如果您没有为给定分区提供任何记录,您可能会增加额外的延迟投票电话。
如果您在启用自动提交的情况下执行以下操作,您应该有效地将处理延迟严格超过 4 分钟。
-
致电
consumer.poll()
睡到每记录 4 分钟前
流程记录
转到 1。
如果您使用手动提交,您可以使每条消息的睡眠时间接近 4 分钟,但缺点是需要手动管理偏移量:
-
致电
consumer.poll()
将记录放入有序的每个分区缓冲区中
睡眠直到任何分区的最早记录是过去 4 分钟
处理过去 4 分钟以上的记录
提交已处理记录的偏移量
去1
【讨论】:
睡眠不是我的选择。只有允许阻塞的调用是对轮询本身的调用。这只会在没有新消息的情况下阻塞。我们已经自己管理偏移量。 "只有被允许阻塞的调用是对自身进行轮询的调用。"这似乎是一个任意的约束;鉴于您已经在 poll() 调用中阻塞了线程,您应该重新考虑它。假设您无法控制此约束的另一种方法是计算轮询时间,因为制作最旧记录所需的时间是过去 4 分钟。当最早的记录是过去 4 分钟时,这将累积新消息或超时。尽管与睡眠方法相比,这具有内存开销。以上是关于Pub/Sub Lite 延迟消费者的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ:在 pub/sub 中,消费者是轮询队列以获取新消息还是服务器推送消息?
如何通过 Google Cloud Monitoring JAVA 客户端库获取 Pub/Sub 订阅中未确认消息的数量