RocketMQ 消费重试简介

Posted 流楚丶格念

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 消费重试简介相关的知识,希望对你有一定的参考价值。

文章目录

消费重试

什么是消费重试

当消息发送到Broker成功 ,在被消费者消费时如果消费者没有正常消费 ,此时消息会重试消费。消费重试存在两种场景 :

场景1:消息没有被消费者接收 ,比如消费者与broker存在网络异常。此种情况消息会一直被消费重试。

场景2:当消息已经被消费者成功接收 ,但是在进行消息处理时出现异常 ,消费端无法向Broker返回成功 ,这种情况下 RocketMQ会不断重试。

问题:针对第二种消费重试的场景 ,borker是怎么知道重试呢 ?

消费者在消费消息成功会向broker返回成功状态,否则会不断进行消费重试。

处理策略

问题:当消息在消费时出现异常 ,此时消息被不断重试消费。 RocketMQ会一直重试消费吗 ?

答案是不会的

消息会按照延迟消息的延迟时间等级 ( 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h ) 从第3级开始重试 ,每试一次如果还不成功则延迟等级加1。

比如 :一条消息消费失败 ,等待10s(第3级)进行重试 ,如果还没有被成功消费则延迟等级加1 ,即按第4级别延迟等 待 ,等30s继续进行重试 ,如此进行下去 ,直到重试16次。

当重试了16次还未被成功消费将会投递到死信队列 ,到达死信队列的消息将不再被消费

实际生产中的处理策略是什么呢 ?

实际生产中不会让消息重试这么多次 ,通常在重试一定的次数后将消息写入数据库 ,由另外单独的程序或人工去处理。

代码示例

例如:使用的Spring整合RocketMQ的方式 ,消费者实现RocketMQListener的onMessage方法 ,在此方法中实现处理策略的示例代码如下 :

public class ConsumerSimple implements RocketMQListener<MessageExt> 
    @Override
    public void onMessage(MessageExt messageExt) 
        //取出当前重试次数
        int reconsumeTimes = messageExt.getReconsumeTimes();
        //当大于一定的次数后将消息写入数据库 ,由单独的程序或人工去处理
        if (reconsumeTimes >= 2) 
            //将消息写入数据库 ,之后正常返回
            return;
        
        throw new RuntimeException(String.format("第%s次处理失败 ..", reconsumeTimes));
    

以上是关于RocketMQ 消费重试简介的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ:死信队列和消息幂等

RocketMQ源码(23)—DefaultMQPushConsumer消费者重试消息和死信消息源码

RocketMQ的死信队列

RocketMQ的死信队列

消息队列 - 死信、延迟、重试队列

RocketMQ - 如何用死信队列解决消费者异常