Kafka - 如何在使用高级消费者的每条消息后提交偏移量?
Posted
技术标签:
【中文标题】Kafka - 如何在使用高级消费者的每条消息后提交偏移量?【英文标题】:Kafka - How to commit offset after every message using High-Level consumer? 【发布时间】:2014-10-07 06:04:40 【问题描述】:我正在使用 Kafka 的高级消费者。因为我将 Kafka 用作我的应用程序的“事务队列”,所以我需要绝对确保我不会错过或重新阅读任何消息。我对此有 2 个问题:
如何将偏移量提交给 zookeeper?在每条消息成功消费后,我将关闭自动提交和提交偏移量。我似乎无法找到如何使用高级消费者执行此操作的实际代码示例。谁能帮我解决这个问题?
另一方面,我听说提交给 zookeeper 可能会很慢,所以另一种方法可能是在本地跟踪偏移量?这种替代方法是否可取?如果是,您会如何处理?
【问题讨论】:
【参考方案1】:http://kafka.apache.org/documentation.html#consumerconfigs有两个相关设置。
auto.commit.enable
和
auto.commit.interval.ms
如果您想将其设置为消费者在每条消息后提交偏移量,这将很困难,因为唯一的设置是在计时器间隔之后,而不是在每条消息之后。您必须对传入消息进行一些速率预测并相应地设置时间。
一般来说,不建议将此间隔保持得太小,因为它会极大地增加 zookeeper 中的读/写速率,而 zookeeper 会变慢,因为它在其仲裁中具有很强的一致性。
【讨论】:
你觉得使用 commitOffsets() 方法怎么样? 好点!我认为这也可行,但问题是消费者连接器可以同时提交许多不同流的偏移量,而您无法真正控制要提交哪些流。如果您想同时提交所有内容,那么这确实有效。谢谢!我不知道这件事。当然,这只会加剧频繁写入 zookeeper 的问题。【参考方案2】:您可以先禁用自动提交:auto.commit.enable=false
获取消息后提交:consumer.commitOffsets(true)
【讨论】:
以上是关于Kafka - 如何在使用高级消费者的每条消息后提交偏移量?的主要内容,如果未能解决你的问题,请参考以下文章