RocketMQ的消息重试(消息重投)

Posted 刘Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ的消息重试(消息重投)相关的知识,希望对你有一定的参考价值。

详细介绍了RocketMQ的消息重试机制,RocketMQ的消息重试可以分为生产者重试和消费者重试两个部分。

1 生产者重试

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。

如下生产者属性可以设置消息重试策略:

  1. retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。如果因为超时,那么便不再重试。
  2. retryTimesWhenSendAsyncFailed:异步发送失败重试次数,默认为2,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。如果因为超时,那么便不再重试。
  3. retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。

两个注意点:

  1. 如果同步模式发送失败,则轮转到下一个Broker,如果异步模式发送失败,则只会在当前Broker进行重试。
  2. 发送消息超时时间默认3000毫秒,如果因为超时,那么便不再尝试重试。

2 消费者重试

Consumer消费消息失败后,要提供一种重试机制,令消息至少再消费一次。通常引起消息消费重试的时候包括两种情况:异常重试和超时重试。另外,Consumer在广播模式下重试失效。

2.1 异常重试

由于Consumer端逻辑出现了异常,导致没有返回SUCCESS状态,那么Broker就会在一段时间后尝试重试。

RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息,每个Consumer实例在启动的时候就默认订阅了该消费组的重试队列Topic。

考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大(实际上就是配置的延时队列的级别level)。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。

SubscriptionGroupConfig的配置中,默认最大重试16次(retryMaxTimes属性,默认重level3开始),但如果Consumer端逻辑出现异常,重试太多次也没有很大的意义,因此我们可以在代码中指定最大的重试次数,达到一定次数之后就返回SUCCESS,不再重试,对于失败的消息记录到数据库的表中,后续人工处理。

以上是针对并行消费的重试策略,即MessageListener为MessageListenerConcurrently。如果是串行消费策略,即MessageListener为MessageListenerOrderly,那么将会暂停后续消费进度,对于该消息无限的进行重试投递,这是需要特别注意的。

可通过consumer 客户端参数MaxReconsumeTimes设置最大重试次数,超过最大重试次数,消息将被转移到死信队列,范围是**-1 – 16之间。对于有序消费模式MessageListenerOrderly**,默认**-1**,表示无限次本地立即重试消费;对于并发无序消费模式MessageListenerConcurrently,默认16次延时消费,从Level3开始。

2.2 超时重试

如果Consumer端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回任何消费状态(成功或者失败),Broker就会认为Consumer消费超时,此时会发起超时重试,默认的消费超时时间为15分钟,时间足够的长了。

RocketMQ会认为该消息没有发送到Consumer端,会一直不停的发送,即无限重试。

相关文章:

RocketMQ

如有需要交流,或者文章有误,请直接留言。另外希望点赞、收藏、关注,我将不间断更新各种Java学习博客!

以上是关于RocketMQ的消息重试(消息重投)的主要内容,如果未能解决你的问题,请参考以下文章

Rocket重试机制,消息模式,刷盘方式

RocketMQ源码 — 八 RocketMQ消息重试

RocketMQ的消息重试

RocketMQ重试机制和消息幂

RocketMQ---RocketMQ重试机制

RocketMQ——消息重试