单个 Spring Kafka Consumer 侦听器可以侦听多个主题吗?
Posted
技术标签:
【中文标题】单个 Spring Kafka Consumer 侦听器可以侦听多个主题吗?【英文标题】:Can a single Spring's KafkaConsumer listener listens to multiple topic? 【发布时间】:2017-06-11 04:49:24 【问题描述】:有人知道单个听众是否可以收听以下多个主题吗?我知道只有“topic1”有效,如果我想添加其他主题怎么办?你能举出下面两个例子吗?感谢您的帮助!
@KafkaListener(topics = "topic1,topic2")
public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack)
System.out.println(record);
或
ContainerProperties containerProps = new ContainerProperties(new TopicPartitionInitialOffset("topic1, topic2", 0));
【问题讨论】:
我的案例需要连接一个 Kafka 主题以使用 Spring Boot 获取数据,该数据具有另一个 Kafka 主题名称读取此信息并连接到新主题获取数据并执行一些业务逻辑。你能帮我写一下spring boot代码吗? 【参考方案1】:是的,只需关注@KafkaListener
JavaDocs:
/**
* The topics for this listener.
* The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
* Expression must be resolved to the topic name.
* Mutually exclusive with @link #topicPattern() and @link #topicPartitions().
* @return the topic names or expressions (SpEL) to listen to.
*/
String[] topics() default ;
/**
* The topic pattern for this listener.
* The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
* Expression must be resolved to the topic pattern.
* Mutually exclusive with @link #topics() and @link #topicPartitions().
* @return the topic pattern or expression (SpEL).
*/
String topicPattern() default "";
/**
* The topicPartitions for this listener.
* Mutually exclusive with @link #topicPattern() and @link #topics().
* @return the topic names or expressions (SpEL) to listen to.
*/
TopicPartition[] topicPartitions() default ;
所以,你的用例应该是这样的:
@KafkaListener(topics = "topic1" , "topic2")
【讨论】:
对于另一种情况,您需要为每个人提供一个TopicPartitionInitialOffset
。
谢谢你们,非常有帮助!也得到了加里的工作方式!
有没有办法用同一个监听器为不同的主题配置不同的containerFactories?就我而言,不同主题的消息格式会有所不同。
不行,你需要有不同的@KafkaListener
,为自己的containerFactory
我可以知道这对于批处理侦听器是如何工作的,假设如果我们有带有2
主题和并发1
的批处理侦听器,那么单个批处理轮询可以从两个主题中获取数据吗? @ArtemBilan 和 @GaryRussell以上是关于单个 Spring Kafka Consumer 侦听器可以侦听多个主题吗?的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Consumer 在 Spring Boot 中没有收到消息
Spring Integration Kafka Consumer Listener 不接收消息
如何增加Spring Kafka Consumer每批消费的消息数?
Kafka Consumer 指标在从 Spring Boot 2.2.2 升级到 2.3.0 后消失
springboot kafka集成(实现producer和consumer)
在Spring Kafka Consumer Unit Test中没有调用MessageListner的onMessage