Kafka CommitFailedException异常的一点思考

Posted 大数据Kafka技术分享

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka CommitFailedException异常的一点思考相关的知识,希望对你有一定的参考价值。

一、含义

CommitFailedException异常:位移提交失败时候抛出的异常。通常该异常被抛出时还会携带这样的一段话:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

简单翻译一下,“位移提交失败,原因是消费者组开启了rebalance且已然分配对应分区给其他消费者。这表明poll调用间隔超过了max.poll.interval.ms的值,这通常表示poll循环中的消息处理花费了太长的时间。解决方案有两个:1. 增加session.timeout.ms值;2. 减少max.poll.records值”

下面这段话完全是我自己的理解,请谨慎听取~~

在我看来,上面英文中的最后一句话实际上依然是0.10.0.0或之前版本时的解决之道,因为在那些版本中尚未提供max.poll.interval.ms参数,因此session.timeout.ms既用于失败检测,也用于控制消息处理时间,同时还承担着rebalance过程的超时控制。在0.10.1.0版本时社区对该参数的含义进行了解耦,推出了max.poll.interval.ms参数。实际上,在0.10.1.0.0或之后的版本中,作者推荐用户将session.timeout.ms设置一个很小的值,比如5s,但需要把max.poll.interval.ms设置成平均的消息处理时间。举个例子,假设你一次poll调用返回的消息数是N,你处理每条消息的平均时间是t0,那么你需要设置max.poll.interval.ms稍稍大于N * t0以保证poll调用间隔不会超过该阈值。

如此来看,上面英文最后一句话中的第一个解决办法应该修改成:增加max.poll.interval.ms值,而非session.timeout.ms

二、抛出时机

从源代码方面说,CommitFailedException异常通常发生在手动提交位移时,即用户显式调用KafkaConsumer.commitSync()方法。从使用场景来说,有两种场景可以抛出该异常

2.1 消息处理时间>max.poll.interval.ms时: 如前所述,这是该异常最“正宗”的出现场景。复现也比较容易,用户只需写一个consumer程序,订阅topic(即使用consumer.subscribe),设置max.poll.interval.ms=N,然后在consumer.poll循环中Thread.sleep(>N),之后手动提交位移即可复现,比如:

(c)2006-2024 SYSTEM All Rights Reserved IT常识