一个看似简单的复杂问题 :分布式消息队列RocketMQ-- 消息的“顺序消费”

Posted 程序员创业记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一个看似简单的复杂问题 :分布式消息队列RocketMQ-- 消息的“顺序消费”相关的知识,希望对你有一定的参考价值。

顺序消息看似很简单:

理想情况如下:

①、producer 顺序发送1,2,3

②、broker 顺序存储 1,2,3

③、consumer 顺序消费1,2,3


以上,皆大欢喜。


这个特性看起来很简单,为什么  RocketMQ 默认不保证呢?


下面就从三个封面来讨论要保证顺序消息,是多么的困难,或者说不可能。


在 RocketMQ 中,一个topic默认有 4个 MessageQueue 。


数据结构:

this.messageQueueList = {ArrayList@3127}  size = 4

 0 = {MessageQueue@3154} "MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0]"

 1 = {MessageQueue@3155} "MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1]"

 2 = {MessageQueue@3156} "MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2]"

 3 = {MessageQueue@3157} "MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3]"


①、producer 顺序发送1,2,3

如果2,3,发送成功了,过了一会1发送失败了,重新发送1,那么就不是顺序消息了。

换个角度,为了保证1发送成功才能发送2,3,得损耗多少性能和资源。

性能可想而知。


②、broker顺序存储1,2,3

在存储端,为了保证消息顺序存储,需要保证一下2个要求:

1、消息不能分区,在Kafka中,它叫做partition;在RocketMQ中,叫 queue ;在RocketMQ ,一个topic默认有4个queue。

如果你有多个队列,那么一个topic的消息,会被分散到多个队列中,自然不能保证顺序存储。


2、即使满足1的要求,一个topic的消息只有一个队列,那么还有两个问题。

    A、高可用问题: 比如当前机器挂掉了,上面的消息还没消费完,此时切换到其他机器,高可用保证了,但是消息顺序乱掉了,所以得保证消息是同步复制

    B、那么还得保证切机器之前,挂掉的机器上面,剩余的消息都得全部消费完,这个就非常困难了。


③、consumer 顺序消费1,2,3

为了保证顺序消息,consumer端不能并行消费,也就是不能开多个线程或多个客户端去消费。


通过上面的分析,我们了解到,要保证一个topic内部的消息要做到严格的顺序,是多么的困难。


那么,我们有必要花大力气去保证消息严格顺序吗?


实际情况中,从业务方的角度: 


(1)不关注顺序的业务大量存在; 

(2) 队列无序不代表消息无序。



第(2)条的意思是说:我们不保证队列的全局有序,但可以保证消息的局部有序。


所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是不是我们应该寻求的一种更合理的方式?




举个例子:保证来自同1个order id的消息,是有序的!


下面就看一下在Kafka和RocketMQ中,分别是如何对待这个问题的:


    1、Kafka中:发送1条消息的时候,可以指定(topic, partition, key) 3个参数。partiton和key是可选的。


如果你指定了partition,那就是所有消息发往同1个partition,就是有序的。并且在消费端,Kafka保证,1个partition只能被1个consumer消费。


或者你指定key(比如order id),具有同1个key的所有消息,会发往同1个partition。也是有序的。


    2、RocketMQ: RocketMQ在Kafka的基础上,把这个限制更放宽了一步。只指定(topic, key),不指定具体发往哪个队列。也就是说,它更加不希望业务方,非要去要一个全局的严格有序。


但是在rocketmq 4.3.x版本中,可以通过制定 MessageQueue的方法指定消息发往哪个队列:


A、普通同步可靠发送方法:

SendResult sendResult = producer.send(msg);

B、普通同步可靠发送,并指定按orderId来选择MessageQueue方法:

就是说,同一个 orderId 发往同一个 MessageQueue。


下面是开发者自己实现的队列选择策略:


// FIXME RocketMQ通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)。
// FIXME 比如下面的示例中,订单号相同的消息会被先后发送到同一个队列中:
// RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上
// RocketMQ默认提供了3种MessageQueueSelector实现:随机/Hash/根据机房来选择
// 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中
int orderId = 1;
producer.send(msg, new MessageQueueSelector() {
@Override
   public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
       int index = id % mqs.size();
       return mqs.get(index);
   }
},orderId);


注意:选择哪个队列是根据 

MessageQueue 选择策略

接口决定的。

默认有三种选择策略,除此之外,开发者还可以实现 选择算法:


/**
* 随机选择队列
①、 SelectMessageQueueByRandom  随机选择

*/
public class SelectMessageQueueByRandom implements MessageQueueSelector {
private Random random = new Random(System.currentTimeMillis());
   @Override
   public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = random.nextInt(mqs.size());
       return mqs.get(value);
   }
}

/**
* 根据 arg hashCode 选择队列
②、   SelectMessageQueueByHash  根据 args Hash值来选择

*/
public class SelectMessageQueueByHash implements MessageQueueSelector {

@Override
   public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
       if (value < 0) {
value = Math.abs(value);
       }

value = value % mqs.size();
       return mqs.get(value);
   }
}


/* ③、 SelectMessageQueueByMachineRoom  根据机房来选择
*  未实现
*
* ④、可以自己实现 策略,如同一个订单号的发往同一个队列
*
* RocketMQ默认提供了两种MessageQueueSelector实现
*/
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
private Set<String> consumeridcs;
   @Override
   public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
   }

public Set<String> getConsumeridcs() {
return consumeridcs;
   }

public void setConsumeridcs(Set<String> consumeridcs) {
this.consumeridcs = consumeridcs;
   }
}




/* 
④、可以自己实现 策略,如同一个订单号的发往同一个队列
*
* RocketMQ默认提供了两种MessageQueueSelector实现
*/

int orderId = 1;
producer.send(msg, new MessageQueueSelector() {
@Override
   public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
       int index = id % mqs.size();
       return mqs.get(index);
   }
},orderId);


 

所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是不是我们应该寻求的一种更合理的方式?


以上是关于一个看似简单的复杂问题 :分布式消息队列RocketMQ-- 消息的“顺序消费”的主要内容,如果未能解决你的问题,请参考以下文章

Java分布式:消息队列(Message Queue)

消息队列之RabbitMQ-分布式部署

如何用好消息队列RabbitMQ?

谈谈消息队列的流派

分布式框架之高性能:消息队列的可用性

选择哪款消息队列更适合你的业务系统?