Spring @KafkaListener 和并发
Posted
技术标签:
【中文标题】Spring @KafkaListener 和并发【英文标题】:Spring @KafkaListener and concurrency 【发布时间】:2020-03-12 07:41:19 【问题描述】:我正在使用 spring boot + spring @KafkaListener。我期望的行为是:我的 kafka 监听器在 10 个线程中读取消息。这样,如果其中一个线程挂起,其他消息将继续读取和处理消息。
我定义了bean的
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory)
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setMissingTopicsFatal(false);
factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
return factory;
还有 Spring Boot 配置:
spring.kafka.listener.concurrency=10
我看到所有配置都有效,我在 jmx 中看到了我的 10 个线程:
然后我做了这样的测试:
@KafkaListener(topics =
"$topic.name" , clientIdPrefix = "$kafka.client.id.prefix", idIsGroup = false, id = "$kafka.listener.name", containerFactory = "kafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record)
if(record.getVersion() < 3)
try
Thread.sleep(20000);
catch (InterruptedException e)
e.printStackTrace();
else
System.out.println("It works!");
如果版本
也许我的期望不正确,这是 kafka 听众的正确行为。 请帮我处理一下kafka并发,为什么会这样?
【问题讨论】:
您是否按照预期的顺序获得记录版本?或者不一样 是的,我一个一个发:1,2,3 你可以阅读这篇文章-rmoff.net/2018/08/02/kafka-listeners-explained 每个@KafkaListener
都有自己的消费者。具有相同组 id 的消费者不能多于主题的分区。如果您有 10 个 @KafkaListener
共享同一个组 id 从一个具有 1 个分区的主题消费,那么 @KafkaListener
中的 9 个将闲置无所事事。
您的主题有多少个分区以及您生成消息的顺序是什么?
【参考方案1】:
卡夫卡不是这样工作的;您至少需要与消费者一样多的分区(由 spring 容器中的concurrency
控制)。
此外,一次只有一个消费者(在一个组中)可以从一个分区消费,因此,即使您增加分区,“卡住”消费者后面的同一分区中的记录也不会被其他消费者接收。
【讨论】:
【参考方案2】:如果您想要故障转移 Kafka,您必须启动更多应用程序实例。
示例:您有一个名为 test
的主题,带有 1 个分区,您将使用相同的 Kafka 组创建应用程序的 2 个实例。一个实例将处理您的数据,另一个将等待并开始处理消息,以防第一个实例崩溃。如果您有 N 个分区和 N + 1 或 2 或 3 个应用程序实例,则相同。此外,每个实例将只有一个消费者线程。
有关它的更多信息,请在 Google 上搜索:Kafka Consumer Groups。
【讨论】:
以上是关于Spring @KafkaListener 和并发的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 spring-kafka 暂停和恢复 @KafkaListener
Spring Kafka KafkaListener 注释是不是能够将配置属性作为主题名称?
Spring Kafka 之 @KafkaListener 单条或批量处理消息