Spring Cloud Stream 3.x - 重播消息策略

Posted

技术标签:

【中文标题】Spring Cloud Stream 3.x - 重播消息策略【英文标题】:Spring Cloud Stream 3.x - replay message strategy 【发布时间】:2020-12-07 21:26:06 【问题描述】:

我正在寻找有关使用 Spring Cloud Stream 3.x / Kafka binder implementations 从 Kafka 主题重播消息策略的一些指导 -

    重播特定消息[例如。通过时间戳窗口]来自一个主题。如何重置消费者组中所有或部分消费者的偏移量?

    是否可以从主题的特定分区重播[如果我们知道我们有兴趣重播的消息的分区]?

一般来说,关于消息重播的最佳做法是什么。感谢您的宝贵时间。

【问题讨论】:

【参考方案1】:

添加一个重新平衡侦听器 bean,它将被连接到 binder...

@Bean
KafkaBindingRebalanceListener rebal() 
    return new KafkaBindingRebalanceListener() 

        @Override
        public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
                Collection<TopicPartition> partitions, boolean initial) 

            consumer.seekToBeginning(partitions);
        

    ;

您可以使用任何消费者查找操作;您也可以拨打consumer.offsetsForTimes(...)等。

initial 标志对于第一次重新平衡为 true,对于其他标志为 false。

【讨论】:

这需要重新启动对吗?有没有办法强制给定的侦听器重新平衡(例如,我想进行 REST 调用 - POST /api/kafka/reset 并提供分区 ID 和新偏移量) 您不必重新启动整个应用程序。如果您有(或添加)Boot actuator starter 到类路径,您可以使用BindingsEndpoint.changeState() 停止和重新启动绑定。

以上是关于Spring Cloud Stream 3.x - 重播消息策略的主要内容,如果未能解决你的问题,请参考以下文章

spring cloud stream

spring cloud-stream 和 spring cloud-bus 有啥区别?

Spring Cloud(12)——基于Kafka的Stream实现

Spring Cloud 2020.0.0 中的 Spring Cloud Bus/Stream 问题

spring cloud 2.x版本 Spring Cloud Stream消息驱动组件基础教程(kafaka篇)

使用 spring-boot:1.5.1 和 spring-cloud-stream 时无法启动 bean 'inputBindingLifecycle'