@StreamListener 为 kafka 定义 groupId-如何为同一主题设置多个消费者

Posted

技术标签:

【中文标题】@StreamListener 为 kafka 定义 groupId-如何为同一主题设置多个消费者【英文标题】:@StreamListener define groupId for kafka- How can I set many consumers for the same topic 【发布时间】:2022-01-23 13:53:56 【问题描述】:

我在我的应用程序中使用 java、springboot 和 Kafka。

我想在 Kafka 中为同一个主题定义多个消费者。 现在,我在我的应用程序属性文件中定义组 ID:

spring.cloud.stream.bindings.myFirstTopic.destination=my-first-topic
spring.cloud.stream.bindings.myFirstTopic.group=my-first-consumer

关于我使用注解的方法:

@StreamListener(MyFirstTopicBinding.MY_FIRST_TOPIC)
    public void firstConsumer(@Payload MessageDto dto) 

@StreamListener(MyFirstTopicBinding.MY_FIRST_TOPIC)
    public void secondConsumer(@Payload MessageDto dto) 

我希望这两种方法都能得到相同的消息....

我该怎么做?

【问题讨论】:

消费者应该属于不同的消费者组才能接收相同的消息。 docs.spring.io/spring-cloud-stream/docs/***lyn.RELEASE/… 【参考方案1】:

你不能那样做;你需要给每个人一个不同的频道名称,然后

spring.cloud.stream.bindings.myFirstConsumer.destination=my-first-topic
spring.cloud.stream.bindings.myFirstConsumer.group=my-first-consumer
spring.cloud.stream.bindings.mySecondConsumer.destination=my-first-topic
spring.cloud.stream.bindings.mySecondConsumer.group=my-second-consumer

另外,@StreamListener 已被弃用,很快将被删除;您应该转换为功能模型。

@Bean
Consumer<MessageDto> myFirstConsumer() 
    return dto -> ...;

@Bean
Consumer<MessageDto> mySecondConsumer() 
    return dto -> ...;

然后

spring.cloud.function.definition=myFirstConsumer;mySecondConsumer
spring.cloud.stream.bindings.myFirstConsumer-in-0.destination=my-first-topic
spring.cloud.stream.bindings.myFirstConsumer-in-0.group=my-first-consumer
spring.cloud.stream.bindings.mySecondConsumer-in-0.destination=my-first-topic
spring.cloud.stream.bindings.mySecondConsumer-in-0.group=my-first-consumer

编辑

以下是使用单个绑定并将每条消息发布到多个侦听器的示例:

@SpringBootApplication
public class So70447378Application 

    public static void main(String[] args) 
        SpringApplication.run(So70447378Application.class, args);
    

    @Bean
    public Consumer<Message<MyDto>> input(PublishSubscribeChannel multiplex) 
        return msg -> multiplex.send(msg);
    

    @Bean
    PublishSubscribeChannel multiplex() 
        return new PublishSubscribeChannel();
    

    @ServiceActivator(inputChannel = "multiplex")
    void firstConsumer(MyDto dto) 
        System.out.println("1:" + dto);
    

    @ServiceActivator(inputChannel = "multiplex")
    void secondConsumer(MyDto dto) 
        System.out.println("2:" + dto);
    

    public static class MyDto 

        private String foo;

        public String getFoo() 
            return this.foo;
        

        public void setFoo(String foo) 
            this.foo = foo;
        

    


spring.cloud.function.definition=input
spring.cloud.stream.bindings.input-in-0.destination=my-topic
spring.cloud.stream.bindings.input-in-0.group=my-group

【讨论】:

但是如果我有一个频道并且我想设置两个听众,那是不可能的? 为此,您可以定义一个 PublishSubscribeChannel 类型的 bean,并将 MyFirstTopicBinding.MY_FIRST_TOPIC 作为 bean 名称,它将被使用而不是默认的 DirectChannel。然后,每个侦听器都会收到消息的副本。 我很抱歉我的粗鲁... ypu 可以和我分享一个代码 sn-p 以便我更好地理解它吗? 我添加了一个使用首选函数模型的示例。

以上是关于@StreamListener 为 kafka 定义 groupId-如何为同一主题设置多个消费者的主要内容,如果未能解决你的问题,请参考以下文章

@KafkaListener、@StreamListener 和 @ServiceActivator 的区别?

Spring Cloud Stream Kafka 消费者模式

为 Google PubSub 暂停 Spring Cloud StreamListener

如何在 Kafka Connect Sink 中指定 Kafka 主题的分区

Kafka深度解析(如何在producer中指定partition)(转)

spring cloud stream kafka - 获取批量并错过心跳