Spring Cloud Stream 3.x - 重播消息策略
Posted
技术标签:
【中文标题】Spring Cloud Stream 3.x - 重播消息策略【英文标题】:Spring Cloud Stream 3.x - replay message strategy 【发布时间】:2020-12-07 21:26:06 【问题描述】:我正在寻找有关使用 Spring Cloud Stream 3.x / Kafka binder implementations 从 Kafka 主题重播消息策略的一些指导 -
重播特定消息[例如。通过时间戳窗口]来自一个主题。如何重置消费者组中所有或部分消费者的偏移量?
是否可以从主题的特定分区重播[如果我们知道我们有兴趣重播的消息的分区]?
一般来说,关于消息重播的最佳做法是什么。感谢您的宝贵时间。
【问题讨论】:
【参考方案1】:添加一个重新平衡侦听器 bean,它将被连接到 binder...
@Bean
KafkaBindingRebalanceListener rebal()
return new KafkaBindingRebalanceListener()
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial)
consumer.seekToBeginning(partitions);
;
您可以使用任何消费者查找操作;您也可以拨打consumer.offsetsForTimes(...)
等。
initial
标志对于第一次重新平衡为 true,对于其他标志为 false。
【讨论】:
这需要重新启动对吗?有没有办法强制给定的侦听器重新平衡(例如,我想进行 REST 调用 - POST /api/kafka/reset 并提供分区 ID 和新偏移量) 您不必重新启动整个应用程序。如果您有(或添加)Boot actuator starter 到类路径,您可以使用BindingsEndpoint.changeState()
停止和重新启动绑定。以上是关于Spring Cloud Stream 3.x - 重播消息策略的主要内容,如果未能解决你的问题,请参考以下文章
spring cloud-stream 和 spring cloud-bus 有啥区别?
Spring Cloud(12)——基于Kafka的Stream实现
Spring Cloud 2020.0.0 中的 Spring Cloud Bus/Stream 问题
spring cloud 2.x版本 Spring Cloud Stream消息驱动组件基础教程(kafaka篇)
使用 spring-boot:1.5.1 和 spring-cloud-stream 时无法启动 bean 'inputBindingLifecycle'