RocketMQ - Consumer消息零丢失方案

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ - Consumer消息零丢失方案相关的知识,希望对你有一定的参考价值。

参考技术A

通过前面的方案,我们可以保证消息一定会达到MQ中,也确保了MQ中的消息不会丢失,只要做到这一点,我们就可以保证下游消费者系统一定可以获取到消息,但是即使下游消费者获取了消息,这条消息数据就一定不会丢失吗?

答案是未必的,假设下游消费者系统已经获取了消息,但是消息目前还在他的内存里,还没有执行业务逻辑,此时他就直接提交了这条消息的offset到broker去说自己已经处理过了,然后这个时候下游消费者系统突然就宕机了,内存里的消息没有了,业务逻辑也没有执行,结果broker已经收到他提交的消息offset了,还以为他已经处理完这个消息了。等消费者系统重启后,就不会再次消费这个消息了,还是会出现数据丢失。

上面代码中的MessageListenerConcurrently这个类是注册一个监听器,当获取到一批消息之后,就会回调你得到这个监听器函数,让你来处理这一批消息。

然后当你处理完毕之后,返回consumeConcurrentlyStatus.CONSUME_SUCCESS作为消费成功的示意,告诉RocketMQ,这批消息我已经处理完毕了,RocketMQ会提交这批消息的offset到broker去。

所以如果对一批消息处理完毕了,同时提交消息的offset给broker,即使消费者系统宕机了,此时是不会丢失消息的。

如果一批消息还没处理完,没返回consumeConcurrentlyStatus.CONSUME_SUCCESS这个状态呢,此时消费者宕机了,RocketMQ其实会感知到这个个consumer已经挂了,会把没处理完的消息交给其他机器去处理,所以在这种情况下,消息也绝对不会丢失的。

需要警惕的地方:不能异步消费消息

我们不能在代码中对消息进行异步处理,因为一旦开启了异步处理,消费者的Listener就会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS状态。这就可能出现异步处理逻辑还没处理完消息,就已经提交这批消息的offset给broker了,认为已经处理结束了。这时如果异步处理逻辑报错,那消息同样会丢失了。

面试官再问我如何保证 RocketMQ 不丢失消息,这回我笑了!

最近看了 @JavaGuide 发布的一篇『面试官问我如何保证Kafka不丢失消息?我哭了!』,这篇文章承接这个主题,来聊聊如何保证 RocketMQ 不丢失消息。

0x00. 消息的发送流程

一条消息从生产到被消费,将会经历三个阶段:

技术图片

  • 生产阶段,Producer 新建消息,然后通过网络将消息投递给 MQ Broker
  • 存储阶段,消息将会存储在 Broker 端磁盘中
  • 消息阶段, Consumer 将会从 Broker 拉取消息

以上任一阶段都可能会丢失消息,我们只要找到这三个阶段丢失消息原因,采用合理的办法避免丢失,就可以彻底解决消息丢失的问题。

0x01. 生产阶段

生产者(Producer) 通过网络发送消息给 Broker,当 Broker 收到之后,将会返回确认响应信息给 Producer。所以生产者只要接收到返回的确认响应,就代表消息在生产阶段未丢失。

RocketMQ 发送消息示例代码如下:

 1DefaultMQProducer mqProducer=new DefaultMQProducer("test");
 2// 设置 nameSpace 地址
 3mqProducer.setNamesrvAddr("namesrvAddr");
 4mqProducer.start();
 5Message msg = new Message("test_topic" /* Topic */,
 6        "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
 7);
 8// 发送消息到一个Broker
 9try {
10    SendResult sendResult = mqProducer.send(msg);
11} catch (RemotingException e) {
12    e.printStackTrace();
13} catch (MQBrokerException e) {
14    e.printStackTrace();
15} catch (InterruptedException e) {
16    e.printStackTrace();
17}

send 方法是一个同步操作,只要这个方法不抛出任何异常,就代表消息已经发送成功

消息发送成功仅代表消息已经到了 Broker 端,Broker 在不同配置下,可能会返回不同响应状态:

  • SendStatus.SEND_OK
  • SendStatus.FLUSH_DISK_TIMEOUT
  • SendStatus.FLUSH_SLAVE_TIMEOUT
  • SendStatus.SLAVE_NOT_AVAILABLE

引用官方状态说明:

技术图片

上图中不同 broker 端配置将会在下文详细解释

另外 RocketMQ 还提供异步的发送的方式,适合于链路耗时较长,对响应时间较为敏感的业务场景。

 1DefaultMQProducer mqProducer = new DefaultMQProducer("test");
 2// 设置 nameSpace 地址
 3mqProducer.setNamesrvAddr("127.0.0.1:9876");
 4mqProducer.setRetryTimesWhenSendFailed(5);
 5mqProducer.start();
 6Message msg = new Message("test_topic" /* Topic */,
 7        "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
 8);
 9
10try {
11    // 异步发送消息到,主线程不会被阻塞,立刻会返回
12    mqProducer.send(msg, new SendCallback() {
13        @Override
14        public void onSuccess(SendResult sendResult) {
15            // 消息发送成功,
16        }
17
18        @Override
19        public void onException(Throwable e) {
20            // 消息发送失败,可以持久化这条数据,后续进行补偿处理
21        }
22    });
23} catch (RemotingException e) {
24    e.printStackTrace();
25} catch (InterruptedException e) {
26    e.printStackTrace();
27}

异步发送消息一定要注意重写回调方法,在回调方法中检查发送结果。

不管是同步还是异步的方式,都会碰到网络问题导致发送失败的情况。针对这种情况,我们可以设置合理的重试次数,当出现网络问题,可以自动重试。设置方式如下:

1// 同步发送消息重试次数,默认为 2
2mqProducer.setRetryTimesWhenSendFailed(3);
3// 异步发送消息重试次数,默认为 2
4mqProducer.setRetryTimesWhenSendAsyncFailed(3);

0x02. Broker 存储阶段

默认情况下,消息只要到了 Broker 端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后 Broker 定期批量的将一组消息从内存异步刷入磁盘。

这种方式减少 I/O 次数,可以取得更好的性能,但是如果发生机器掉电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。

若想保证 Broker 端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息存储磁盘成功,才会返回响应。

修改 Broker 端配置如下:

1## 默认情况为 ASYNC_FLUSH 
2flushDiskType = SYNC_FLUSH 

若 Broker 未在同步刷盘时间内(默认为 5s)完成刷盘,将会返回 SendStatus.FLUSH_DISK_TIMEOUT 状态给生产者。

集群部署

为了保证可用性,Broker 通常采用一主(master)多从(slave)部署方式。为了保证消息不丢失,消息还需要复制到 slave 节点。

默认方式下,消息写入 master 成功,就可以返回确认响应给生产者,接着消息将会异步复制到 slave 节点。

注:master 配置:flushDiskType = SYNC_FLUSH

此时若 master 突然宕机且不可恢复,那么还未复制到 slave 的消息将会丢失。

为了进一步提高消息的可靠性,我们可以采用同步的复制方式,master 节点将会同步等待 slave 节点复制完成,才会返回确认响应。

异步复制与同步复制区别如下图:

技术图片

注:大家不要被上图误导,broker master 只能配置一种复制方式,上图只为解释同步复制的与异步复制的概念。

Broker master 节点 同步复制配置如下:

1## 默认为 ASYNC_MASTER 
2brokerRole=SYNC_MASTER

如果 slave 节点未在指定时间内同步返回响应,生产者将会收到 SendStatus.FLUSH_SLAVE_TIMEOUT 返回状态。

小结

结合生产阶段与存储阶段,若需要严格保证消息不丢失,broker 需要采用如下配置:

1## master 节点配置
2flushDiskType = SYNC_FLUSH
3brokerRole=SYNC_MASTER
4
5## slave 节点配置
6brokerRole=slave
7flushDiskType = SYNC_FLUSH

同时这个过程我们还需要生产者配合,判断返回状态是否是 SendStatus.SEND_OK。若是其他状态,就需要考虑补偿重试。

虽然上述配置提高消息的高可靠性,但是会降低性能,生产实践中需要综合选择。

0x03. 消费阶段

消费者从 broker 拉取消息,然后执行相应的业务逻辑。一旦执行成功,将会返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 状态给 Broker。

如果 Broker 未收到消费确认响应或收到其他状态,消费者下次还会再次拉取到该条消息,进行重试。这样的方式有效避免了消费者消费过程发生异常,或者消息在网络传输中丢失的情况。

消息消费的代码如下:

 1// 实例化消费者
 2DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");
 3
 4// 设置NameServer的地址
 5consumer.setNamesrvAddr("namesrvAddr");
 6
 7// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
 8consumer.subscribe("test_topic", "*");
 9// 注册回调实现类来处理从broker拉取回来的消息
10consumer.registerMessageListener(new MessageListenerConcurrently() {
11    @Override
12    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
13        // 执行业务逻辑
14        // 标记该消息已经被成功消费
15        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
16    }
17});
18// 启动消费者实例
19consumer.start();

以上消费消息过程的,我们需要注意返回消息状态。只有当业务逻辑真正执行成功,我们才能返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS。否则我们需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,稍后再重试。

0x04. 总结

看完 RocketMQ 不丢消息处理办法,回头再看这篇 kafka,有没有发现,两者解决思路是一样的,区别就是参数配置不一样而已。

所以下一次,面试官再问你 XX 消息队列如何保证不丢消息?如果你没用过这个消息队列,也不要哭,微笑面对他,从容给他分析那几步会丢失,然后大致解决思路。

最后我们还可以说出我们的思考,虽然提高消息可靠性,但是可能导致消息重发,重复消费。所以对于消费客户端,需要注意保证幂等性

技术图片
但是要注意了,这时面试官可能就会跟你的话题,让你来聊聊如何保证幂等性,一定先想好再说哦。

什么?你还不知道如何实现幂等?那就赶紧关注@程序通事,后面文章我们就来聊聊幂等这个话题。

技术图片

0x05. Reference

以上是关于RocketMQ - Consumer消息零丢失方案的主要内容,如果未能解决你的问题,请参考以下文章

33 基于RocketMQ设计的全链路消息零丢失方案总结

27 发送消息零丢失方案:RocketMQ事务消息的实现流程分析

32 Consumer消息零丢失方案:手动提交offset + 自动故障转移

面试官再问我如何保证 RocketMQ 不丢失消息,这回我笑了!

RocketMQ的消息可靠性(防止消息丢失)

RocketMQ使用之消息保证,重复读,积压,顺序,过滤,延时,事务,死信