Spring Cloud Stream 手动偏移管理

Posted

技术标签:

【中文标题】Spring Cloud Stream 手动偏移管理【英文标题】:Spring cloud stream manual offset management 【发布时间】:2017-03-30 07:49:40 【问题描述】:

我可以使用 Spring Cloud Steam 实现手动 Kafka 偏移管理,如下所示:

    每当我的消费者处理消息时,它都会将其偏移量提交到数据库中。不喜欢卡夫卡 每当我的消费者重新启动时,它都会从数据库中读取最后处理的偏移量,寻找该偏移量并开始处理下一条消息。

【问题讨论】:

【参考方案1】:

Spring Cloud Stream 允许您在消费者应用程序中手动确认消息。不知道为什么要将偏移量保存到数据库中(可能是您的需要)。但我想知道使用手动确认模式是否有助于您的情况。

您可以通过将spring.cloud.stream.kafka.bindings.<inboundChannelName>.consumer.autoCommitOffset 设置为 false 来使用手动确认模式,并仅在消费者处理完消息后手动确认消息。你可以找到一个例子here

【讨论】:

问题在于重启。我想严格控制消费者重启后从哪里开始,以便实现一次性交付。 结合手动确认模式,在消费者重启时从latest偏移量开始,是不是有助于实现exactly-once投递? 底层 spring-kafka 项目(使用 kafka 10 时)通过 seek mechanism 支持此功能,但 Spring Cloud Stream 目前并未将其作为功能公开。 在 Kafka 活页夹中有一个未解决的问题:github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/… 但是请注意,重置偏移量和重播仅适用于单个使用者应用程序(或在 Spring Cloud Stream 中用于静态分配的分区) - 否则它最终会在多实例启动时处于竞争状态.

以上是关于Spring Cloud Stream 手动偏移管理的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud Stream Kafka 消费者模式

确认Spring Cloud Stream中的消息

Spring Cloud Stream功能手册确认 - KafkaHeaders.ACKNOWLEDGMENT不可用。

spring-cloud-stream 请求-回复消息模式

spring cloud stream

spring cloud-stream 和 spring cloud-bus 有啥区别?