RocketMQ 顺序消费
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 顺序消费相关的知识,希望对你有一定的参考价值。
参考技术A对于所有的 MQ 来说,必问的一道面试题就是 RocketMQ 顺序消息怎样做?原理是什么?
首先我们要明确什么顺序消费,顺序消费的定义是什么?我所理解的顺序消费,指的针对某一类消息,比如都是订单A 的消息来说,它的消费有先后顺序,类似于 FIFO。假设订单 A 有创建、付款、完成这几类消息,我们对于订单 A 的消息,必须要满足先消费创建,其次是付款,最后是完成。
所以针对整个链路来说,我们不仅需要塞的时候是有序的,消费的时候也应该做到有序。就算是以 FIFO 顺序塞进去,消费如果使用多线程同时消费同一个 ConsumerQueue 且同时能消费多个消息,那必然做不到有序。接下来,会从 provider、consumer 两个方面说明如何做到有序。
首先针对顺序消息,生产者可以是多线程的,只要保证每个线程发的是不同类型的消息(如发生不同订单的消息),那么在不同的分区就可以保证有序;
针对 provider 来说,RocketMQ 提供了发送顺序消息的方式,即 MessageQueueSelector:
provider 在发送的时候,只要选择消息发送到那个 ConsumerQueue 即可。比如订单来说,使用订单 id 作为 key 选择队列,那么同一个订单的消息必定能发送到同一个队列。
所以 provider 的顺序发送异常简单。
针对 consumer 来说,需要使用 MessageListenerOrderly 来消费消息:
consumer 顺序消费的原理也很简单。消费者消费消息的时候,会有一个 PullMessageService 拉取线程(单线程)拉取消息,然后放入到 processQueue(每个消费队列对应一个 processQueue) 中,因为是单线程拉取的,对于同一个队列的消息(虽然消费者可以订阅多个队列,但是对于同一个队列是有序的)是有序的。在放入 processQueue 之后,会调用 ConsumeMessageConcurrentlyService 或 ConsumeMessageOrderlyService 来进行消费,这里是调用 ConsumeMessageOrderlyService 进行消费。ConsumeMessageOrderlyService 在消费的时候,会先获取每一个 ConsumerQueue 的锁,然后从 processQueue 获取消息消费,这也就意味着,对于每一个 ConsumerQueue 的消息来说,消费的逻辑也是顺序的。
不能更换MessageQueue重试就需要MessageQueue有自己的副本,通过Raft、Paxos之类的算法保证有可用的副本,或者通过其他高可用的存储设备来存储MessageQueue。
热点问题好像没有什么好的解决办法,只能通过拆分MessageQueue和优化路由方法来尽量均衡的将消息分配到不同的MessageQueue。
消费并行度理论上不会有太大问题,因为MessageQueue的数量可以调整。
消费失败的无法跳过是不可避免的,因为跳过可能导致后续的数据处理都是错误的。不过可以提供一些策略,由用户根据错误类型来决定是否跳过,并且提供重试队列之类的功能,在跳过之后用户可以在“其他”地方重新消费到这条消息。
其实对于所谓的顺序消费来说,本质上是类似于一个状态机的行为,比如一个订单先创建,后付款、最后结束的行为,完全可以定义一个状态,而且发生的顺序是有先后的。所以完全不必要使用什么顺序消费,可以先创建,把创建消息塞到 mq,从 mq 获取到创建消息消费,然后创建一个付款消息,再塞到 mq。然后从 mq 消费付款消息,然后标识订单结束。完全可以用一个状态机 + mq + db 来做,更加稳定通用。
以上是关于RocketMQ 顺序消费的主要内容,如果未能解决你的问题,请参考以下文章