Kafka消费异常处理

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka消费异常处理相关的知识,希望对你有一定的参考价值。

参考技术A

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

https://blog.csdn.net/shibuwodai_/article/details/80678717

两次poll()的时间间隔大于配置的session.timeout.ms,根本原因是处理时间太长,大于设定的session.timeout.ms。如果长时间不调用poll()方法,集群会认为该消费者已经挂掉了,就不会让它提交偏移量了,这样就会造成重复消费数据。

Assuming we are talking about Kafka 0.10.1.0 or upwards where each consumer instance employs two threads to function. One is user thread from which poll is called; the other is heartbeat thread that specially takes care of heartbeat things.

session.timeout.ms is for heartbeat thread. If coordinator fails to get any heartbeat from a consumer before this time interval elapsed, it marks consumer as failed and triggers a new round of rebalance.

max.poll.interval.ms is for user thread. If message processing logic is too heavy to cost larger than this time interval, coordinator explicitly have the consumer leave the group and also triggers a new round of rebalance.

heartbeat.interval.ms is used to have other healthy consumers aware of the rebalance much faster. If coordinator triggers a rebalance, other consumers will only know of this by receiving the heartbeat response with REBALANCE_IN_PROGRESS exception encapsulated. Quicker the heartbeat request is sent, faster the consumer knows it needs to rejoin the group.

Suggested values:
session.timeout.ms : a relatively low value, 10 seconds for instance.
max.poll.interval.ms : based on your processing requirements
heartbeat.interval.ms : a relatively low value, better 1/3 of the session.timeout.ms

修改配置参数,调大间隔,调小一次处理的最大任务数量

使用多线程并行处理

kafka消费者和offset的关系,以及异常处理问题

参考技术A

earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

简单来说,如果partition里已经有数据,但还没有消费,earliest就会从没消费的起始点来消费,反观latest就不会去消费;如果partition已经有已消费的数据,再放新的数据进去,那么它们都会从新的数据开始消费。

offset会保存在kafka内部,一开始发送数据到kafka的时候就有offset,只是有没有提交而已。而使用spring-kafka时,客户端在监听topic的时候,它有2种提交offset的方式:

1、自动提交,设置enable.auto.commit=true,更新的频率根据参数【auto.commit.interval.ms】来定。这种方式也被称为【at most once】,fetch到消息后就可以更新offset,无论是否消费成功。

2、手动提交,设置enable.auto.commit=false,这种方式称为【at least once】。fetch到消息后,等消费完成再调用方法【consumer.commitSync()】,手动更新offset;如果消费失败,则offset也不会更新,此条消息会被重复消费一次。

spring-kafka版本2.5.5,官网 https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#listener-error-handlers ,设置的是批量消费。

因为是批量消费,所以@KafkaListener需要使用list来接收消息,如果使用单个bean会报错。正常不设置异常处理,它会不断循环重复消费这条数据,不像别的地方说有一定数量的重试。

实现接口new BatchErrorHandler自定义属于自己的批量异常处理,但只会到:
public void handle(Exception thrownException, ConsumerRecords<?, ?> data)

而不到
public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container)

再定义逻辑自定处理。如果像官网那样seek回开始的offset,也是无限循环,不太了解所以不采用。

实现接口ConsumerAwareListenerErrorHandler,注意区别是有个Listener的,

如果同时存在局部和全局,在@KafkaListener注解中标注了这个局部的异常处理器,会优先使用局部的。

如果发生异常,来的是一批数据,如果头部发生了异常,那么后面的都会略过。按照参考链接中的异常处理,定义一个死信,来接收这些失败的msg,如果异常处理在全局异常处理器中,那么它们都被发送到死信,后续就算数据是正确的,都不会处理,所以 还是建议个人使用try catch来包裹 处理,个人尝试在kafka处理业务远程插入两条数据,第一条错误,第二条正确,try catch中第一条自定发到死信,第二条会正确入库。

参考:
kafka之consumer参数auto.offset.reset

kafka 消费者offset记录位置和方式

消息队列-kafka消费异常问题

Kafka - 异常处理 待试

以上是关于Kafka消费异常处理的主要内容,如果未能解决你的问题,请参考以下文章

kafka消费者和offset的关系,以及异常处理问题

Spring自带Kafka消息异常处理

kafka故障排查-consumer处理超时导致的异常

kafka日志保留策略异常处理

Kafka 消费者异常和偏移提交

如果发生异常,如何禁用记录 Kafka 批处理中的所有消息?