RocketMQ——消息重试

Posted 1013wang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ——消息重试相关的知识,希望对你有一定的参考价值。

文章目录

一、 Producer端重试

二、 Consumer端重试

1. Exception

2. Timeout总结

对于MQ,可能存在各种异常情况,导致消息无法最终被Consumer消费掉,因此就有了消息失败重试机制。很显示,消息重试分为2种:Producer端重试和Consumer端重试。

一、 Producer端重试

生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败。
这种消息失败重试我们可以手动设置发送失败重试的次数,看一下代码:

/**
 * Producer,发送消息
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("group_name");
        producer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");
        producer.setRetryTimesWhenSendFailed(3);
        producer.start();

        for (int i = 0; i < 100; i++) {
            try {
                Message msg = new Message("TopicTest",                 // topic
                        "TagA",                                     // tag
                        ("HelloWorld - RocketMQ" + i).getBytes()    // body
                );
                SendResult sendResult = producer.send(msg, 1000);
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}
*生产者端失败重试*
 
上图代码示例的处理手段是:如果该条消息在1S内没有发送成功,那么重试3次。
 
producer.setRetryTimesWhenSendFailed(3);  //失败的情况重发3次
producer.send(msg, 1000);  //消息在1S内没有发送成功,就会重试
 
二、 Consumer端重试
消费者端的失败,分为2种情况,一个是exception,一个是timeout。
1. Exception
消息正常的到了消费者,结果消费者发生异常,处理失败了。例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。
这里涉及到一些问题,需要我们思考下,比如,消费者消费消息的状态有哪些定义?如果失败,MQ将采取什么策略进行重试?假设一次性批量PUSH了10条,其中某条数据消费异常,那么消息重试是10条呢,还是1条呢?而且在重试的过程中,需要保证不重复消费吗?
public enum ConsumeConcurrentlyStatus {
    /**
     * Success consumption
     */
    CONSUME_SUCCESS,
    /**
     * Failure consumption,later try to consume
     */
    RECONSUME_LATER;
}
*ConsumeConcurrentlyStatus枚举的源码*

通过查看源码,消息消费的状态,有2种,一个是成功(CONSUME_SUCCESS),一个是失败&稍后重试(RECONSUME_LATER)

技术图片

 

                                                               RECONSUME_LATER的策略

在启动broker的过程中,可以观察到上图日志,你会发现RECONSUME_LATER的策略:如果消费失败,那么1S后再次消费,如果失败,那么5S后,再次消费,…直至2H后如果消费还失败,那么该条消息就会终止发送给消费者了!
RocketMQ为我们提供了这么多次数的失败重试,但是在实际中也许我们并不需要这么多重试,比如重试3次,还没有成功,我们希望把这条消息存储起来并采用另一种方式处理,而且希望RocketMQ不要再重试呢,因为重试解决不了问题了!这该如何做呢?
看一段代码:

/**
 * Consumer,订阅消息
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
        consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    MessageExt msg = msgs.get(0);
                    String msgbody = new String(msg.getBody(), "utf-8");
                    System.out.println(msgbody + " Receive New Messages: " + msgs);
                    if (msgbody.equals("HelloWorld - RocketMQ4")) {
                        System.out.println("======错误=======");
                        int a = 1 / 0;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    if (msgs.get(0).getReconsumeTimes() == 3) {
                        // 该条消息可以存储到DB或者LOG日志中,或其他处理方式
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
                    } else {
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer Started.");
    }
}

技术图片

                                                                  RECONSUME_LATER的重试效果
观察上图发现,HelloWorld - RocketMQ4的消息的***reconsumeTimes***属性值发生了变化,其实这个属性就代表了消息重试的次数!因此我们可以通过reconsumeTimes属性,让MQ超过了多少次之后让他不再重试,而是记录日志等处理,也就是上面代码catch中的内容。
2. Timeout
比如由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止!(比如集群中一个broker失败,就尝试另一个broker)
延续Exception的思路,也就是消费端没有给RocketMQ返回消费的状态,即没有return ConsumeConcurrentlyStatus.CONSUME_SUCCESS或return ConsumeConcurrentlyStatus.RECONSUME_LATER,这样的就认为没有到达Consumer端。
下面进行模拟:
1)消费端有consumer1和consumer2这样一个集群。
2)consumer1端的业务代码中暂停1分钟并且不发送接收状态给RocketMQ。

public class Consumer1 {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
        consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("Topic1", "Tag1 || Tag2 || Tag3");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {                    
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(),"utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic:" + topic + " ,tags:" + tags + " ,msg:" + msgBody);
                        
                        // 表示业务处理时间
                        System.out.println("=========开始暂停==========");
                        Thread.sleep(60000);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer Started.");
    }
}
Consumer1端Timeout异常测试代码

3)启动consumer1和consumer2。
4)启动Producer,只发送一条数据。

5)关闭consumer1。

总结

Producer端没什么好说的,Consumer端值得注意。对于消费消息而言,存在2种指定的状态(成功 OR 失败重试),如果一条消息在消费端处理没有返回这2个状态,那么相当于这条消息没有达到消费者,势必会再次发送给消费者!也即是消息的处理必须有返回值,否则就进行重发。

我这儿整理了比较全面的JAVA相关的面试资料,

技术图片


需要领取面试资料的同学,请加群:473984645

技术图片

 

 

以上是关于RocketMQ——消息重试的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ 消费重试简介

RocketMQ的消息重试(消息重投)

RocketMQ的消息重试

RocketMQ——消息重试

RocketMQ重试机制和消息幂

RocketMQ高可用设计之消息重试机制