单个 Spring Kafka Consumer 侦听器可以侦听多个主题吗?

Posted

技术标签:

【中文标题】单个 Spring Kafka Consumer 侦听器可以侦听多个主题吗?【英文标题】:Can a single Spring's KafkaConsumer listener listens to multiple topic? 【发布时间】:2017-06-11 04:49:24 【问题描述】:

有人知道单个听众是否可以收听以下多个主题吗?我知道只有“topic1”有效,如果我想添加其他主题怎么办?你能举出下面两个例子吗?感谢您的帮助!

@KafkaListener(topics = "topic1,topic2")
public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) 
    System.out.println(record);
 

ContainerProperties containerProps = new ContainerProperties(new TopicPartitionInitialOffset("topic1, topic2", 0));

【问题讨论】:

我的案例需要连接一个 Kafka 主题以使用 Spring Boot 获取数据,该数据具有另一个 Kafka 主题名称读取此信息并连接到新主题获取数据并执行一些业务逻辑。你能帮我写一下spring boot代码吗? 【参考方案1】:

是的,只需关注@KafkaListener JavaDocs:

/**
 * The topics for this listener.
 * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
 * Expression must be resolved to the topic name.
 * Mutually exclusive with @link #topicPattern() and @link #topicPartitions().
 * @return the topic names or expressions (SpEL) to listen to.
 */
String[] topics() default ;

/**
 * The topic pattern for this listener.
 * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
 * Expression must be resolved to the topic pattern.
 * Mutually exclusive with @link #topics() and @link #topicPartitions().
 * @return the topic pattern or expression (SpEL).
 */
String topicPattern() default "";

/**
 * The topicPartitions for this listener.
 * Mutually exclusive with @link #topicPattern() and @link #topics().
 * @return the topic names or expressions (SpEL) to listen to.
 */
TopicPartition[] topicPartitions() default ;

所以,你的用例应该是这样的:

@KafkaListener(topics = "topic1" , "topic2")

【讨论】:

对于另一种情况,您需要为每个人提供一个TopicPartitionInitialOffset 谢谢你们,非常有帮助!也得到了加里的工作方式! 有没有办法用同一个监听器为不同的主题配置不同的containerFactories?就我而言,不同主题的消息格式会有所不同。 不行,你需要有不同的@KafkaListener,为自己的containerFactory 我可以知道这对于批处理侦听器是如何工作的,假设如果我们有带有2 主题和并发1 的批处理侦听器,那么单个批处理轮询可以从两个主题中获取数据吗? @ArtemBilan 和 @GaryRussell

以上是关于单个 Spring Kafka Consumer 侦听器可以侦听多个主题吗?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Consumer 在 Spring Boot 中没有收到消息

Spring Integration Kafka Consumer Listener 不接收消息

如何增加Spring Kafka Consumer每批消费的消息数?

Kafka Consumer 指标在从 Spring Boot 2.2.2 升级到 2.3.0 后消失

springboot kafka集成(实现producer和consumer)

在Spring Kafka Consumer Unit Test中没有调用MessageListner的onMessage