RocketMQ 消费重试简介
Posted 流楚丶格念
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 消费重试简介相关的知识,希望对你有一定的参考价值。
文章目录
消费重试
什么是消费重试
当消息发送到Broker成功 ,在被消费者消费时如果消费者没有正常消费 ,此时消息会重试消费。消费重试存在两种场景 :
场景1:消息没有被消费者接收 ,比如消费者与broker存在网络异常。此种情况消息会一直被消费重试。
场景2:当消息已经被消费者成功接收 ,但是在进行消息处理时出现异常 ,消费端无法向Broker返回成功 ,这种情况下 RocketMQ会不断重试。
问题:针对第二种消费重试的场景 ,borker是怎么知道重试呢 ?
消费者在消费消息成功会向broker返回成功状态,否则会不断进行消费重试。
处理策略
问题:当消息在消费时出现异常 ,此时消息被不断重试消费。 RocketMQ会一直重试消费吗 ?
答案是不会的
消息会按照延迟消息的延迟时间等级 ( 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h ) 从第3级开始重试 ,每试一次如果还不成功则延迟等级加1。
比如 :一条消息消费失败 ,等待10s(第3级)进行重试 ,如果还没有被成功消费则延迟等级加1 ,即按第4级别延迟等 待 ,等30s继续进行重试 ,如此进行下去 ,直到重试16次。
当重试了16次还未被成功消费将会投递到死信队列 ,到达死信队列的消息将不再被消费。
实际生产中的处理策略是什么呢 ?
实际生产中不会让消息重试这么多次 ,通常在重试一定的次数后将消息写入数据库 ,由另外单独的程序或人工去处理。
代码示例
例如:使用的Spring整合RocketMQ的方式 ,消费者实现RocketMQListener的onMessage方法 ,在此方法中实现处理策略的示例代码如下 :
public class ConsumerSimple implements RocketMQListener<MessageExt>
@Override
public void onMessage(MessageExt messageExt)
//取出当前重试次数
int reconsumeTimes = messageExt.getReconsumeTimes();
//当大于一定的次数后将消息写入数据库 ,由单独的程序或人工去处理
if (reconsumeTimes >= 2)
//将消息写入数据库 ,之后正常返回
return;
throw new RuntimeException(String.format("第%s次处理失败 ..", reconsumeTimes));
以上是关于RocketMQ 消费重试简介的主要内容,如果未能解决你的问题,请参考以下文章