RocketMQ源码(22)—ConsumeMessageOrderlyService顺序消费消息源码
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(22)—ConsumeMessageOrderlyService顺序消费消息源码相关的知识,希望对你有一定的参考价值。
基于RocketMQ release-4.9.3,深入的介绍了ConsumeMessageOrderlyService顺序消费消息源码。
此前我们学习了consumer消息的拉取流程源码:
- RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码
- RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码【一万字】
- RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码
当前DefaultMQPushConsumer拉取到消息之后,会将消息提交到对应的processQueue处理队列内部的msgTreeMap中。然后通过consumeMessageService#submitConsumeRequest方法将拉取到的消息构建为ConsumeRequest,然后通过内部的consumeExecutor线程池消费消息。
consumeMessageService有ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService顺序消费两种实现,下面我们来看看这两种实现如何消费消息,
此前我们已经学习了并发消费的源码:https://blog.csdn.net/weixin_43767015/article/details/129017181,本次我们先学习顺序消费的源码。
文章目录
- 1 start启动服务定时锁定消息队列
- 2 submitConsumeRequest提交消费请求
- 3 ConsumeRequest执行消费任务
- 4 processConsumeResult处理消费结果
- 5 顺序消费和并发消费的总结
1 start启动服务定时锁定消息队列
consumeMessageService服务在DefaultMQPushConsumerImpl#start方法中被初始化并启动,即调用start方法。该方法将会通过scheduledExecutorService定时任务锁定所有分配的mq,保证同时只有一个消费端可以消费。
这个定时任务在启动1s后开始执行,后每20s执行一次,这里的20s可通过**-D rocketmq.client.rebalance.lockInterval**属性设置。因此它的频率与负载均衡的默认频率一致,都是20s。
/**
* ConsumeMessageOrderlyService的方法
* 启动服务
*/
public void start()
//如果是集群模式
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()))
//启动一个定时任务,启动后1s执行,后续每20s执行一次
//尝试对所有分配给当前consumer的队列请求broker端的消息队列锁,保证同时只有一个消费端可以消费。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable()
@Override
public void run()
try
//定期锁定所有消息队列
ConsumeMessageOrderlyService.this.lockMQPeriodically();
catch (Throwable e)
log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
内部调用RebalanceImpl#lockAll方法锁定该客户端分配的所有消费队列。
/**
* ConsumeMessageOrderlyService的方法
* <p>
* 锁定所有消息队列
*/
public synchronized void lockMQPeriodically()
if (!this.stopped)
//锁定所有消息队列
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
1.2 lockAll锁定所有消息队列
该方法尝试锁定所有消息队列。
- 根据processQueueTable的数据,构建brokerName到其所有mq的map集合brokerMqs。在负载均衡并为当前消费者新分配消息队列的时候,也会对新分配的消息队列申请broker加锁,加锁成功后才会创建对应的processQueue存入processQueueTable。也就是说,如果是顺序消息,那么processQueueTable中的数据一定是曾经加锁成功了的。
- 遍历brokerMqs,调用MQClientAPIImpl#lockBatchMQ的方法,向broker发送同步请求,Code为LOCK_BATCH_MQ,请求批量锁定消息队列,返回锁住的mq集合。
- 遍历锁住的mq集合,获取对应的processQueue,设置processQueue的状态,设置locked为true,重新设置加锁的时间。遍历没有锁住的mq,设置locked为false。
/**
* RebalanceImpl的方法
* <p>
* 定时每20s尝试锁定所有消息队列
*/
public void lockAll()
/*
* 1 根据processQueueTable的数据,构建brokerName到所有mq的map集合
* 在新分配消息队列的时候,也会对新分配的消息队列申请broker加锁,加锁成功后会创建对应的processQueue存入processQueueTable
* 也就是说,如果是顺序消息,那么processQueueTable的数据一定是曾经加锁成功了的
*/
HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();
Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
//遍历集合
while (it.hasNext())
Entry<String, Set<MessageQueue>> entry = it.next();
final String brokerName = entry.getKey();
final Set<MessageQueue> mqs = entry.getValue();
if (mqs.isEmpty())
continue;
//获取指定brokerName的master地址。
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
if (findBrokerResult != null)
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.setMqSet(mqs);
try
/*
* 2 向broker发送同步请求,Code为LOCK_BATCH_MQ,请求批量锁定消息队列,返回锁住的mq集合
*/
Set<MessageQueue> lockOKMQSet =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
//遍历锁住的mq集合
for (MessageQueue mq : lockOKMQSet)
//获取对应的processQueue,设置processQueue的状态
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null)
if (!processQueue.isLocked())
log.info("the message queue locked OK, Group: ", this.consumerGroup, mq);
//设置locked为true
processQueue.setLocked(true);
//设置加锁的时间
processQueue.setLastLockTimestamp(System.currentTimeMillis());
//遍历没有锁住的mq,设置locked为false
for (MessageQueue mq : mqs)
if (!lockOKMQSet.contains(mq))
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null)
processQueue.setLocked(false);
log.warn("the message queue locked Failed, Group: ", this.consumerGroup, mq);
catch (Exception e)
log.error("lockBatchMQ exception, " + mqs, e);
1.1.1 lockBatchMQ批量锁定消息队列
该方法向broker发送同步请求,Code为LOCK_BATCH_MQ,请求批量锁定消息队列,返回锁住的mq集合。
/**
* MQClientAPIImpl的方法
*
* 向broker发送同步请求,Code为LOCK_BATCH_MQ,请求批量锁定消息队列,返回锁住的mq集合
* @param addr broker地址
* @param requestBody 请求体
* @param timeoutMillis 超时时间
* @return
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
public Set<MessageQueue> lockBatchMQ(
final String addr,
final LockBatchRequestBody requestBody,
final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException
//Code为LOCK_BATCH_MQ
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
request.setBody(requestBody.encode());
//同步请求,调用brokerVIPChannel判断是否开启vip通道,如果开启了,那么将brokerAddr的port – 2,因为vip通道的端口为普通端口 – 2。
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
switch (response.getCode())
case ResponseCode.SUCCESS:
//解码
LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet();
return messageQueues;
default:
break;
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
1.1.2 broker处理批量锁定请求lockBatchMQ
broker端通过AdminBrokerProcessor处理LOCK_BATCH_MQ请求。
lockBatchMQ方法就是broker处理批量锁定请求的方法。*broker内部使用一个ConcurrentMap<String/ group */, ConcurrentHashMap<MessageQueue, LockEntry>> 类型的mqLockTable存储不同的consumerGroup的下面的mq到其锁定的clientId的对应关系。**大概逻辑为:
- 首先将已被当前clientId锁定和未锁定的mq分别存储到不同集合。然后对于未锁定的mq尝试加锁。
- 获取本地锁,防止并发,这是一个ReentrantLock,所有的group的分配都获得同一个锁。
- 获取该消费者组对于消息队列的加锁的情况map,存放着下面的消息队列mq到其获取了锁的消费者客户端id的映射关系。一个消费者组下面的一个mq只能被一个clientId锁定。
- 遍历未锁定的mq集合。
- 如果mq未被任务客户端锁定,那么设置新建一个LockEntry,设置为当前clientId,表示已被当前请求的客户端锁定,内部设置锁定时间戳lastUpdateTimestamp为当前毫秒时间戳。
- 如果已被被当前客户端锁定,并且没有过期。那么重新设置锁定时间为当前时间戳,加入到已锁定的mq中,进行下一次循环。每次锁定过期时间为REBALANCE_LOCK_MAX_LIVE_TIME,默认60s,可通过-Drocketmq.broker.rebalance.lockMaxLiveTime的broker参数设置。
- 如果锁已过期,那么设置当前clientId获得了锁,进行下一次循环。
- 否则,表示所没有过期并且也不是当前clientId获得的锁,仅仅输出日志,进行下一次循环。
/**
* RebalanceLockManager的方法
* <p>
* 尝试批量锁定,返回锁定的mq
*
* @param group 消费者组
* @param mqs 需要锁定的mq集合
* @param clientId 客户端id
* @return 已锁定的mq集合
*/
public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
final String clientId)
Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
//将已经锁定和未锁定的mq分别存储到不同集合
for (MessageQueue mq : mqs)
if (this.isLocked(group, mq, clientId))
lockedMqs.add(mq);
else
notLockedMqs.add(mq);
//存在未锁定的集合,那么尝试加锁
if (!notLockedMqs.isEmpty())
try
//获取本地锁,是一个ReentrantLock,所有的group的分配都获得同一个锁
this.lock.lockInterruptibly();
try
//获取该消费者组对于消息队列的加锁的情况map,存放着消息队列到其获取了锁的消费者客户端id的映射关系
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (null == groupValue)
//初始化一个
groupValue = new ConcurrentHashMap<>(32);
this.mqLockTable.put(group, groupValue);
//遍历未锁定的集合
for (MessageQueue mq : notLockedMqs)
LockEntry lockEntry = groupValue.get(mq);
//如果该mq未锁定
if (null == lockEntry)
//新建一个LockEntry,设置为当前clientId,表示已被当前请求的客户端锁定
//内部设置锁定时间戳lastUpdateTimestamp为当前毫秒时间戳
lockEntry = new LockEntry();
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
log.info(
"tryLockBatch, message queue not locked, I got it. Group: NewClientId: ",
group,
clientId,
mq);
//如果已被被当前客户端锁定,并且没有过期
//每次锁定过期时间为REBALANCE_LOCK_MAX_LIVE_TIME,默认60s,可通过-Drocketmq.broker.rebalance.lockMaxLiveTime的broker参数设置
if (lockEntry.isLocked(clientId))
//重新设置锁定时间为当前时间戳
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
//加入到已锁定的mq中
lockedMqs.add(mq);
//下一次循环
continue;
//获取客户端id,此时表示不是当前clientId获得的锁,或者锁已经过期
String oldClientId = lockEntry.getClientId();
//如果锁已过期
if (lockEntry.isExpired())
//那么设置当前clientId获得了锁
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
log.warn(
"tryLockBatch, message queue lock expired, I got it. Group: OldClientId: NewClientId: ",
group,
oldClientId,
clientId,
mq);
//加入到已锁定的mq中
lockedMqs.add(mq);
//下一次循环
continue;
//到此表示mq被其他客户端锁定了
log.warn(
"tryLockBatch, message queue locked by other client. Group: OtherClientId: NewClientId: ",
group,
oldClientId,
clientId,
mq);
finally
//本地锁解锁
this.lock.unlock();
catch (InterruptedException e)
log.error("putMessage exception", e);
//返回已锁定mq集合
return lockedMqs;
1.1.1.1.1. 分布式mq锁小结
consumeMessageService服务在DefaultMQPushConsumerImpl#start方法中被初始化并启动,即调用start方法。该方法将会通过scheduledExecutorService定时任务锁定所有分配的mq,保证同时只有一个消费端可以消费。
实际上,在之前学习的DefaultMQPushConsumer负载均衡服务的部分就知道,在集群模式加上顺序消费的情况下,一定是要向broker申请messageQueue锁成功之后,才能构建processQueue并且加入到processQueueTable,才能在随后发起拉取消息的请求,所以说,这里的定时任务,仅仅是遍历processQueueTable的所有mq并且申请锁定,起作用更多的是向broker进行分布式mq锁的续期操作。
对于从broker锁定的mq,在客户端的过期时间默认为30s,可以通过客户端启动参数-Drocketmq.client.rebalance.lockMaxLiveTime参数设置。但是在broker端看来,这个锁的过期时间默认60s,可以通过broekr启动参数-Drocketmq.broker.rebalance.lockMaxLiveTime设置。
2 submitConsumeRequest提交消费请求
该方法首先会判断是否分发消费,即dispathToConsume是否为true,如果允许,那么将创建一个ConsumeRequest提交到ConsumeMessageOrderlyService内部的consumeExecutor线程池中进行异步消费,否则什么也不做。注意并没有将消息没有放进ConsumeRequest,因为消费线程会自动拉取treeMap中的消息。
什么时候dispathToConsume为true呢?当当前processQueue的内部的msgTreeMap中有消息,并且consuming=false,即还没有开始消费时,将会返回true,即新提交一个消费任务进去激活消费。如果已经在消费了,那么不会提交新的消费任务,老的消费任务会自动去msgTreeMap拉取消息。
注意这里的dispathToConsume应该是dispatchToConsume,这是RocketMQ中的语法错误。
/**
* ConsumeMessageOrderlyService的方法
* 提交顺序消费请求
*
* @param msgs 拉取到的消息
* @param processQueue 处理队列
* @param messageQueue 消息队列
* @param dispathToConsume 是否分发消费
*/
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume)
//如果允许分发消费
if (dispathToConsume)
//构建消费请求,没有将消费放进去,消费消费会自动拉取treemap中的消息
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
//将请求提交到consumeExecutor线程池中进行消费
this.consumeExecutor.submit(consumeRequest);
3 ConsumeRequest执行消费任务
ConsumeRequest作为线程任务被ConsumeMessageOrderlyService内部的consumeExecutor线程池异步的执行。
/*
* 并发消费线程池
* 最小、最大线程数默认20,阻塞队列为无界阻塞队列LinkedBlockingQueue
*/
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl(consumeThreadPrefix));
//单线程的延迟任务线程池,用于定时执行锁定请求以及延迟提交新的消费请求
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
首先,我们要记住,在之前学习的DefaultMQPushConsumer负载均衡服务的部分就知道,在集群模式加上顺序消费的情况下,一定是要向broker申请messageQueue锁成功之后,才能构建processQueue并且加入到processQueueTable,才能在随后发起拉取消息的请求,所以说,能够进入到ConsumeRequest执行消息消费情况,一定是此前获得过broker的messageQueue锁。
ConsumeRequest的run方法,就是顺序消费的核心方法。下面我们来看看它的大概逻辑:
- 如果处理队列被丢弃,那么直接返回,不再消费,例如负载均衡时该队列被分配给了其他新上线的消费者,尽量避免重复消费。
- 消费消息之前先获取当前messageQueue的本地锁,锁对象是一个Object对象,每一个mq对应一个不同的Object,采用原生的synchronized阻塞式的获取同步锁。这将导致ConsumeMessageOrderlyService的线程池中的线程将不会同时并发的消费同一个队列。
- 如果是广播模式,或者是集群模式,并且锁定了processQueue处理队列,并且processQueue处理队列锁没有过期,那么可以消费消息。p**rocessQueue处理队列锁定实际上就是在负载均衡的时候向broker申请的消息队列分布式锁,申请成功之后将processQueue.locked属性置为true。**内部一个循环中不断的消费,直到消费超时或者条件不满足退出循环。
- 如果处理队列被丢弃,那么直接返回,不再消费,例如负载均衡时该队列被分配给了其他新上线的消费者,尽量避免重复消费。
- 如果是集群模式,并且没有锁定了processQueue处理队列,或者processQueue处理队列锁已经过期,那么调用tryLockLaterAndReconsume尝试延迟10ms请求broekr加锁并重新延迟提交新的消费请求。
- 计算消费时间。如果单次消费任务的消费时间大于默认60s,那么延迟10ms提交新的消费请求,并且结束循环,本次消费任务结束。单次最大消费时间可以通过-Drocketmq.client.maxTimeConsumeContinuously配置启动参数来设置时间。
- 调用getConsumeMessageBatchMaxSize方法,获取单次批量消费的数量consumeBatchSize,默认1,可以通过DefaultMQPushConsumer.consumeMessageBatchMaxSize的属性配置。
- 调用takeMessages方法,从processQueue内部的msgTreeMap有序map集合中获取offset最小的consumeBatchSize条消息,按顺序从最小的offset返回,保证有序性。
- 调用resetRetryAndNamespace方法,重置重试topic,当消息是重试消息的时候,将msg的topic属性从重试topic还原为真实的topic。
- 如果takeMessages方法拉取到了消息,那么进行消费。
- 如果有钩子,那么执行consumeMessageBefore前置方法。我们可以通过DefaultMQPushConsumerImpl#registerConsumeMessageHook方法注册消费钩子ConsumeMessageHook,在消费消息的前后调用。
- 真正消费消息之前再获取processQueue的本地消费锁,保证消息消费时,一个处理队列不会被并发消费。从这里可知,顺序消费需要获取三把锁,broker的messageQueue锁,本地的messageQueue锁,本地的processQueue锁。
- 调用listener#consumeMessage方法,进行消息消费,调用实际的业务逻辑,返回执行状态结果,有四种状态,ConsumeOrderlyStatus.SUCCESS 和 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT推荐使用,ConsumeOrderlyStatus.ROLLBACK和ConsumeOrderlyStatus.COMMIT已被废弃。
- 解锁,
以上是关于RocketMQ源码(22)—ConsumeMessageOrderlyService顺序消费消息源码的主要内容,如果未能解决你的问题,请参考以下文章