Spring自带Kafka消息异常处理
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring自带Kafka消息异常处理相关的知识,希望对你有一定的参考价值。
参考技术A 方法onMessage会执行注解@KafkaListener指定的方法报异常会catch到,继续下一条消息,如果设置的是自动提交offset,则kafka认为此消息消费成功
异常处理的实现类,打印日志,仅此而已
一段解决kafka消息处理异常的经典对话
对kafka不了解的童鞋可以先看看
有一天,卡尔维护的购买系统发生了一个奇怪的异常,从日志里看到,购买后的任务处理竟然先于购买任务执行了。“不可能啊,按照代码的顺序,一定是先执行购买流程,再发送消息到kafka,最后消费端接收到消息后执行购买后的一些善后任务。从A到B到C,顺序清清楚楚。” 于是,他请教了马克,马克眯着眼睛细看了一会,道:"问题是不是出在这段@Transaction注解上?"
伪代码:
@Transaction
public void buy(){
//购买业务逻辑
user.buy();
//发送消息
kafkaTemplete.sendMdg();
}
buy();
马克说道:“这kafka消息鬼的很,它没准在事务提交之前就发送出去了,而消费者在fetch消息执行业务流程的时候这段事务仍然没有提交,这就导致了数据上的乱序,看上去就像购买后任务先于购买任务执行。”
“那该怎么改呢?把kafkaTemplete.sendMdg()这段移出方法,等事务提交了再发送消息?但我把消息发送这步写在事务注解的方法内部,就是为了在消息发送失败的时候能够实现回滚。如果移出来,而消息发送的时候失败,那怎么办?” 卡尔问道。
“可以考虑使用本地消息表。” 马克说着在白板上写下了一段伪代码:
producer:
@Transaction
public void buy(){
user.buy();
//新建一张msgtable本地消息表,消息在这里插入
insertInitMsgToDB();
}
public void sendMsg(){
//发送消息移到了新方法里
kafkaTemplete.sendMdg();
}
//主流程执行
buy();
sendMsg();
consumer:
@Kafkalistener
@Transaction
public void msgConsume(Record record){
if(isConsumed(record)){
//查询本地消息表,已经消费过的则不处理
return;
}
//处理业务逻辑
deal(record);
// 更改本地消息表消息状态为成功
changeRecord(record);
}
"主要是通过时效性高的MQ,自动触发事件;万一消息发送失败,也可以通过数据表的消息记录轮询来保证。不过关系数据库的吞吐量和性能存在瓶颈,频繁的读写消息会给数据库造成压力,考虑当前场景,稳定性要求较高,而并发量还没有上来。可以考虑这种方法。” 马克道。
卡尔改完后,测试发布,之后再也没出现乱序了,但消息有时会莫名地丢失或者重复消费,卡尔不得不经常查看线上日志,手工修复一些数据问题,费时费力,只能在晚上加班加点开发新的业务。
马克也一直在跟踪这个问题,有一天,他有了发现,走过来对卡尔说道:“我研究了一些kafka的机制,问题可能是我们kafka中的配置enable.auto.commit 是 true的缘故?”
卡尔道:“是不是自动提交会带来一些不可控因素?”
马克道:“对,当我们的配置是自动提交的时候,消费者的消息投递保证有可能是at least once,或者at most once。当到达提交时间间隔,触发Kafka自动提交上次的偏移量时,就可能发生at most once的情况, 在这段时间,如果消费者还没完成消息的处理进程就崩溃了, 消费者进程重新启动时,它开始接收上次提交的偏移量之后的消息,实际上消费者可能会丢失几条消息;而当消费者处理完消息并将消息提交到持久化存储系统,而消费者进程崩溃时,会发生at least once的情况。 在此期间,kafka没有向broker提交offset,因为自动提交时间间隔没有过去。 当消费者进程重新启动时,会收到从上次提交的偏移量开始的一些旧消息。”
“正是这个导致消息丢失或者重复消费现象,那你想怎么改呢?” 卡尔道。
马克继续道:“不仅如此,即使消费者进程没有崩溃,假如中间有一个消息的业务逻辑执行抛出了异常,消费者也当作是接收到了消息,程序执行回滚,这条消息也等同于丢失了。我关闭了自动提交(enable.auto.commit:false),当消费者每次 poll 处理完业务逻辑后必须完成手动同步提交(commitSync),如果消费者在消费过程中发生 crash,或者执行业务逻辑发生异常回滚,下次启动时依然会从之前的位置开始消费,从而保证每次提交的内容都能被消费,即实现了at least once保证。”
“这防止了消息丢失,但消息重复问题该怎么解决?”
“先别急,另外需要注意的是,这只是对消费者的配置,为了使消息在发送时不丢失,我们对生产者也要做相应的配置优化。即配置 request.required.acks 参数。” 马克说着,在纸上列出了一张表格:
1(默认) |
leader 已成功收到的数据并得到确认后发送下一条 message。如果 leader 宕机,则会丢失数据。 |
0 |
送端无需等待来自 broker 的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。 |
-1(ALL) |
发送端需要等待 ISR 列表中所有列表都确认接收数据后才算一次发送完成,可靠性最高。 |
“至于你说的消费重复问题,主要解决思路是在服务层实现幂等性。让接收端支持消息去重的功能。比如在上面的伪代码中,record中放一个唯一键字段,消费时根据唯一键查询这条消息,判断是否消费过。也可以通过redis缓存来实现类似的机制。”
卡尔道:“真是这样子的吗?”
“尽信书不如无书,尤其是技术,是需要经过长时间的时间检验的,你对此有所怀疑的话可以在本地开发环境优化试试看。” 马克道。
更多精彩:
本地消息表只是分布式事务处理的一个经典方法,要想了解更全面具体的方案,请关注知识星球,获取支付宝大神的方案杰作,另外有重要开源框架推荐。
java达人
ID:drjava
(长按识别)
以上是关于Spring自带Kafka消息异常处理的主要内容,如果未能解决你的问题,请参考以下文章