kafka之消费超时死循环

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka之消费超时死循环相关的知识,希望对你有一定的参考价值。

参考技术A 07-28 14:34:46.111 -ERROR 279920[skynet.stream.kfk.consumer-2] skynet.boot.stream.kafka.MyConsumer [188]: kafka consumer poll msg error:org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms , which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

服务正常输出结果,但是后续任务无法输出结果

根据服务日志发现该报错信息

这个错误的意思是,消费者在处理完一批poll的消息后,在同步提交偏移量给broker时报的错。初步分析日志是由于当前消费者线程消费的分区已经被broker给回收了,因为kafka认为这个消费者死去。

这里就涉及到问题是消费者在创建时会有一个属性max.poll.interval.ms,

该属性意思为kafka消费者在每一轮poll()调用之间的最大延迟,消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有被再次调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。

kafkaConsumer调用一次轮询方法只是拉取一次消息。客户端为了不断拉取消息,会用一个外部循环不断调用消费者的轮询方法。每次轮询到消息,在处理完这一批消息后,才会继续下一次轮询。但如果一次轮询返回的结构没办法及时处理完成,会有什么后果呢?服务端约定了和客户端max.poll.interval.ms,两次poll最大间隔。如果客户端处理一批消息花费的时间超过了这个限制时间,服务端可能就会把消费者客户端移除掉,并触发rebalance。

kafka的偏移量(offset)是由消费者进行管理的,偏移量有两种,拉取偏移量(position)与提交偏移量(committed)。拉取偏移量代表当前消费者分区消费进度。每次消息消费后,需要提交偏移量。在提交偏移量时,kafka会使用拉取偏移量的值作为分区的提交偏移量发送给协调者。

如果没有提交偏移量,下一次消费者重新与broker连接后,会从当前消费者group已提交到broker的偏移量处开始消费。

所以,问题就在这里,当我们处理消息时间太长时,已经被broker剔除,提交偏移量又会报错。所以拉取偏移量没有提交到broker,分区又rebalance。下一次重新分配分区时,消费者会从最新的已提交偏移量处开始消费。这里就出现了重复消费的问题。

1、kafka消费者  默认 此间隔时长为300s,本次故障是600s都没处理完成,于是改成3600s

max.poll.interval.ms=3600000

2、根据逻辑,当处理数据失败后,进行rebalance,跳出该轮回,进行下一项任务,这样也可以解决该问题, 但遗留部分数据异常可能性。

kafka重复消费问题

问题描述:

       在消费者处理数据慢的时候(消费能力低),消费者会重复消费某个局部数据。在消费能力不变的情况下,陷入死循环。只有在消费能力增强后,才会跳出这个重复消费的死循环。

原理解析:

上图就是完整的kafka消费的过程,在consumer里面配置了一个超时时间。如果步骤2处理消息超时,那么consumer进行第3步会失败。这时候再次进入步骤1拉取重复的数据,如此往复。

验证过程:

搭建一个简单的springboot,集成kakfa,添加配置信息如下:

spring.kafka.bootstrap_servers=$KAFKA_ADDRESS:10.199.1.0:9092
spring.kafka.consumer.group_id=$KAFKA_GROUP_ID:0
spring.kafka.consumer.max-poll-records=1000
spring.kafka.consumer.properties.max.partition.fetch.bytes=65536
spring.kafka.consumer.properties.receive.buffer.bytes=65536
spring.kafka.consumer.properties.session.timeout.ms=10000

spring.kafka.consumer.max-poll-records表示每次最多能拉取1000个数据,spring.kafka.consumer.properties.session.timeout.ms表示超时时间,这里设置10秒。

再来看一下consumer的代码:

@Component
public class KafkaConsumer 
    @KafkaListener(topics = "test")
    public void receive(ConsumerRecord<String, String> consumerRecord)
        System.out.println("test--消费消息:" + consumerRecord.key());
        try 
            Thread.sleep(500);
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

这样的预期结果是每次都在重复执行同样的连续1000条数据。下面看一下验证结果:

开始消费的日志:

重复消费的日志:

结果正好是在1000个数据之后,提交失败。并且日志打出提示,建议你增加session timeout的时间或者减少从kafka里面取到一批的数据量。

总结:

1、kafka取到数据后,应该异步去处理。这样可以增加消费能力。

2、坚持做一件事不容易,比如写博客。自律是做人做事一项必要修炼。

以上是关于kafka之消费超时死循环的主要内容,如果未能解决你的问题,请参考以下文章

kafka重复消费问题

架构实战(10)——消息处理中的死循环

kafka broker 进入 conflicted ephemeral node 死循环

自己写的第一个while循环之死循环

JAVA非静态成员变量之死循环

读源码学编程之——死循环妙用