从源码分析RocketMQ不保证幂等的三个原因

Posted 陈汤姆

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从源码分析RocketMQ不保证幂等的三个原因相关的知识,希望对你有一定的参考价值。

阅读本文大概需要 5 分钟





  • 前言
  • 1.场景
    • 1.1. 疑问点
    • 1.2. 日志追踪
    • 1.3.定位问题
  • 2.问题点
    • 2.1. 批量消费消息问题
    • 2.2. 更新位点(offset)问题
    • 2.3.消费超时时间问题
  • 3.总结
    • 总结

前言

目前公司主要业务是跨境物流方面的,我们部门主要负责包裹全中心对接的业务,通俗点讲就是实现包裹在各个系统的流转。而我负责的模块主要是包裹全链路路径的需求,这部分更注重业务上的处理,因此对于业务上的产生的数据要保证正确性。

1. 场景

最近生产环境中测试发现包裹的链路数据出现了一些重复的数据,主要体现在一个事件产生了多条重复的数据,从这个问题点开始追踪问题所在。



追踪问题如下:

1.1. 疑问点

首先怀疑的问题点就是幂等问题,因此我们业务为了保证时效性都大部分都通过MQ做了异步处理,因此可能会由于MQ出现重复消费导致的。



1.2. 日志追踪

通过查看线上日志,可以看到同一个messageId被消费了多次,日志记录出现重复消费。


1.3. 定位问题

问题找到了就是由于mq重复消费导致,然后通过mq控制台查看该消息的轨迹,知道该消息的消费时间超过了消息消费默认的时常(默认15分钟),因此消息服务器认为消费未成功,又重新推送了消息。

从源码分析RocketMQ不保证幂等的三个原因


因此问题点定位到了接下来就是做业务上的幂等处理。


但是这里我还是刨根问底的去翻了源码,看了mq如何处理幂等问题的,真被我找到了RocketMQ不保证幂等的原因。


2. 问题点

通过查看源码知道RocketMQ处理消息的消费情况主要通过一个叫做位点(offset)来实现的,首先看一下RocketMQ的消息在broker中的情况。

从源码分析RocketMQ不保证幂等的三个原因

broker中都会存在一个CommitLog用来绑定每个消息的消费情况,每个消息在CommitLog上都是通过一个offset进行控制状态的,因此RocketMQ消费成功一条消息后就会在CommitLog中将该消息的offset进行移位。因此RocketMQ主要通过offset进行控制消息的消费。


那么通过源码找到不保证幂等的问题点在哪里呢?


主要有两个方面:


2.1. 批量消费消息

源码如下:


从源码分析RocketMQ不保证幂等的三个原因


源码中Action action = listener.consume(msgList, context);该源码朔源可以看到是操作的消息列表,因此这批消息的消费情况只通过一个action标识,因此批量消息更新消费状态时可能出现牵一发而动全身的问题,假设一批消息中有10条消息,其中一条消息消费失败,那么action的返回值依然是RECONSUME_LATER,因此导致这批10条消息都会被重推,从而导致重复消费。


这种批量消费消息的场景在我们实际业务中并没有使用,实际业务中我们最终调用的是ConsumeMessageConcurrentlyService实现类!





2.2. 更新位点(offset)问题

源码如下:

从源码分析RocketMQ不保证幂等的三个原因

从源码分析RocketMQ不保证幂等的三个原因


因此通过源码可以看到由于RocketMQ取未消费成功的offset最小值,那么如果有10条消息,1-6消费成功,但是6消费失败,7-10消费成功,那么RocketMQ更新offset时会更新为6,导致7-10消息也被重新推送,造成消息重复消费。


这样的问题出现从我的理解上为了保证消息不丢失做的兼容。


2.3. 消费超时问题



消费时间超时应该是比较常见的消息重复消费的原因,消费超时最根本的问题还是在于业务逻辑的处理,因此如果出现消费超时第一时间就应该从业务逻辑上做优化,而不是改动消息消费的超时时间。


我负责的模块出现消费超时主要原因是业务逻辑中处理大批量数据,在处理大批量数据时并没有做多线程的优化以及mq的异步优化,目前已优化mq异步的方式,减少消费超时的问题。


那么RocketMQ的消费超时时间是如何设定的?

源码如下:


RocketMQ官网也提供了消费者的超时时间,因此在消费消息时若超过默认的15分钟,那么RocketMQ就会把该消息的状态设置为TIMEOUT,在RocketMQ中只要消息状态不是SUCCESS,那么都是需要消息服务器重推消息,从而造成消息的重复消费。


3. 总结

以上就是探寻RocketMQ的过程,也了解了RocketMQ的消费过程以及如何控制消息状态的逻辑。


知其然知其所以然!!




参考资料:


1、RocketMQ源码




以上是关于从源码分析RocketMQ不保证幂等的三个原因的主要内容,如果未能解决你的问题,请参考以下文章

rocketmq源码分析:消息存储与消息通信

RocketMQ 源码分析 —— 高可用

从源码告诉你,RocketMQ的tag有什么坑。

从RocketMQ的Broker源码层面验证一下这两个点

rocketmq源码分析1-benchmark学习

如何保障消息中间件 100% 消息投递成功?如何保证消息幂等性?