@StreamListener 为 kafka 定义 groupId-如何为同一主题设置多个消费者
Posted
技术标签:
【中文标题】@StreamListener 为 kafka 定义 groupId-如何为同一主题设置多个消费者【英文标题】:@StreamListener define groupId for kafka- How can I set many consumers for the same topic 【发布时间】:2022-01-23 13:53:56 【问题描述】:我在我的应用程序中使用 java、springboot 和 Kafka。
我想在 Kafka 中为同一个主题定义多个消费者。 现在,我在我的应用程序属性文件中定义组 ID:
spring.cloud.stream.bindings.myFirstTopic.destination=my-first-topic
spring.cloud.stream.bindings.myFirstTopic.group=my-first-consumer
关于我使用注解的方法:
@StreamListener(MyFirstTopicBinding.MY_FIRST_TOPIC)
public void firstConsumer(@Payload MessageDto dto)
@StreamListener(MyFirstTopicBinding.MY_FIRST_TOPIC)
public void secondConsumer(@Payload MessageDto dto)
我希望这两种方法都能得到相同的消息....
我该怎么做?
【问题讨论】:
消费者应该属于不同的消费者组才能接收相同的消息。 docs.spring.io/spring-cloud-stream/docs/***lyn.RELEASE/… 【参考方案1】:你不能那样做;你需要给每个人一个不同的频道名称,然后
spring.cloud.stream.bindings.myFirstConsumer.destination=my-first-topic
spring.cloud.stream.bindings.myFirstConsumer.group=my-first-consumer
spring.cloud.stream.bindings.mySecondConsumer.destination=my-first-topic
spring.cloud.stream.bindings.mySecondConsumer.group=my-second-consumer
另外,@StreamListener
已被弃用,很快将被删除;您应该转换为功能模型。
@Bean
Consumer<MessageDto> myFirstConsumer()
return dto -> ...;
@Bean
Consumer<MessageDto> mySecondConsumer()
return dto -> ...;
然后
spring.cloud.function.definition=myFirstConsumer;mySecondConsumer
spring.cloud.stream.bindings.myFirstConsumer-in-0.destination=my-first-topic
spring.cloud.stream.bindings.myFirstConsumer-in-0.group=my-first-consumer
spring.cloud.stream.bindings.mySecondConsumer-in-0.destination=my-first-topic
spring.cloud.stream.bindings.mySecondConsumer-in-0.group=my-first-consumer
编辑
以下是使用单个绑定并将每条消息发布到多个侦听器的示例:
@SpringBootApplication
public class So70447378Application
public static void main(String[] args)
SpringApplication.run(So70447378Application.class, args);
@Bean
public Consumer<Message<MyDto>> input(PublishSubscribeChannel multiplex)
return msg -> multiplex.send(msg);
@Bean
PublishSubscribeChannel multiplex()
return new PublishSubscribeChannel();
@ServiceActivator(inputChannel = "multiplex")
void firstConsumer(MyDto dto)
System.out.println("1:" + dto);
@ServiceActivator(inputChannel = "multiplex")
void secondConsumer(MyDto dto)
System.out.println("2:" + dto);
public static class MyDto
private String foo;
public String getFoo()
return this.foo;
public void setFoo(String foo)
this.foo = foo;
spring.cloud.function.definition=input
spring.cloud.stream.bindings.input-in-0.destination=my-topic
spring.cloud.stream.bindings.input-in-0.group=my-group
【讨论】:
但是如果我有一个频道并且我想设置两个听众,那是不可能的? 为此,您可以定义一个PublishSubscribeChannel
类型的 bean,并将 MyFirstTopicBinding.MY_FIRST_TOPIC
作为 bean 名称,它将被使用而不是默认的 DirectChannel
。然后,每个侦听器都会收到消息的副本。
我很抱歉我的粗鲁... ypu 可以和我分享一个代码 sn-p 以便我更好地理解它吗?
我添加了一个使用首选函数模型的示例。以上是关于@StreamListener 为 kafka 定义 groupId-如何为同一主题设置多个消费者的主要内容,如果未能解决你的问题,请参考以下文章
@KafkaListener、@StreamListener 和 @ServiceActivator 的区别?
Spring Cloud Stream Kafka 消费者模式
为 Google PubSub 暂停 Spring Cloud StreamListener
如何在 Kafka Connect Sink 中指定 Kafka 主题的分区