kafka消费者和offset的关系,以及异常处理问题

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka消费者和offset的关系,以及异常处理问题相关的知识,希望对你有一定的参考价值。

参考技术A

earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

简单来说,如果partition里已经有数据,但还没有消费,earliest就会从没消费的起始点来消费,反观latest就不会去消费;如果partition已经有已消费的数据,再放新的数据进去,那么它们都会从新的数据开始消费。

offset会保存在kafka内部,一开始发送数据到kafka的时候就有offset,只是有没有提交而已。而使用spring-kafka时,客户端在监听topic的时候,它有2种提交offset的方式:

1、自动提交,设置enable.auto.commit=true,更新的频率根据参数【auto.commit.interval.ms】来定。这种方式也被称为【at most once】,fetch到消息后就可以更新offset,无论是否消费成功。

2、手动提交,设置enable.auto.commit=false,这种方式称为【at least once】。fetch到消息后,等消费完成再调用方法【consumer.commitSync()】,手动更新offset;如果消费失败,则offset也不会更新,此条消息会被重复消费一次。

spring-kafka版本2.5.5,官网 https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#listener-error-handlers ,设置的是批量消费。

因为是批量消费,所以@KafkaListener需要使用list来接收消息,如果使用单个bean会报错。正常不设置异常处理,它会不断循环重复消费这条数据,不像别的地方说有一定数量的重试。

实现接口new BatchErrorHandler自定义属于自己的批量异常处理,但只会到:
public void handle(Exception thrownException, ConsumerRecords<?, ?> data)

而不到
public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container)

再定义逻辑自定处理。如果像官网那样seek回开始的offset,也是无限循环,不太了解所以不采用。

实现接口ConsumerAwareListenerErrorHandler,注意区别是有个Listener的,

如果同时存在局部和全局,在@KafkaListener注解中标注了这个局部的异常处理器,会优先使用局部的。

如果发生异常,来的是一批数据,如果头部发生了异常,那么后面的都会略过。按照参考链接中的异常处理,定义一个死信,来接收这些失败的msg,如果异常处理在全局异常处理器中,那么它们都被发送到死信,后续就算数据是正确的,都不会处理,所以 还是建议个人使用try catch来包裹 处理,个人尝试在kafka处理业务远程插入两条数据,第一条错误,第二条正确,try catch中第一条自定发到死信,第二条会正确入库。

参考:
kafka之consumer参数auto.offset.reset

kafka 消费者offset记录位置和方式

消息队列-kafka消费异常问题

Kafka - 异常处理 待试

以上是关于kafka消费者和offset的关系,以及异常处理问题的主要内容,如果未能解决你的问题,请参考以下文章

Spring自带Kafka消息异常处理

Kafka 消费者之消费方式工作流程消费者案例(订阅主题订阅分区)消费者组案例分区的分配以及再平衡offset 位移消费者事务数据积压(消费者如何提高吞吐量)

Kafka 消费者异常和偏移提交

Kafka

kafka系列之(3)——Coordinator与offset管理和Consumer Rebalance

rocketMQ -- offset管理