RocketMQ的顺序消息(顺序消费)

Posted 刘Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ的顺序消息(顺序消费)相关的知识,希望对你有一定的参考价值。

简单介绍了消息有序性的概念,以及RocketMQ如何实现消息的顺序消费。

1 消息的有序性

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。

顺序消息分为全局顺序消息与部分顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

如果想要实现全局顺序消息,那么只能使用一个队列,以及单个生产者,这是会严重影响性能。

因此,我们常说的顺序消息通常是只的部分顺序消息,就上面的例子来说,我们不用管不同的订单ID的消息之间的总体消费顺序,只需要保证同样订单ID的消息能按照订单创建、订单付款、订单完成这个顺序消费就可以了。

顺序消费实际上有两个核心点,一个是生产者有序存储,另一个是消费者有序消费。

2 生产者有序发送

先看如何实现生产者有序存储。我们知道RocketMQ中生产者生产的消息会放置在某个队列中,基于队列先进先出的特性天然的可以保证存入队列的消息顺序和拉取的消息顺序是一致的,因此,我们只需要保证一组相同的消息按照给定的顺序存入同一个队列中,就能保证生产者有序存储。

普通发送消息的模式下,生产者会采用轮询的方式将消费均匀的分发到不同的队列中,然后被不同的消费者消费,因为一组消息在不同的队列,此时就无法使用 RocketMQ 带来的队列有序特性来保证消息有序性了。

这个问题很好解决,因为RocketMQ支持生产者在投放消息的时候自定义投放策略,我们实现一个MessageQueueSelector接口,使用Hash取模法来保证同一个订单在同一个队列中就行了,即通过订单ID%队列数量得到该ID的订单所投放的队列在队列列表中的索引,然后该订单的所有消息都会被投放到这个队列中。

生产者发送消息的方法中就有一些添加队列选择器的方法,保证消息发送顺序。

比如只有两个队列,那么订单ID为1,2,3的三组消息中,1、3组消息存放于第一个队列,而2组消息存放于第二个队列,如下图是一种消息可能的消息存放顺序:

根据上图可以,上面的方法可以实现一组消息被顺序的存放,不同组的消息之间的顺序无法保证,这就是部分顺序。

另外,顺序消息必须使用同步发送的方式才能保证生产者发送的消息有序。

实际上,采用队列选择器的方法不能保证消息的严格顺序,我们的目的是将消息发送到同一个队列中,如果某个broker挂了,那么队列就会减少一部分,如果采用取余的方式投递,将可能导致同一个业务中的不同消息被发送到不同的队列中,导致同一个业务的不同消息被存入不同的队列中,短暂的造成部分消息无序。同样的,如果增加了服务器,那么也会造成短暂的造成部分消息无序。

3 消费者有序消费

生产者有序存储实现了,那么该如何实现消费者有序消费呢?RockerMQ的MessageListener回调函数提供了两种消费模式,有序消费模式MessageListenerOrderly和并发消费模式MessageListenerConcurrently。

在消费的时候,还需要保证消费者注册MessageListenerOrderly类型的回调接口实现顺序消费,如果消费者采用Concurrently并行消费,则仍然不能保证消息消费顺序。

实际上,每一个消费者的的消费端都是采用线程池实现多线程消费的模式,即消费端是多线程消费。虽然MessageListenerOrderly被称为有序消费模式,但是仍然是使用的线程池去消费消息。

MessageListenerConcurrently是拉取到新消息之后就提交到线程池去消费,而MessageListenerOrderly则是通过加分布式锁和本地锁保证同时只有一条线程去消费一个队列上的数据。

MessageListenerOrderly的加锁机制:

  1. 消费者在进行某个队列的消息拉取时首先向Broker服务器申请队列锁,如果申请到琐,则拉取消息,否则放弃消息拉取,等到下一个队列负载周期(20s)再试。这一个锁使得一个MessageQueue同一个时刻只能被一个消费客户端消费,防止因为队列负载均衡导致消息重复消费。
  2. 假设消费者对messageQueue的加锁已经成功,那么会开始拉取消息,拉取到消息后同样会提交到消费端的线程池进行消费。但在本地消费之前,会先获取该messageQueue对应的锁对象,每一个messageQueue对应一个锁对象,获取到锁对象后,使用synchronized阻塞式的申请线程级独占锁。这一个锁使得来自同一个messageQueue的消息在本地的同一个时刻只能被一个消费客户端中的一个线程顺序的消费。
  3. 在本地加synchronized锁成功之后,还会判断如果是广播模式,则直接进行消费,如果是集群模式,则判断如果messagequeue没有锁住或者锁过期(默认30000ms),那么延迟100ms后再次尝试向Broker 申请锁定messageQueue,锁定成功后重新提交消费请求。

目前来说,消费者使用MessageListenerOrderly顺序消费有个两个问题:

  1. 使用了很多的锁,降低了吞吐量。
  2. 前一个消息消费阻塞时后面消息都会被阻塞。如果遇到消费失败的消息,会自动对当前消息进行重试(每次间隔时间为1秒),无法自动跳过,重试最大次数是Integer.MAX_VALUE,这将导致当前队列消费暂停,因此通常需要设定有一个最大消费次数,以及处理好所有可能的异常情况。

相关文章:

RocketMQ

如有需要交流,或者文章有误,请直接留言。另外希望点赞、收藏、关注,我将不间断更新各种Java学习博客!

以上是关于RocketMQ的顺序消息(顺序消费)的主要内容,如果未能解决你的问题,请参考以下文章

RocketMq如何保证消费顺序

RocketMQ的顺序消息(顺序消费)

RocketMQ-消息消费模式 顺序消费

详解RocketMQ 顺序消费机制

rocketmq之顺序消费

RocketMQ事务消费和顺序消费详解