spring-cloud-stream-kafka 在应用程序启动后仅使用最新消息

Posted

技术标签:

【中文标题】spring-cloud-stream-kafka 在应用程序启动后仅使用最新消息【英文标题】:spring-cloud-stream-kafka Consume only latest messages after application startup 【发布时间】:2017-12-01 08:24:03 【问题描述】:

在我们的项目中,我们使用 spring-cloud-stream-binder-kafka 1.1.2 版与 kafka 集成。最近我们遇到了一种情况,我们的一项服务在启动后消耗了来自主题的旧消息(已经消耗)。该主题有 2 个分区和 2 个消费者分组在一个消费者组下。我们不确定偏移量是否正确提交给zookeeper。在启动过程中,每条消息都会抛出以下错误消息。

[-kafka-listener-2] ERROR o.s.k.listener.LoggingErrorHandler.handle - Error while processing: ConsumerRecord(topic = statemachine_deal_notification, partition = 1, offset = 926, key = null, value = [B@6fab0a32)

为了确保不再发生这种情况,我们一直希望只阅读主题中的最新消息。我发现将resetOffsets 设置为truestartOffset 设置为latest 就可以了。但这些属性不会影响消费者。后来发现这个功能已经下架了。

有没有其他方法可以确保特定组中的消费者只消费最新消息???.

【问题讨论】:

【参考方案1】:

这是一个错误,随后在此处修复:https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/7355ada4613ad50fe95430f1859d4ea65f004be1。您可以试用 SNAPSHOT(1.2.2.BUILD-SNAPSHOT 或 1.3.0.BUILD-SNAPSHOT)版本来验证此修复。

【讨论】:

以上是关于spring-cloud-stream-kafka 在应用程序启动后仅使用最新消息的主要内容,如果未能解决你的问题,请参考以下文章