rocketmq之顺序消费

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq之顺序消费相关的知识,希望对你有一定的参考价值。

参考技术A 1.在进行负载分配时候,会把不属于的队列进行锁释放,如果释放成功,从待处理队列中移除掉时,

进行新队列加入操作,尝试对新加的队列加锁,如果锁成功,进入创建待处理队列放入,同时创建pullrequest请求放入请求缓存

2.消费线程每隔20秒获取当前所有待处理队列构造成brokername+queue,尝试给master对应队列加锁,所成功设置待处理队列状态为true,未成功或者为false(就是加锁的一个批量处理,拉取数据用)

3.在进行pull拉取数据时,对于顺序消息,判断该队列是否加锁,加锁的话往下进行,如果是首次加锁,pullreuqets的锁状态置为true,同时获取消费进度

4.从broker拉取数据,回调函数中,把数据放入processQueue中(treeMap中),交给消费线程处理,pullreuqets接着放入缓存进行下次拉取

5.消费线程中,获取当前队列锁,判断processQueue是否已加锁,判断待处理队列开始时间距离当前时间是否超过60秒(只有超过才会跳出当前线程,当前队列只有这一个线程能处理)从processQueue中取消费信息(并发是取直接拉取的信息),消费信息从treemap中移除,同时临时map中存有一份待处理消息,调用监听处理逻辑,返回处理状态

6.自动提交:返回成功,直接清除临时map,返回最后偏移量+1,存入消费进度,

挂起的话,判断当前重试次数是否超过,如果未超过或者超过但是发送Ack失败,重试+1,临时map清除,消息放入msgTreeMap,延迟重新消费,

超过并且Ack延迟成功,进入死信队列,清除临时map,返回当前偏移量+1,当消费成功处理

7.非自动提交:返回提交,和自动提交的成功一样,返回回滚,临时map的消息放入msgTreeMap,清除临时map,重新延时消费(相当于本次未消费),挂起,处理方式和上边一样

8.存入消费进度到内存

RocketMQ 顺序消费

参考技术A

对于所有的 MQ 来说,必问的一道面试题就是 RocketMQ 顺序消息怎样做?原理是什么?

首先我们要明确什么顺序消费,顺序消费的定义是什么?我所理解的顺序消费,指的针对某一类消息,比如都是订单A 的消息来说,它的消费有先后顺序,类似于 FIFO。假设订单 A 有创建、付款、完成这几类消息,我们对于订单 A 的消息,必须要满足先消费创建,其次是付款,最后是完成。

所以针对整个链路来说,我们不仅需要塞的时候是有序的,消费的时候也应该做到有序。就算是以 FIFO 顺序塞进去,消费如果使用多线程同时消费同一个 ConsumerQueue 且同时能消费多个消息,那必然做不到有序。接下来,会从 provider、consumer 两个方面说明如何做到有序。

首先针对顺序消息,生产者可以是多线程的,只要保证每个线程发的是不同类型的消息(如发生不同订单的消息),那么在不同的分区就可以保证有序;

针对 provider 来说,RocketMQ 提供了发送顺序消息的方式,即 MessageQueueSelector:

provider 在发送的时候,只要选择消息发送到那个 ConsumerQueue 即可。比如订单来说,使用订单 id 作为 key 选择队列,那么同一个订单的消息必定能发送到同一个队列。

所以 provider 的顺序发送异常简单。

针对 consumer 来说,需要使用 MessageListenerOrderly 来消费消息:

consumer 顺序消费的原理也很简单。消费者消费消息的时候,会有一个 PullMessageService 拉取线程(单线程)拉取消息,然后放入到 processQueue(每个消费队列对应一个 processQueue) 中,因为是单线程拉取的,对于同一个队列的消息(虽然消费者可以订阅多个队列,但是对于同一个队列是有序的)是有序的。在放入 processQueue 之后,会调用 ConsumeMessageConcurrentlyService 或 ConsumeMessageOrderlyService 来进行消费,这里是调用 ConsumeMessageOrderlyService 进行消费。ConsumeMessageOrderlyService 在消费的时候,会先获取每一个 ConsumerQueue 的锁,然后从 processQueue 获取消息消费,这也就意味着,对于每一个 ConsumerQueue 的消息来说,消费的逻辑也是顺序的。

不能更换MessageQueue重试就需要MessageQueue有自己的副本,通过Raft、Paxos之类的算法保证有可用的副本,或者通过其他高可用的存储设备来存储MessageQueue。

热点问题好像没有什么好的解决办法,只能通过拆分MessageQueue和优化路由方法来尽量均衡的将消息分配到不同的MessageQueue。

消费并行度理论上不会有太大问题,因为MessageQueue的数量可以调整。

消费失败的无法跳过是不可避免的,因为跳过可能导致后续的数据处理都是错误的。不过可以提供一些策略,由用户根据错误类型来决定是否跳过,并且提供重试队列之类的功能,在跳过之后用户可以在“其他”地方重新消费到这条消息。

其实对于所谓的顺序消费来说,本质上是类似于一个状态机的行为,比如一个订单先创建,后付款、最后结束的行为,完全可以定义一个状态,而且发生的顺序是有先后的。所以完全不必要使用什么顺序消费,可以先创建,把创建消息塞到 mq,从 mq 获取到创建消息消费,然后创建一个付款消息,再塞到 mq。然后从 mq 消费付款消息,然后标识订单结束。完全可以用一个状态机 + mq + db 来做,更加稳定通用。

以上是关于rocketmq之顺序消费的主要内容,如果未能解决你的问题,请参考以下文章

rocketMQ之顺序消费

rocketMQ之顺序消费

RocketMQ使用之消息保证,重复读,积压,顺序,过滤,延时,事务,死信

RocketMQ 顺序消费

RocketMQ-消息消费模式 顺序消费

RocketMQ事务消费和顺序消费详解