一个看似简单的复杂问题 :分布式消息队列RocketMQ-- 消息的“顺序消费”
Posted 程序员创业记
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一个看似简单的复杂问题 :分布式消息队列RocketMQ-- 消息的“顺序消费”相关的知识,希望对你有一定的参考价值。
顺序消息看似很简单:
理想情况如下:
①、producer 顺序发送1,2,3
②、broker 顺序存储 1,2,3
③、consumer 顺序消费1,2,3
以上,皆大欢喜。
这个特性看起来很简单,为什么 RocketMQ 默认不保证呢?
下面就从三个封面来讨论要保证顺序消息,是多么的困难,或者说不可能。
在 RocketMQ 中,一个topic默认有 4个 MessageQueue 。
数据结构:
this.messageQueueList = {ArrayList@3127} size = 4
0 = {MessageQueue@3154} "MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0]"
1 = {MessageQueue@3155} "MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1]"
2 = {MessageQueue@3156} "MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2]"
3 = {MessageQueue@3157} "MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3]"
①、producer 顺序发送1,2,3
如果2,3,发送成功了,过了一会1发送失败了,重新发送1,那么就不是顺序消息了。
换个角度,为了保证1发送成功才能发送2,3,得损耗多少性能和资源。
性能可想而知。
②、broker顺序存储1,2,3
在存储端,为了保证消息顺序存储,需要保证一下2个要求:
1、消息不能分区,在Kafka中,它叫做partition;在RocketMQ中,叫 queue ;在RocketMQ ,一个topic默认有4个queue。
如果你有多个队列,那么一个topic的消息,会被分散到多个队列中,自然不能保证顺序存储。
2、即使满足1的要求,一个topic的消息只有一个队列,那么还有两个问题。
A、高可用问题: 比如当前机器挂掉了,上面的消息还没消费完,此时切换到其他机器,高可用保证了,但是消息顺序乱掉了,所以得保证消息是同步复制。
B、那么还得保证切机器之前,挂掉的机器上面,剩余的消息都得全部消费完,这个就非常困难了。
③、consumer 顺序消费1,2,3
为了保证顺序消息,consumer端不能并行消费,也就是不能开多个线程或多个客户端去消费。
通过上面的分析,我们了解到,要保证一个topic内部的消息要做到严格的顺序,是多么的困难。
那么,我们有必要花大力气去保证消息严格顺序吗?
实际情况中,从业务方的角度:
(1)不关注顺序的业务大量存在;
(2) 队列无序不代表消息无序。
第(2)条的意思是说:我们不保证队列的全局有序,但可以保证消息的局部有序。
所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是不是我们应该寻求的一种更合理的方式?
举个例子:保证来自同1个order id的消息,是有序的!
下面就看一下在Kafka和RocketMQ中,分别是如何对待这个问题的:
1、Kafka中:发送1条消息的时候,可以指定(topic, partition, key) 3个参数。partiton和key是可选的。
如果你指定了partition,那就是所有消息发往同1个partition,就是有序的。并且在消费端,Kafka保证,1个partition只能被1个consumer消费。
或者你指定key(比如order id),具有同1个key的所有消息,会发往同1个partition。也是有序的。
2、RocketMQ: RocketMQ在Kafka的基础上,把这个限制更放宽了一步。只指定(topic, key),不指定具体发往哪个队列。也就是说,它更加不希望业务方,非要去要一个全局的严格有序。
但是在rocketmq 4.3.x版本中,可以通过制定 MessageQueue的方法指定消息发往哪个队列:
A、普通同步可靠发送方法:
SendResult sendResult = producer.send(msg);
B、普通同步可靠发送,并指定按orderId来选择MessageQueue方法:
就是说,同一个 orderId 发往同一个 MessageQueue。
下面是开发者自己实现的队列选择策略:
// FIXME RocketMQ通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)。
// FIXME 比如下面的示例中,订单号相同的消息会被先后发送到同一个队列中:
// RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上
// RocketMQ默认提供了3种MessageQueueSelector实现:随机/Hash/根据机房来选择
// 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中
int orderId = 1;
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
},orderId);
注意:选择哪个队列是根据
MessageQueue 选择策略
接口决定的。
默认有三种选择策略,除此之外,开发者还可以实现 选择算法:
/**
* 随机选择队列
①、 SelectMessageQueueByRandom 随机选择
*/
public class SelectMessageQueueByRandom implements MessageQueueSelector {
private Random random = new Random(System.currentTimeMillis());
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = random.nextInt(mqs.size());
return mqs.get(value);
}
}
/**
* 根据 arg hashCode 选择队列
②、 SelectMessageQueueByHash 根据 args Hash值来选择
*/
public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value = value % mqs.size();
return mqs.get(value);
}
}
/* ③、 SelectMessageQueueByMachineRoom 根据机房来选择
* 未实现
*
* ④、可以自己实现 策略,如同一个订单号的发往同一个队列
*
* RocketMQ默认提供了两种MessageQueueSelector实现
*/
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
private Set<String> consumeridcs;
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
public Set<String> getConsumeridcs() {
return consumeridcs;
}
public void setConsumeridcs(Set<String> consumeridcs) {
this.consumeridcs = consumeridcs;
}
}
/*
④、可以自己实现 策略,如同一个订单号的发往同一个队列
*
* RocketMQ默认提供了两种MessageQueueSelector实现
*/
int orderId = 1;
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
},orderId);
所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是不是我们应该寻求的一种更合理的方式?
以上是关于一个看似简单的复杂问题 :分布式消息队列RocketMQ-- 消息的“顺序消费”的主要内容,如果未能解决你的问题,请参考以下文章