RocketMQ 不丢失消息的方式

Posted OoZzzy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 不丢失消息的方式相关的知识,希望对你有一定的参考价值。

文章目录

1.RocketMq架构


Producer,Consumer,Brocker,Name Server

2.消息不丢失

1.Producer发送消息
2.Brocker保存消息
3.Consumer 消费消息
4.Brocker主从切换

2.1 同步发送

public void send() throws Exception 
    String message = "test producer";
    Message sendMessage = new Message("topic1", "tag1", message.getBytes());
    sendMessage.putUserProperty("name1","value1");
    SendResult sendResult = null;

    DefaultMQProducer producer = new DefaultMQProducer("testGroup");
    producer.setNamesrvAddr("localhost:9876");
    producer.setRetryTimesWhenSendFailed(3);
    try 
        sendResult = producer.send(sendMessage);
     catch (Exception e) 
        e.printStackTrace();
    
    if (sendResult != null) 
        System.out.println(sendResult.getSendStatus());
    

同步发送会返回状态码

1.SEND_OK:消息发送成功。需要注意的是,消息发送到 broker 后,还有两个操作:消息刷盘和消息同步到 slave 节点,默认这两个操作都是异步的,只有把这两个操作都改为同步,SEND_OK 这个状态才能真正表示发送成功。

2.FLUSH_DISK_TIMEOUT:消息发送成功但是消息刷盘超时。

3.FLUSH_SLAVE_TIMEOUT:消息发送成功但是消息同步到 slave 节点时超时。

4.SLAVE_NOT_AVAILABLE:消息发送成功但是 broker 的 slave 节点不可用。

根据状态码可以重复消息,重试的数量为3

2.2 异步消息

public void sendAsync() throws Exception 
    String message = "test producer";
    Message sendMessage = new Message("topic1", "tag1", message.getBytes());
    sendMessage.putUserProperty("name1","value1");

    DefaultMQProducer producer = new DefaultMQProducer("testGroup");
    producer.setNamesrvAddr("localhost:9876");
    producer.setRetryTimesWhenSendFailed(3);
    producer.send(sendMessage, new SendCallback() 
        @Override
        public void onSuccess(SendResult sendResult) 
            
        

        @Override
        public void onException(Throwable e) 
            // TODO 可以在这里加入重试逻辑
        
    );

异步发送,可以重写回调函数,回调函数捕获到 Exception 时表示发送失败,这时可以进行重试,这里设置的重试次数是 3。

2.3 刷盘机制

  1. 异步刷盘:默认。消息写入 CommitLog 时,并不会直接写入磁盘,而是先写入 PageCache 缓存后返回成功,然后用后台线程异步把消息刷入磁盘。异步刷盘提高了消息吞吐量,但是可能会有消息丢失的情况,比如断点导致机器停机,PageCache 中没来得及刷盘的消息就会丢失。
  2. 同步刷盘:消息写入内存后,立刻请求刷盘线程进行刷盘,如果消息未在约定的时间内(默认 5 s)刷盘成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到这个响应后,可以进行重试。同步刷盘策略保证了消息的可靠性,同时降低了吞吐量,增加了延迟。要开启同步刷盘,需要增加下面配置

2.4 Broker 多副本和高可用

Broker 为了保证高可用,采用一主多从的方式部署。


消息发送到 master 节点后,slave 节点会从 master 拉取消息保持跟 master 的一致。这个过程默认是异步的,即 master 收到消息后,不等 slave 节点复制消息就直接给 Producer 返回成功。

这样会有一个问题,如果 slave 节点还没有完成消息复制,这时 master 宕机了,进行主备切换后就会有消息丢失。为了避免这个问题,可以采用 slave 节点同步复制消息,即等 slave 节点复制消息成功后再给 Producer 返回发送成功。只需要增加下面的配置:

brokerRole=SYNC_MASTER

同步复制:

  1. slave 初始化后,跟 master 建立连接并向 master 发送自己的 offset;
  2. master 收到 slave 发送的 offset 后,将 offset 后面的消息批量发送给 slave;
  3. slave 把收到的消息写入 commitLog 文件,并给 master 发送新的 offset;
  4. master 收到新的 offset 后,如果 offset >= producer 发送消息后的 offset,给 Producer 返回 SEND_OK。

2.5 消息确认

public void consume() throws Exception 
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    consumer.subscribe("topic1", "tag1");
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> 
        try
            System.out.printf("Receive New Messages: %s", msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        catch (Exception e)
            e.printStackTrace();
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        
    );
    consumer.start();

如果 Consumer 消费成功,返回 CONSUME_SUCCESS,提交 offset 并从 Broker 拉取下一批消息。

2.6 Consumer 重试

  1. 返回 RECONSUME_LATER
  2. 返回 null
  3. 抛出异常

Broker 收到这个响应后,会把这条消息放入重试队列,重新发送给 Consumer。

  1. Broker 默认最多重试 16 次,如果重试 16 次都失败,就把这条消息放入死信队列,Consumer 可以订阅死信队列进行消费。
  2. 重试只有在集群模式(MessageModel.CLUSTERING)下生效,在广播模式(MessageModel.BROADCASTING)下是不生效的。
  3. Consumer 端一定要做好幂等处理。

Consumer 给Brocker 结束重试, 这里是count = 3 的时候结束重试

int count = ((MessageExt) msgs).getReconsumeTimes();
if (count > 2) 
    //TODO 把消息写入本地存储
    System.out.println("重试次数超过3次");
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

2.7 事务消息

RocketMq支持事务

  1. Producer 发送half消息
  2. Brocker先把消息写入topic, RMQ_SYS_TRANS_HALF_TOPIC的队列,然后返回half消息给producer成功
  3. Producer 执行本地事务,成功后给 Broker 发送 commit 命令, 或者rollback
  4. Broker 收到 commit 请求后把消息状态更改为成功并把消息推到真正的 topic;
  5. Consumer拉取消息进行消费

public class ProducerTransactionListenerImpl implements TransactionListener 

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) 
        /**
         * 这里执行本地事务,执行成功返回LocalTransactionState.COMMIT_MESSAGE,执行失败返回
         * LocalTransactionState.ROLLBACK_MESSAGE,如果返回LocalTransactionState.UNKNOW,
         * Broker会回来查询,所以需要记录事务执行状态
         */
        return LocalTransactionState.COMMIT_MESSAGE;
    

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) 
        /**
         * 这里查询事务执行状态,根据事务状态返回LocalTransactionState.COMMIT_MESSAGE或
         * LocalTransactionState.ROLLBACK_MESSAGE,如果没有查询到返回LocalTransactionState.UNKNOW,
         * Broker会再次查询,可以记录查询次数,超过次数后返回ROLLBACK_MESSAGE
         */
        return LocalTransactionState.UNKNOW;
    

2.8 消息索引

RocketMQ 核心的数据文件有 3 个:CommitLog、ConsumeQueue 和 Index, Index是一个索引文件


查找消息时,首先根据消息 key 的 hashcode 计算出 Hash 槽的位置,然后读取 Hash 槽的值计算 Index 条目的位置,从Index 条目位置读取到消息在 CommitLog 文件中的 offset,从而查找到消息。

Producer 发送消息时,可以指定一个 key

Message sendMessage = new Message("topic1", "tag1", message.getBytes());
sendMessage.setKeys("weiyiid");

这样可以通过 RocketMQ 提供的命令或者管理控制台来查询消息是否发送成功。

2.9 极端

极端情况比如Rocketmq集群挂了, Producer发送消息一定失败, 可以在Producer做降级, 把发送的消息先存储在磁盘或者数据库中, 然后等到Rocketmq集群恢复了再推送消息

以上是关于RocketMQ 不丢失消息的方式的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ 不丢失消息的方式

RocketMQ:死信队列和消息幂等

RocketMQ的死信队列

RocketMQ的死信队列

RocketMQ 保证消息不丢失

RocketMQ源码(23)—DefaultMQPushConsumer消费者重试消息和死信消息源码