kafka重复消费问题
Posted 彭薄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka重复消费问题相关的知识,希望对你有一定的参考价值。
问题描述:
在消费者处理数据慢的时候(消费能力低),消费者会重复消费某个局部数据。在消费能力不变的情况下,陷入死循环。只有在消费能力增强后,才会跳出这个重复消费的死循环。
原理解析:
上图就是完整的kafka消费的过程,在consumer里面配置了一个超时时间。如果步骤2处理消息超时,那么consumer进行第3步会失败。这时候再次进入步骤1拉取重复的数据,如此往复。
验证过程:
搭建一个简单的springboot,集成kakfa,添加配置信息如下:
spring.kafka.bootstrap_servers=$KAFKA_ADDRESS:10.199.1.0:9092
spring.kafka.consumer.group_id=$KAFKA_GROUP_ID:0
spring.kafka.consumer.max-poll-records=1000
spring.kafka.consumer.properties.max.partition.fetch.bytes=65536
spring.kafka.consumer.properties.receive.buffer.bytes=65536
spring.kafka.consumer.properties.session.timeout.ms=10000
spring.kafka.consumer.max-poll-records表示每次最多能拉取1000个数据,spring.kafka.consumer.properties.session.timeout.ms表示超时时间,这里设置10秒。
再来看一下consumer的代码:
@Component
public class KafkaConsumer
@KafkaListener(topics = "test")
public void receive(ConsumerRecord<String, String> consumerRecord)
System.out.println("test--消费消息:" + consumerRecord.key());
try
Thread.sleep(500);
catch (InterruptedException e)
e.printStackTrace();
这样的预期结果是每次都在重复执行同样的连续1000条数据。下面看一下验证结果:
开始消费的日志:
重复消费的日志:
结果正好是在1000个数据之后,提交失败。并且日志打出提示,建议你增加session timeout的时间或者减少从kafka里面取到一批的数据量。
总结:
1、kafka取到数据后,应该异步去处理。这样可以增加消费能力。
2、坚持做一件事不容易,比如写博客。自律是做人做事一项必要修炼。
以上是关于kafka重复消费问题的主要内容,如果未能解决你的问题,请参考以下文章