kafka重复消费问题

Posted 彭薄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka重复消费问题相关的知识,希望对你有一定的参考价值。

问题描述:

       在消费者处理数据慢的时候(消费能力低),消费者会重复消费某个局部数据。在消费能力不变的情况下,陷入死循环。只有在消费能力增强后,才会跳出这个重复消费的死循环。

原理解析:

上图就是完整的kafka消费的过程,在consumer里面配置了一个超时时间。如果步骤2处理消息超时,那么consumer进行第3步会失败。这时候再次进入步骤1拉取重复的数据,如此往复。

验证过程:

搭建一个简单的springboot,集成kakfa,添加配置信息如下:

spring.kafka.bootstrap_servers=$KAFKA_ADDRESS:10.199.1.0:9092
spring.kafka.consumer.group_id=$KAFKA_GROUP_ID:0
spring.kafka.consumer.max-poll-records=1000
spring.kafka.consumer.properties.max.partition.fetch.bytes=65536
spring.kafka.consumer.properties.receive.buffer.bytes=65536
spring.kafka.consumer.properties.session.timeout.ms=10000

spring.kafka.consumer.max-poll-records表示每次最多能拉取1000个数据,spring.kafka.consumer.properties.session.timeout.ms表示超时时间,这里设置10秒。

再来看一下consumer的代码:

@Component
public class KafkaConsumer 
    @KafkaListener(topics = "test")
    public void receive(ConsumerRecord<String, String> consumerRecord)
        System.out.println("test--消费消息:" + consumerRecord.key());
        try 
            Thread.sleep(500);
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

这样的预期结果是每次都在重复执行同样的连续1000条数据。下面看一下验证结果:

开始消费的日志:

重复消费的日志:

结果正好是在1000个数据之后,提交失败。并且日志打出提示,建议你增加session timeout的时间或者减少从kafka里面取到一批的数据量。

总结:

1、kafka取到数据后,应该异步去处理。这样可以增加消费能力。

2、坚持做一件事不容易,比如写博客。自律是做人做事一项必要修炼。

以上是关于kafka重复消费问题的主要内容,如果未能解决你的问题,请参考以下文章

kafka重复消费的原因

kafka重复消费问题

kafka重复消费问题

kafka 重复消费问题

Kafka重复消费数据

Kafka问题优化之消费重复问题