RocketMQ源码(21)—ConsumeMessageConcurrentlyService并发消费消息源码
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(21)—ConsumeMessageConcurrentlyService并发消费消息源码相关的知识,希望对你有一定的参考价值。
基于RocketMQ release-4.9.3,深入的介绍了ConsumeMessageConcurrentlyService并发消费消息源码。
此前我们学习了consumer消息的拉取流程源码:
- RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码
- RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码【一万字】
- RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码
当前DefaultMQPushConsumer拉取到消息之后,会将消息提交到对应的processQueue处理队列内部的msgTreeMap中。然后通过consumeMessageService#submitConsumeRequest方法将拉取到的消息构建为ConsumeRequest,然后通过内部的consumeExecutor线程池消费消息。
consumeMessageService有ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService顺序消费两种实现,下面我们来看看这两种实现如何消费消息,本次我们先学习ConsumeMessageConcurrentlyService并发消费的源码。
文章目录
- 1 start启动服务定时清理过期消息
- 2 submitConsumeRequest提交消费请求
- 3 ConsumeRequest执行消费任务
- 4 processConsumeResult处理消费结果
1 start启动服务定时清理过期消息
consumeMessageService服务在DefaultMQPushConsumerImpl#start方法中被初始化并启动,即调用start方法。
ConsumeMessageConcurrentlyService#start方法将会通过cleanExpireMsgExecutors定时任务清理过期的消息,启动后15min开始执行,后每15min执行一次,这里的15min是RocketMQ大的默认超时时间,可通过defaultMQPushConsumer#consumeTimeout属性设置。
/**
* ConsumeMessageConcurrentlyService的方法
* 启动服务
*/
public void start()
//通过cleanExpireMsgExecutors定时任务清理过期的消息
//启动后15min开始执行,后每15min执行一次,这里的15min时RocketMQ大的默认超时时间,可通过defaultMQPushConsumer#consumeTimeout属性设置
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable()
@Override
public void run()
try
//清理过期消息
cleanExpireMsg();
catch (Throwable e)
log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
1.1 cleanExpireMsg清理过期消息
该方法获取所有的消息队列和处理队列的键值对,循环遍历并且调用ProcessQueue#cleanExpiredMsg方法清理过期消息。
/**
* ConsumeMessageConcurrentlyService的方法
* <p>
* 清理过期消息
*/
private void cleanExpireMsg()
//获取所有的消息队列和处理队列的键值对
Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
//循环遍历
while (it.hasNext())
Map.Entry<MessageQueue, ProcessQueue> next = it.next();
ProcessQueue pq = next.getValue();
//调用ProcessQueue#cleanExpiredMsg方法清理过期消息
pq.cleanExpiredMsg(this.defaultMQPushConsumer);
1.2cleanExpiredMsg清理过期消息
循环清理msgTreeMap中的过期消息,每次最多循环清理16条消息。
- 每次循环首先获取msgTreeMap中的第一次元素的起始消费时间,msgTreeMap是一个红黑树,第一个节点就是offset最小的节点。
- 如果消费时间距离现在时间超过默认15min,那么获取这个msg,如果没有被消费,或者消费时间距离现在时间不超过默认15min,则结束循环。
- 将获取到的消息通过sendMessageBack发回broker延迟topic,将在给定延迟时间(默认从level 3,即10s开始)之后发回进行重试消费。
- 加锁判断如果这个消息还没有被消费完,并且还是在第一位,那么调用removeMessage方法从msgTreeMap中移除消息,进行下一轮判断。
/**
* ProcessQueue的方法
* 清理过期消息
*
* @param pushConsumer 消费者
*/
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer)
//如果是顺序消费,直接返回,只有并发消费才会清理
if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly())
return;
//一次循环最多处理16个消息
int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
//遍历消息,最多处理前16个消息
for (int i = 0; i < loop; i++)
MessageExt msg = null;
try
//加锁
this.treeMapLock.readLock().lockInterruptibly();
try
if (!msgTreeMap.isEmpty())
//获取msgTreeMap中的第一次元素的起始消费时间,msgTreeMap是一个红黑树,第一个节点就是offset最小的节点
String consumeStartTimeStamp = MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue());
//如果消费时间距离现在时间超过默认15min,那么获取这个msg
if (StringUtils.isNotEmpty(consumeStartTimeStamp) && System.currentTimeMillis() - Long.parseLong(consumeStartTimeStamp) > pushConsumer.getConsumeTimeout() * 60 * 1000)
msg = msgTreeMap.firstEntry().getValue();
else
//如果没有被消费,或者消费时间距离现在时间不超过默认15min,则结束循环
break;
else
//msgTreeMap为空,结束循环
break;
finally
this.treeMapLock.readLock().unlock();
catch (InterruptedException e)
log.error("getExpiredMsg exception", e);
try
//将消息发回broker延迟topic,将在给定延迟时间(默认从level3,即10s开始)之后进行重试消费
pushConsumer.sendMessageBack(msg, 3);
log.info("send expire msg back. topic=, msgId=, storeHost=, queueId=, queueOffset=", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
try
this.treeMapLock.writeLock().lockInterruptibly();
try
//如果这个消息还没有被消费完
if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey())
try
//移除消息
removeMessage(Collections.singletonList(msg));
catch (Exception e)
log.error("send expired msg exception", e);
finally
this.treeMapLock.writeLock().unlock();
catch (InterruptedException e)
log.error("getExpiredMsg exception", e);
catch (Exception e)
log.error("send expired msg exception", e);
2 submitConsumeRequest提交消费请求
该方法将消息批量的封装为ConsumeRequest提交到ConsumeMessageConcurrentlyService内部的consumeExecutor线程池中进行异步消费,如果提交失败,则调用submitConsumeRequestLater方法延迟5s进行提交,而不是丢弃。
- 首先获取单次批量消费的数量,默认1,通过DefaultMQPushConsumer的consumeMessageBatchMaxSize属性配置。
- 如果消息数量 <= 单次批量消费的数量,那么直接全量消费,构建一个ConsumeRequest并提交到consumeExecutor线程池。
- 如果消息数量 > 单次批量消费的数量,那么需要分割消息进行分批提交。
从该方法可以得知,对于并发消费模式,拉取到的一批消息被分批次提交到线程池之后,就由线程池里面的线程异步的消费,我们知道线程池里面的线程执行先后顺序时不可控制的,因此这些不同批次的消息会被并发、的无序的消费。
/**
* ConsumeMessageOrderlyService的方法
* 提交并发消费请求
*
* @param msgs 拉取到的消息
* @param processQueue 处理队列
* @param messageQueue 消息队列
* @param dispatchToConsume 是否分发消费,对于并发消费无影响
*/
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume)
//单次批量消费的数量,默认1
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
/*
* 如果消息数量 <= 单次批量消费的数量,那么直接全量消费
*/
if (msgs.size() <= consumeBatchSize)
//构建消费请求,将消息全部放进去
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try
//将请求提交到consumeExecutor线程池中进行消费
this.consumeExecutor.submit(consumeRequest);
catch (RejectedExecutionException e)
//提交的任务被线程池拒绝,那么延迟5s进行提交,而不是丢弃
this.submitConsumeRequestLater(consumeRequest);
/*
* 如果消息数量 > 单次批量消费的数量,那么需要分割消息进行分批提交
*/
else
//遍历
for (int total = 0; total < msgs.size(); )
//一批消息集合,每批消息最多consumeBatchSize条,默认1
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
//将消息按顺序加入集合
for (int i = 0; i < consumeBatchSize; i++, total++)
if (total < msgs.size())
msgThis.add(msgs.get(total));
else
break;
//将本批次消息构建为ConsumeRequest
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try
//将请求提交到consumeExecutor线程池中进行消费
this.consumeExecutor.submit(consumeRequest);
catch (RejectedExecutionException e)
for (; total < msgs.size(); total++)
msgThis.add(msgs.get(total));
//提交的任务被线程池拒绝,那么延迟5s进行提交,而不是丢弃
this.submitConsumeRequestLater(consumeRequest);
consumeExecutor线程池用于消费消息,其定义如下:最小、最大线程数默认20,阻塞队列为无界阻塞队列LinkedBlockingQueue。
/*
* 并发消费线程池
* 最小、最大线程数默认20,阻塞队列为无界阻塞队列LinkedBlockingQueue
*/
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl(consumeThreadPrefix));
//单线程的延迟任务线程池,用于延迟提交消费请求
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
//单线程的延迟任务线程池,用于处理过期的消息
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
2.2 submitConsumeRequestLater延迟提交
提交的任务被线程池拒绝,那么延迟5s进行提交,而不是丢弃。
/**
* ConsumeMessageConcurrentlyService的方法
*
* 提交的任务被线程池拒绝,那么延迟5s进行提交,而不是丢弃
* @param consumeRequest 提交请求
*/
private void submitConsumeRequestLater(final ConsumeRequest consumeRequest
)
this.scheduledExecutorService.schedule(new Runnable()
@Override
public void run()
//将提交的行为封装为一个线程任务,提交到scheduledExecutorService延迟线程池,5s之后执行
ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
, 5000, TimeUnit.MILLISECONDS);
2.2 consumeMessageBatchMaxSize和pullBatchSize
consumeMessageBatchMaxSize是什么意思呢?他的字面意思就是单次批量消费的数量,实际上它代表着每次发送给消息监听器MessageListenerOrderly或者MessageListenerConcurrently的consumeMessage方法中的参数List msgs中的最多的消息数量。
consumeMessageBatchMaxSize默认值为1,所以说,无论是并发消费还是顺序消费,每次的consumeMessage方法的执行,msgs集合默认都只有一条消息。同理,如果把它设置为其他值n,无论是并发消费还是顺序消费,每次的consumeMessage的执行,msgs集合默认都最多只有n条消息。
另外,在此前拉取消息的源码中,我们还学习了另一个参数pullBatchSize,默认值为32,其代表的是每一次拉取请求最多批量拉取的消费数量。也就是说无论是并发消费还是顺序消费,每次最多拉取32条消息。
3 ConsumeRequest执行消费任务
ConsumeRequest本身是一个线程任务,当拉取到消息之后,会将一批消息构建为一个ConsumeRequest对象,提交给consumeExecutor,由线程池异步的执行,它的run方法就是并发消费的核心方法。大概逻辑为:
- 如果处理队列被丢弃,即dropped=true,那么直接返回,不再消费,例如负载均衡时该队列被分配给了其他新上线的消费者,尽量避免重复消费。
- 调用resetRetryAndNamespace方法,当消息是重试消息的时候,将msg的topic属性从重试topic还原为真实的topic。
- 如果有消费钩子,那么执行钩子函数的前置方法consumeMessageBefore。我们可以通过DefaultMQPushConsumerImpl#registerConsumeMessageHook方法注册消费钩子ConsumeMessageHook,在消费消息的前后调用。
- 调用listener#consumeMessage方法,进行消息消费,调用实际的业务逻辑,返回执行状态结果如status为null。
- 正常情况下可返回两种状态:ConsumeConcurrentlyStatus.CONSUME_SUCCESS表示消费成功 , ConsumeConcurrentlyStatus.RECONSUME_LATER表示消费失败,如果中途抛出异常,则status为null。
- 对返回的执行状态结果进行判断处理。从这里可得知消费超时时间为15min,另外如果返回的status为null,那么status将会被设置为RECONSUME_LATER,即消费失败。
- 计算消费时间consumeRT。如果status为null,如果业务的执行抛出了异常,设置returnType为EXCEPTION,否则设置returnType为RETURNNULL。
- 如消费时间consumeRT大于等于consumeTimeout,默认15min。设置returnType为TIME_OUT。消费超时时间可通过DefaultMQPushConsumer. consumeTimeout属性配置,默认15,单位分钟。
- 如status为RECONSUME_LATER,即消费失败,设置returnType为FAILED。
- 如status为CONSUME_SUCCESS,即消费成功,设置returnType为SUCCESS。
- 如果有消费钩子,那么执行钩子函数的后置方法consumeMessageAfter。
- 如果处理队列没有被丢弃,即dropped=false,那么调用ConsumeMessageConcurrentlyService#processConsumeResult方法处理消费结果,包含消费重试、提交offset等操作。
需要注意的是,如果在执行了listener#consumeMessage方法,即执行了业务逻辑之后,处理消费结果之前,该消息队列被丢弃了,例如负载均衡时该队列被分配给了其他新上线的消费者,那么由于dropped=false,导致不会进行最后的消费结果处理,将会导致消息的重复消费,因此必须做好业务层面的幂等性!
class ConsumeRequest implements Runnable
//一次消费的消息集合,默认1条消息
private final List<MessageExt> msgs;
//处理队列
private final ProcessQueue processQueue;
//消息队列
private final MessageQueue messageQueue;
public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue)
this.msgs = msgs;
this.processQueue = processQueue;
this.messageQueue = messageQueue;
public List<MessageExt> getMsgs()
return msgs;
public ProcessQueue getProcessQueue()
return processQueue;
/**
* ConsumeMessageConcurrentlyService的内部类ConsumeRequest的方法
* <p>
* 执行并发消费
*/
@Override
public void run()
//如果处理队列被丢弃,那么直接返回,不再消费,例如负载均衡时该队列被分配给了其他新上线的消费者,尽量避免重复消费
if (this.processQueue.isDropped())
log.info("the message queue not be able to consume, because it's dropped. group= ", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
/*
* 1 获取并发消费的消息监听器,push模式模式下是我们需要开发的,通过registerMessageListener方法注册,内部包含了要执行的业务逻辑
*/
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
//重置重试topic
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
/*
* 2 如果有消费钩子,那么执行钩子函数的前置方法consumeMessageBefore
* 我们可以注册钩子ConsumeMessageHook,再消费消息的前后调用
*/
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook())
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
//起始时间戳
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
//消费返回类型,初始化为SUCCESS
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try
if (msgs != null && !msgs.isEmpty())
//循环设置每个消息的起始消费时间
for (MessageExt msg : msgs)
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
/*
* 3 调用listener#consumeMessage方法,进行消息消费,调用实际的业务逻辑,返回执行状态结果
* 有两种状态ConsumeConcurrentlyStatus.CONSUME_SUCCESS 和 ConsumeConcurrentlyStatus.RECONSUME_LATER
*/
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context)RocketMQ源码—RocketMQ源码调试环境准备