rocketmq之顺序消费
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq之顺序消费相关的知识,希望对你有一定的参考价值。
参考技术A 1.在进行负载分配时候,会把不属于的队列进行锁释放,如果释放成功,从待处理队列中移除掉时,进行新队列加入操作,尝试对新加的队列加锁,如果锁成功,进入创建待处理队列放入,同时创建pullrequest请求放入请求缓存
2.消费线程每隔20秒获取当前所有待处理队列构造成brokername+queue,尝试给master对应队列加锁,所成功设置待处理队列状态为true,未成功或者为false(就是加锁的一个批量处理,拉取数据用)
3.在进行pull拉取数据时,对于顺序消息,判断该队列是否加锁,加锁的话往下进行,如果是首次加锁,pullreuqets的锁状态置为true,同时获取消费进度
4.从broker拉取数据,回调函数中,把数据放入processQueue中(treeMap中),交给消费线程处理,pullreuqets接着放入缓存进行下次拉取
5.消费线程中,获取当前队列锁,判断processQueue是否已加锁,判断待处理队列开始时间距离当前时间是否超过60秒(只有超过才会跳出当前线程,当前队列只有这一个线程能处理)从processQueue中取消费信息(并发是取直接拉取的信息),消费信息从treemap中移除,同时临时map中存有一份待处理消息,调用监听处理逻辑,返回处理状态
6.自动提交:返回成功,直接清除临时map,返回最后偏移量+1,存入消费进度,
挂起的话,判断当前重试次数是否超过,如果未超过或者超过但是发送Ack失败,重试+1,临时map清除,消息放入msgTreeMap,延迟重新消费,
超过并且Ack延迟成功,进入死信队列,清除临时map,返回当前偏移量+1,当消费成功处理
7.非自动提交:返回提交,和自动提交的成功一样,返回回滚,临时map的消息放入msgTreeMap,清除临时map,重新延时消费(相当于本次未消费),挂起,处理方式和上边一样
8.存入消费进度到内存
RocketMQ 顺序消费
参考技术A对于所有的 MQ 来说,必问的一道面试题就是 RocketMQ 顺序消息怎样做?原理是什么?
首先我们要明确什么顺序消费,顺序消费的定义是什么?我所理解的顺序消费,指的针对某一类消息,比如都是订单A 的消息来说,它的消费有先后顺序,类似于 FIFO。假设订单 A 有创建、付款、完成这几类消息,我们对于订单 A 的消息,必须要满足先消费创建,其次是付款,最后是完成。
所以针对整个链路来说,我们不仅需要塞的时候是有序的,消费的时候也应该做到有序。就算是以 FIFO 顺序塞进去,消费如果使用多线程同时消费同一个 ConsumerQueue 且同时能消费多个消息,那必然做不到有序。接下来,会从 provider、consumer 两个方面说明如何做到有序。
首先针对顺序消息,生产者可以是多线程的,只要保证每个线程发的是不同类型的消息(如发生不同订单的消息),那么在不同的分区就可以保证有序;
针对 provider 来说,RocketMQ 提供了发送顺序消息的方式,即 MessageQueueSelector:
provider 在发送的时候,只要选择消息发送到那个 ConsumerQueue 即可。比如订单来说,使用订单 id 作为 key 选择队列,那么同一个订单的消息必定能发送到同一个队列。
所以 provider 的顺序发送异常简单。
针对 consumer 来说,需要使用 MessageListenerOrderly 来消费消息:
consumer 顺序消费的原理也很简单。消费者消费消息的时候,会有一个 PullMessageService 拉取线程(单线程)拉取消息,然后放入到 processQueue(每个消费队列对应一个 processQueue) 中,因为是单线程拉取的,对于同一个队列的消息(虽然消费者可以订阅多个队列,但是对于同一个队列是有序的)是有序的。在放入 processQueue 之后,会调用 ConsumeMessageConcurrentlyService 或 ConsumeMessageOrderlyService 来进行消费,这里是调用 ConsumeMessageOrderlyService 进行消费。ConsumeMessageOrderlyService 在消费的时候,会先获取每一个 ConsumerQueue 的锁,然后从 processQueue 获取消息消费,这也就意味着,对于每一个 ConsumerQueue 的消息来说,消费的逻辑也是顺序的。
不能更换MessageQueue重试就需要MessageQueue有自己的副本,通过Raft、Paxos之类的算法保证有可用的副本,或者通过其他高可用的存储设备来存储MessageQueue。
热点问题好像没有什么好的解决办法,只能通过拆分MessageQueue和优化路由方法来尽量均衡的将消息分配到不同的MessageQueue。
消费并行度理论上不会有太大问题,因为MessageQueue的数量可以调整。
消费失败的无法跳过是不可避免的,因为跳过可能导致后续的数据处理都是错误的。不过可以提供一些策略,由用户根据错误类型来决定是否跳过,并且提供重试队列之类的功能,在跳过之后用户可以在“其他”地方重新消费到这条消息。
其实对于所谓的顺序消费来说,本质上是类似于一个状态机的行为,比如一个订单先创建,后付款、最后结束的行为,完全可以定义一个状态,而且发生的顺序是有先后的。所以完全不必要使用什么顺序消费,可以先创建,把创建消息塞到 mq,从 mq 获取到创建消息消费,然后创建一个付款消息,再塞到 mq。然后从 mq 消费付款消息,然后标识订单结束。完全可以用一个状态机 + mq + db 来做,更加稳定通用。
以上是关于rocketmq之顺序消费的主要内容,如果未能解决你的问题,请参考以下文章