Spring @KafkaListener 和并发

Posted

技术标签:

【中文标题】Spring @KafkaListener 和并发【英文标题】:Spring @KafkaListener and concurrency 【发布时间】:2020-03-12 07:41:19 【问题描述】:

我正在使用 spring boot + spring @KafkaListener。我期望的行为是:我的 kafka 监听器在 10 个线程中读取消息。这样,如果其中一个线程挂起,其他消息将继续读取和处理消息。

我定义了bean的

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory)


    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setMissingTopicsFatal(false);
    factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
    return factory;

还有 Spring Boot 配置:

spring.kafka.listener.concurrency=10

我看到所有配置都有效,我在 jmx 中看到了我的 10 个线程:

然后我做了这样的测试:

 @KafkaListener(topics = 
            "$topic.name" , clientIdPrefix = "$kafka.client.id.prefix", idIsGroup = false, id = "$kafka.listener.name", containerFactory = "kafkaListenerContainerFactory")
    public void listen(ConsumerRecord<String, String> record)
    
        if(record.getVersion() < 3) 
            try 
                Thread.sleep(20000);
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
        else
            System.out.println("It works!");

    

如果版本

也许我的期望不正确,这是 kafka 听众的正确行为。 请帮我处理一下kafka并发,为什么会这样?

【问题讨论】:

您是否按照预期的顺序获得记录版本?或者不一样 是的,我一个一个发:1,2,3 你可以阅读这篇文章-rmoff.net/2018/08/02/kafka-listeners-explained 每个@KafkaListener 都有自己的消费者。具有相同组 id 的消费者不能多于主题的分区。如果您有 10 个 @KafkaListener 共享同一个组 id 从一个具有 1 个分区的主题消费,那么 @KafkaListener 中的 9 个将闲置无所事事。 您的主题有多少个分区以及您生成消息的顺序是什么? 【参考方案1】:

卡夫卡不是这样工作的;您至少需要与消费者一样多的分区(由 spring 容器中的concurrency 控制)。

此外,一次只有一个消费者(在一个组中)可以从一个分区消费,因此,即使您增加分区,“卡住”消费者后面的同一分区中的记录也不会被其他消费者接收。

【讨论】:

【参考方案2】:

如果您想要故障转移 Kafka,您必须启动更多应用程序实例。

示例:您有一个名为 test 的主题,带有 1 个分区,您将使用相同的 Kafka 组创建应用程序的 2 个实例。一个实例将处理您的数据,另一个将等待并开始处理消息,以防第一个实例崩溃。如果您有 N 个分区和 N + 1 或 2 或 3 个应用程序实例,则相同。此外,每个实例将只有一个消费者线程。

有关它的更多信息,请在 Google 上搜索:Kafka Consumer Groups

【讨论】:

以上是关于Spring @KafkaListener 和并发的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 spring-kafka 暂停和恢复 @KafkaListener

如何为@KafkaListener 编写单元测试?

Spring Kafka KafkaListener 注释是不是能够将配置属性作为主题名称?

Spring Kafka 之 @KafkaListener 单条或批量处理消息

@KafkaListener和@KafkaListeners的使用

(8) KafkaListener注解