RocketMQ消费者没有成功消费消息的问题排查
Posted 翎野君
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ消费者没有成功消费消息的问题排查相关的知识,希望对你有一定的参考价值。
背景
今天下游同事反馈,有一些以取消的订单库存还原异常了,导致部分商品库存没有还原。查日志发现没有收到还原消息,但是查看发送方是可以确认消息是已经发了的,那么是什么原因导致消费者没有收到,或者收到后没有处理消息呢。最后发现这些消息的状态都是NOT_ONLINE,原因是服务挂了,重启之后便可以重新消费了。让我们看看这个调查过程。
调查
消息丢失如何排查?当我们在使用mq的时候,经常会遇到消息消费异常的问题,原因有很多种,比如:
- producer发送失败
- consumer消费异常
- consumer根本就没收到消息
「那么我们该如何排查了?」
其实借助RocketMQ-Dashboard就能高效的排查,里面有很多你想象不到的功能。
首先我们先查找期望消费的消息,查找的方式有很多种,根据消息id,时间等。
「消息没找到?」
说明proder发送异常,也有可能是消息过期了,因为rocketmq的消息默认保存72h,此时到producer端的日志进一步确认即可。
「消息找到了!」
接着看消息的消费状态,如下图消息的消费状态为NOT_ONLINE。
「NOT_ONLINE代表什么含义呢?」
别着急,我们一步步来分析,先看看TrackType到底有多少种状态。
public enum TrackType
CONSUMED,
CONSUMED_BUT_FILTERED,
PULL,
NOT_CONSUME_YET,
NOT_ONLINE,
UNKNOWN
每种类型的解释如下:
类型 | 解释 |
CONSUMED | 消息已经被消费 |
CONSUMED_BUT_FILTERED | 消息已经投递但被过滤 |
PULL | 消息消费的方式是拉模式 |
NOT_CONSUME_YET | 目前没有被消费 |
NOT_ONLINE | CONSUMER不在线 |
UNKNOWN | 未知错误 |
「怎么判定消息已经被消费?」
上一节我们讲到,broker会用一个map来保存每个queue的消费进度,「如果queue的offset大于被查询消息的offset则消息被消费,否则没有被消费」(NOT_CONSUME_YET)。
我们在RocketMQ-Dashboard上其实就能看到每个队列broker端的offset(代理者位点)以及消息消费的offset(消费者位点),差值就是没有被消费的消息。
当消息都被消费时,差值为0,如下图所示:
「CONSUMED_BUT_FILTERED表示消息已经投递,但是已经被过滤掉了」。例如producer发的是topicA,tagA,但是consumer订阅的却是topicA,tagB。
「CONSUMED_BUT_FILTERED(消息已经被投递但被过滤)是怎么发生的呢?」
这个就不得不提到RocketMQ中的一个概念,「消息消费要满足订阅关系一致性,即一个consumerGroup中的所有消费者订阅的topic和tag必须保持一致,不然就会造成消息丢失」。
如下图场景,发送了4条消息,consumer1订阅了topica-taga,而consumer2订阅了topica-tab。consumer1消费q0中的数据,consumer2消费q1中的数据。
投递到q0的msg-1和msg-3只有msg-1能被正常消费,而msg-3则是CONSUMED_BUT_FILTERED。因为msg-3被投递到q0,但是consumer1不消费tagb的消息导致消息被过滤,造成消息丢失。
同理msg-2这条消息也会丢失。
「注意,还有一个非常重要的点」!
虽然消息消费失败了,但是消息的offset还会正常提交,即 「消息消费失败了,但是状态也会是CONSUMED」。
「RocketMQ认为消息消费失败需要重试的场景有哪些?」
- 返回ConsumeConcurrentlyStatus.RECONSUME_LATER
- 返回null
- 主动或被动抛出异常
「那么消费失败的消息去哪了呢?」
当消息消费失败,会被放到重试队列中,Topic名字为%RETRY% + consumerGroup。
「Consumer没订阅这个topic啊,怎么才能消费到重试消息?」
其实在Consumer启动的时候,框架内部帮你订阅了这个topic,所以重试消息能被消费到。
「另外消息不是一直重试,而是每隔1段时间进行重试」
第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
1 | 10 秒 | 9 | 7 分钟 |
2 | 30 秒 | 10 | 8 分钟 |
3 | 1 分钟 | 11 | 9 分钟 |
4 | 2 分钟 | 12 | 10 分钟 |
5 | 3 分钟 | 13 | 20 分钟 |
6 | 4 分钟 | 14 | 30 分钟 |
7 | 5 分钟 | 15 | 1 小时 |
8 | 6 分钟 | 16 | 2 小时 |
当消息超过最大消费次数16次,会将消息投递到死信队列中,死信队列的topic名为%DLQ% + consumerGroup。
「因此当你发现消息状态为CONSUMED,但是消费失败时,去重试队列和死信队列中找就行了」。
本篇文章如有帮助到您,请给「翎野君」点个赞,感谢您的支持。
首发链接:https://www.cnblogs.com/lingyejun/p/16997414.html
程序重启RocketMQ消息重复消费
最近在调试RocketMQ消息发送与消费的Demo时,发现一个问题:只要重启程序,RocketMQ消息就会重复消费。
那么这是什么原因导致的,又该如何解决呢?
经过一番排查,发现程序使用的RocketMQ客户端版本是3.6.2,而测试环境安装的RocketMQ环境的版本是4.1.0。原来是客户端和服务器端版本不一样导致的,消息并没有最终被消费,即没有ACK消息确认,只要程序重启就会重复消费。
解决方案:RocketMQ客户端版本使用与服务器端的同一版本,即4.1.0版本。
划重点:使用RocketMQ的时候,客户端与服务器端一定要使用相同的版本。
以上是关于RocketMQ消费者没有成功消费消息的问题排查的主要内容,如果未能解决你的问题,请参考以下文章