Kafka - 如何在使用高级消费者的每条消息后提交偏移量?

Posted

技术标签:

【中文标题】Kafka - 如何在使用高级消费者的每条消息后提交偏移量?【英文标题】:Kafka - How to commit offset after every message using High-Level consumer? 【发布时间】:2014-10-07 06:04:40 【问题描述】:

我正在使用 Kafka 的高级消费者。因为我将 K​​afka 用作我的应用程序的“事务队列”,所以我需要绝对确保我不会错过或重新阅读任何消息。我对此有 2 个问题:

    如何将偏移量提交给 zookeeper?在每条消息成功消费后,我将关闭自动提交和提交偏移量。我似乎无法找到如何使用高级消费者执行此操作的实际代码示例。谁能帮我解决这个问题?

    另一方面,我听说提交给 zookeeper 可能会很慢,所以另一种方法可能是在本地跟踪偏移量?这种替代方法是否可取?如果是,您会如何处理?

【问题讨论】:

【参考方案1】:

http://kafka.apache.org/documentation.html#consumerconfigs有两个相关设置。

auto.commit.enable

auto.commit.interval.ms

如果您想将其设置为消费者在每条消息后提交偏移量,这将很困难,因为唯一的设置是在计时器间隔之后,而不是在每条消息之后。您必须对传入消息进行一些速率预测并相应地设置时间。

一般来说,不建议将此间隔保持得太小,因为它会极大地增加 zookeeper 中的读/写速率,而 zookeeper 会变慢,因为它在其仲裁中具有很强的一致性。

【讨论】:

你觉得使用 commitOffsets() 方法怎么样? 好点!我认为这也可行,但问题是消费者连接器可以同时提交许多不同流的偏移量,而您无法真正控制要提交哪些流。如果您想同时提交所有内容,那么这确实有效。谢谢!我不知道这件事。当然,这只会加剧频繁写入 zookeeper 的问题。【参考方案2】:

您可以先禁用自动提交:auto.commit.enable=false

获取消息后提交:consumer.commitOffsets(true)

【讨论】:

以上是关于Kafka - 如何在使用高级消费者的每条消息后提交偏移量?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 实现动态goupId 实现广播消息

Kafka Consumer(消费者组)

kafkaStream

7张图了解kafka基本概念

Kafka - 使用高级消费者的延迟队列实现

Spring Cloud Stream Kafka 消费者模式