RocketMQ笔记:顺序消息

Posted 无虑的小猪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ笔记:顺序消息相关的知识,希望对你有一定的参考价值。

一、什么是顺序消息

  消息有序指的是可以按照消息的发送顺序来消费(FIFO)。

  顺序消息是 RocketMQ 提供的一种消息类型,支持消费者按照发送消息的先后顺序获取消息。顺序消息在发送、存储和投递的处理过程中,强调多条消息间的先后顺序关系。RocketMQ 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。

二、顺序消息的特性

  1、消息消费失败或消费超时,会触发服务端重试逻辑,重试消息属于新的消息,原消息的生命周期已结束;

  2、顺序消息消费失败进行消费重试时,为保障消息的顺序性,后续消息不可被消费,必须等待前面的消息消费完成后才能被处理。

  3、顺序消息仅支持使用MessageType为FIFO的主题,即顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致。

三、RocketMQ如何保证消息的顺序

  RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

  RocketMQ在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);RocketMQ在消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。

  全局有序:发送和消费参与的queue只有一个,则是全局有序。

  分区有序:多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

四、RocketMQ保证消息顺序的条件

RocketMQ 的消息的顺序性分为两部分,生产顺序性和消费顺序性。

1、生产顺序性

  RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。

1、生产者顺序发送的前提

1.1、单一生产者

  消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。

1.2、串行发送

  RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

2、生产者顺序发送

满足上述条件,将顺序消息发送至 RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

  ·相同消息组的消息按照先后顺序被存储在同一个队列。

  ·不同消息组的消息可以混合在同一个队列中,且不保证连续。

  

  如上图所示,消息组1和消息组4的消息混合存储在队列1中,RocketMQ 保证消息组1中的消息G1-M1、G1-M2、G1-M3是按发送顺序存储,且消息组4的消息G4-M1、G4-M2也是按顺序存储,但消息组1和消息组4中的消息不涉及顺序关系。

2、消费顺序性

  RocketMQ 通过消费者和服务端的协议保障消息消费严格按照存储的先后顺序来处理。

2.1、投递顺序

  RocketMQ 通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递。

  消费者类型为PushConsumer时, RocketMQ 保证消息按照存储顺序一条一条投递给消费者,若消费者类型为SimpleConsumer,则消费者有可能一次拉取多条消息。此时,消息消费的顺序性需要由应用程序保证。

2.2、有限重试

  RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。

  对于严格保证消费顺序的场景,需要合理设置重试次数,避免消息乱序。

3、生产顺序性和消费顺序性组合

  如果消息需要严格按照先进先出(FIFO)的原则处理,即先发送的先消费、后发送的后消费,则必须要同时满足生产顺序性和消费顺序性。

  一般业务场景下,同一个生产者可能对接多个下游消费者,不一定所有的消费者业务都需要顺序消费,可以将生产顺序性和消费顺序性进行差异化组合,应用于不同的业务场景。

生产顺序
消费顺序
顺序性效果
设置消息组,保证消息顺序发送。
顺序消费
按照消息组粒度,严格保证消息顺序。 同一消息组内的消息的消费顺序和发送顺序完全一致。
设置消息组,保证消息顺序发送。
并发消费
并发消费,尽可能按时间顺序处理。
未设置消息组,消息乱序发送。
顺序消费
按队列存储粒度,严格顺序。 基于 Apache RocketMQ 本身队列的属性,消费顺序和队列存储的顺序一致,但不保证和发送顺序一致。
未设置消息组,消息乱序发送。
并发消费
并发消费,尽可能按照时间顺序处理。

五、顺序消息的生命周期

  

1、初始化

  消息被生产者构建并完成初始化,待发送到RocketMQ服务端。

2、待消费

  消息被发送到服务端,等待消费者消费。

3、消费中

  消息被消费者火球,RocketMQ会等待消费者完成消息并提交消费结果,若一定时间后未收到消费者的响应,RocketMQ会对消息进行重试处理。

4、消费提交

  消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已被处理(消费成功/失败)。RocketMQ默认支持保留消息,此时消息数据不会被立即删除,逻辑标记已消费。

在消息保存时间到期或存储空间不足被删除前,消费者可以回溯消息重新消费。

5、消息删除

  RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

六、顺序消息示例demo

  工具类详见:RocketMQ笔记(六):示例代码工具类

1、订单实体

 /**
  * @Description: 订单信息
  */
 public class Indent 
 
     /**
      * 订单ID
      */
     private String orderId;
 
     /**
      * 顺序
      */
     private String order;
 
     public Indent(String orderId, String order) 
         this.orderId = orderId;
         this.order = order;
     
 
     public String getOrderId() 
         return orderId;
     
 
     public void setOrderId(String orderId) 
         this.orderId = orderId;
     
 
     public String getOrder() 
         return order;
     
 
     public void setOrder(String order) 
         this.order = order;
     
 

2、顺序消息生产者

 

 import com.snails.rmq.common.RMQConstant;
 import com.snails.rmq.utils.ClientUtils;
 import com.snails.rmq.utils.DateUtils;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.MessageQueueSelector;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
 import java.util.ArrayList;
 import java.util.List;
 
 /**
  * @Description: 发送顺序消息
  */
 public class OrderProducer 
 
     public static void main(String[] args) throws Exception 
         // 获取生产者实例
         DefaultMQProducer producer =
                 ClientUtils.gainProducerInstance(RMQConstant.ORDER_GROUP, RMQConstant.NAEMSRV_ADDR);
         // 消息标签
         String[] tags = new String[]RMQConstant.MSG_TAG_A, RMQConstant.MSG_TAG_B, RMQConstant.MSG_TAG_C;
         // 订单列表
         List<Indent> indentList = OrderProducer.buildIndents();
 
         for (int i = 0; i < indentList.size(); i++) 
             // 获取消息
             String msgBody = "顺序消息, ".concat(DateUtils.now().concat(":")) + indentList.get(i);
             Message msg = new Message(RMQConstant.ORDER_TOPIC,
                     tags[i % tags.length], "RMQKEYS" + i, msgBody.getBytes());
 
             // 发送消息
             SendResult sendResult = producer.send(msg, new MessageQueueSelector() 
                 @Override
                 public MessageQueue select(List<MessageQueue> mqList, Message msg, Object obj) 
                     //根据id选择要发送的消息队列queue
                     long indentId = (Long) obj;
                     long index = indentId % mqList.size();
                     return mqList.get((int) index);
                 
             , Long.valueOf(indentList.get(i).getOrderId()));
 
             // 发送结果打印
             System.out.printf("发送状态:%s, 消息队列id:%d, 消息内容:%s %n",
                     sendResult.getSendStatus(),
                     sendResult.getMessageQueue().getQueueId(),
                     msgBody);
         
 
         // 关闭生产实例
         ClientUtils.shutdownProducer(producer);
     
 
 
     /**
      * 生成模拟订单数据
      */
     private static List<Indent> buildIndents() 
         List<Indent> indentList = new ArrayList<Indent>();
 
         Indent indentDemo = new Indent("15103111039", "0");
         indentList.add(indentDemo);
 
         indentDemo = new Indent("15103111065", "1");
         indentList.add(indentDemo);
 
         indentDemo = new Indent("15103111039", "2");
         indentList.add(indentDemo);
 
         indentDemo = new Indent("15103117235", "3");
         indentList.add(indentDemo);
 
         indentDemo = new Indent("15103111065", "4");
         indentList.add(indentDemo);
 
         indentDemo = new Indent("15103117235", "5");
         indentList.add(indentDemo);
 
         indentDemo = new Indent("15103111065", "6");
         indentList.add(indentDemo);
 
         indentDemo = new Indent("15103111039", "7");
         indentList.add(indentDemo);
 
         indentDemo = new Indent("15103117235", "8");
         indentList.add(indentDemo);
 
         indentDemo = new Indent("15103111039", "9");
         indentList.add(indentDemo);
 
         return indentList;
     
 

 

3、顺序消息消费者

 import com.snails.rmq.common.RMQConstant;
 import com.snails.rmq.utils.ClientUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageExt;
 
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 /**
  * SUCCESS 确认消费
  * SUSPEND_CURRENT_QUEUE_A_MOMENT  稍后消费
  *
  * @Description: 消费顺序消息
  */
 public class OrderConsumer 
 
     public static void main(String[] args) 
         // 获取消费者实例
         String subExpression = RMQConstant.MSG_TAG_A.concat(" || ").concat(RMQConstant.MSG_TAG_B).concat(" || ").concat(RMQConstant.MSG_TAG_C);
         DefaultMQPushConsumer consumer =
                 ClientUtils.gainConsumerInstance(RMQConstant.ORDER_GROUP, RMQConstant.NAEMSRV_ADDR, RMQConstant.ORDER_TOPIC, subExpression);
 
         /**
          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
          * 如果非第一次启动,那么按照上次消费的位置继续消费
          */
         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
 
         // 注册回调实现类来处理从broker拉取回来的消息
         consumer.registerMessageListener(new MessageListenerOrderly() 
 
             Random random = new Random();
 
             @Override
             public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) 
                 context.setAutoCommit(true);
                 for (MessageExt msg : msgs) 
                     // 每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                     System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                     try 
                         // TODO 业务逻辑处理
                         TimeUnit.SECONDS.sleep(random.nextInt(10));
                      catch (Exception e) 
                         e.printStackTrace();
                     
                 
 
                 return ConsumeOrderlyStatus.SUCCESS;
             
         );
 
         // 启动消费者实例
         ClientUtils.startupConsumer(consumer);
     
 

 

RocketMQ源码 — 十 RocketMQ顺序消息

RocketMQ本身支持顺序消息,在使用上发送顺序消息和非顺序消息有所区别

发送顺序消息

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

send方法带有参数MessageQueueSelector,MessageQueueSelector是让用户自己决定消息发送到哪一个队列,如果是局部消息的话,用来决定消息与队列的对应关系。

顺序消息消费

consumer.registerMessageListener(new MessageListenerOrderly() {
    AtomicLong consumeTimes = new AtomicLong(0);

    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        context.setAutoCommit(false);
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

从使用上可以推断顺序消息需要从发送到消费整个过程中保证有序,所以顺序消息具体表现为

  • 发送消息是顺序的
  • broker存储消息是顺序的
  • consumer消费是顺序的

下面分别看看rmq怎么实现顺序消息

发送顺序消息

因为broker存储消息有序的前提是producer发送消息是有序的,所以这两个结合在一起说。

消息发布是有序的含义:producer发送消息应该是依次发送的,所以要求发送消息的时候保证:

  • 消息不能异步发送,同步发送的时候才能保证broker收到是有序的。
  • 每次发送选择的是同一个MessageQueue

同步发送

从刚开始的例子中发送消息的时候,会调用下面的方法

public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
}

CommunicationMode.SYNC表明了producer发送消息的时候是同步发送的。同步发送表示,producer发送消息之后不会立即返回,会等待broker的response。

broker收到producer的请求之后虽然是启动线程处理的,但是在线程中将消息写入commitLog中以后会发送response给producer,producer在收到broker的response并且是处理成功之后才算是消息发送成功。

同一个MessageQueue

为了保证broker收到消息也是顺序的,所以producer只能向其中一个队列发送消息。因为只有是同一个队列才能保证消息是发往同一个broker,只有同一个broker处理发来的消息才能保证顺序。所以发送顺序消息的时候需要用户指定MessageQueue,在send调用过程中会调用下面的方法,下面的方法中回调了用户指定的select queue的方法

private SendResult sendSelectImpl(
    Message msg,
    MessageQueueSelector selector,
    Object arg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);

    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        try {
            // 调用用户指定的select方法来选出一个queue,如果是全局顺序,用户必须保证自己选出的queue是同一个
            mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
        } catch (Throwable e) {
            throw new MQClientException("select message queue throwed exception.", e);
        }

        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
        } else {
            throw new MQClientException("select message queue return null.", null);
        }
    }

    throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

这样每次发送的消息都是同一个MessageQueue,也就是都会发送到同一个broker,这个发送消息的过程都保证了顺序,也就保证了broker存储在CommitLog中的消息也是顺序的。

顺序消费

保证了broker中物理存储的消息是顺序的,只要保证消息消费是顺序的就能保证整个过程是顺序消息了。

还是开始的例子中,顺序消费和普通消费的listener是不一样的,顺序消费需要实现的是下面这个接口

org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly

在consumer启动的时候会根据listener的类型判断应该使用哪一个service来消费

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    // 顺序消息
    this.consumeOrderly = true;
    this.consumeMessageService =
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    // 非顺序消息
    this.consumeOrderly = false;
    this.consumeMessageService =
        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

consumer拉取消息是按照offset拉取的,所以consumer能保证拉取到consumer的消息是连续有序的,但是consumer拉取到消息后又启动了线程去处理消息,所以就不能保证先拉取到的消息先处理

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
// org.apache.rocketmq.client.consumer.PullCallback#onSuccess
// 将拉取到的消息放入ProcessQueue
boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// 这里的consumeMessageService就是ConsumeMessageOrderlyService
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispathToConsume);


// org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequest
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispathToConsume) {
    if (dispathToConsume) {
        // 拉取到的消息构造ConsumeRequest,然后放入线程池等待执行
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}

上面将请求放入线程池了,所以线程执行的顺序又不确定了,那么consumer消费就变成无序的了吗?

答案显然是不会变成无序的,因为上面有一行关键的代码

processQueue.putMessage(pullResult.getMsgFoundList());

先说一下ProcessQueue这个关键的数据结构。一个MessageQueue对应一个ProcessQueue,是一个有序队列,该队列记录一个queueId下所有从brokerpull回来的消息,如果消费成功了就会从队列中删除。ProcessQueue有序的原因是维护了一个TreeMap

private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

msgTreeMap:里面维护了从broker pull回来的所有消息,TreeMap是有序的,key是Long类型的,没有指定comparator,默认是将key强转为Comparable,然后进行比较,因为key是当前消息的offset,而Long实现了Comparable接口,所以msgTreeMap里面的消息是按照offset排序的。

所以是ProcessQueue保证了拉取回来的消息是有序的,继续上面说到的启动线程执行ConsumeRequest.run方法来消费消息

Override
public void run() {
    // 保证一个队列只有一个线程访问,因为顺序消息只有一个队列,也就保证了只有一个线程消费
    final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
    synchronized (objLock) {
        if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
            || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
            final long beginTime = System.currentTimeMillis();
            for (boolean continueConsume = true; continueConsume; ) {
                // 从ProcessQueue中获取消息进行消费,获取出来的消息也是有序的
                final int consumeBatchSize =
                    ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                // 省略中间代码
}

从ProcessQueue中获取出来的消息是有序的,consumer保证了消费的有序性。

总结

rmq从发送消息、broker存储消息到consumer消费消息整个过程中,环环相扣,最终保证消息是有序的。

以上是关于RocketMQ笔记:顺序消息的主要内容,如果未能解决你的问题,请参考以下文章

阿里P9封神之作!RocketMQ核心笔记疯传Ali内网

RocketMQ源码 — 十 RocketMQ顺序消息

RocketMQ源码 — 十 RocketMQ顺序消息

RocketMQ源码 — 十 RocketMQ顺序消息

RocketMQ的延迟消息和顺序消息

RocketMq如何保证消费顺序