RocketMq消费者拉取消息源码
Posted 壹佰大多
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMq消费者拉取消息源码相关的知识,希望对你有一定的参考价值。
文章目录
- 1、DefaultMQPushConsumer分发处理消息源码
- 2、pullMessageServer
- 3、pullMessageServer的pullMessage源码分析
- 4、DefaultMQPushConsumerImpl的拉取消息
- 5.1、pullKernelImpl拉取消息
- 5.2 pullMessage发起拉取消息请求
1、DefaultMQPushConsumer分发处理消息源码
/**
* 分发处理消息
* @param pullRequestList
*/
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList)
//遍历拉去请求
for (PullRequest pullRequest : pullRequestList)
//将请求存入PullMessageService服务的pullRequestQueue集合中,后续异步的消费,执行拉取消息的请求
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, , add a new pull request ", consumerGroup, pullRequest);
2、pullMessageServer
Push模式下, 消息拉取由PullMessageService服务实现,PullMessageService继承了ServiceThread,因此他也是一个异步线程任务。
在它的run方法中,在循环中不断的从pullRequestQueue中阻塞式的获取并移除队列的头部数据,即拉取消息的请求,然后调用pullMessage方法根据该请求去broker拉取消息
/**
* PullMessageService的方法
* 从MQClientInstance的启动流程中可以看出,RocketMQ使用一个单独的线程PullMessageService来负责消息的拉取。
*/
@Override
public void run()
logger.info(this.getServiceName() + " service started");
//循环拉取消息
//运行时逻辑。如果服务没有停止,则在死循环中执行拉取消息的操作。
while (!this.isStopped())
try
/**
* TODO: 从请求队列中获取拉取消息请求,刚开始肯定获取不到(因为没有经过重平衡分配数据),那么我们就要看是什么时候将拉取请求放入队列中的呢?
* 没错,这就要看重平衡服务了,这里最开始是阻塞的,经过重平衡可以获取到PullRequest
*/
MessageRequest messageRequest = this.messageRequestQueue.take();
/**
* 如果消息请求模式是POP
* POP:在pop模式下工作的消费者可以共享MessageQueue
*/
if (messageRequest.getMessageRequestMode() == MessageRequestMode.POP)
//执行popMessage
this.popMessage((PopRequest)messageRequest);
else
//TODO: 拉取消息
this.pullMessage((PullRequest)messageRequest);
catch (InterruptedException ignored)
catch (Exception e)
logger.error("Pull Message Service Run Method exception", e);
logger.info(this.getServiceName() + " service end");
3、pullMessageServer的pullMessage源码分析
pullMessage方法是拉取消息的入口方法。内部实际调用DefaultMQPushConsumerImpl的pullMessage方法。
/**
* PullMessageService的方法
* 拉取消息服务
* @param pullRequest 拉取请求
*/
private void pullMessage(final PullRequest pullRequest)
//从consumerTable中获取pullRequest中保存的消费者组的取消费者实例
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null)
//强转为推送模式消费者
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
//拉取消息
impl.pullMessage(pullRequest);
else
logger.warn("No matched consumer for the PullRequest , drop it", pullRequest);
4、DefaultMQPushConsumerImpl的拉取消息
-
服务状态校验。如果消费者服务状态异常,或者消费者暂停了,那么延迟发送拉取消息请求。
-
流控校验。默认情况下,如果processQueue中已缓存的消息总数量大于设定的阈值,默认1000,或者processQueue中已缓存的消息总大小大于设定的阈值,默认100MB那么同样延迟发送拉取消息请求。
-
顺序消费和并发消费的校验。如果是并发消费并且内存中消息的offset的最大跨度大于设定的阈值,默认2000。那么延迟发送拉取消息请求。如果是顺序消费并且没有锁定过,那么需要设置消费点位。
-
创建拉取消息的回调函数对象PullCallback,当拉取消息的请求返回之后,将会调用回调函数。这里面的源码我们后面再讲。
-
判断是否允许上报消费点位,如果是集群消费模式,并且本地内存有关于此mq的offset,那么设置commitOffsetEnable为true,表示拉取消息时可以上报消费位点给Broker进行持久化。
-
调用pullAPIWrapper.pullKernelImpl方法真正的拉取消息。
/**
* DefaultMQPushConsumerImpl的方法
* 拉取消息
* @param pullRequest 拉取消息请求
*/
public void pullMessage(final PullRequest pullRequest)
//获取对应的处理队列
final ProcessQueue processQueue = pullRequest.getProcessQueue();
//如果该处理队列被丢弃,直接返回(默认是false)
if (processQueue.isDropped())
log.info("the pull request[] is dropped.", pullRequest.toString());
return;
//如果处理队列未被丢弃,更新时间戳
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
/**
* 1. 状态校验
*/
try
//确保此consumer的服务状态正常,服务状态为RUNNING才为正常,如果服务状态不是RUNNING,那么抛出异常
this.makeSureStateOK();
catch (MQClientException e)
log.warn("pullMessage exception, consumer state not ok", e);
//延迟3s发送拉取消息请求
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
//如果消费者暂停了,那么延迟1s后再发送拉取消息请求(默认是false)
if (this.isPause())
log.warn("consumer was paused, execute pull request later. instanceName=, group=", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
/**
* 2. 流控校验
*/
//获得processQueue中已缓存的消息总数量
long cachedMessageCount = processQueue.getMsgCount().get();
//获取processQueue中已缓存的消息总大小MB
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
//TODO: 判断是否出发流控?目的:流控主要是保护消费者。当消费者消费能力不够时,拉取速度太快会导致大量消息积压,很可能导致内存溢出。
/**
* TODO:如果消息数量大于配置,则从消息数量进行流控
* 1. 判断queue缓存的消息数量是否超过1000(可以根据pullThresholdForQueue参数配置)
* 如果超过了1000,则先不去broker拉取消息,而是先暂停50ms,然后重新将对象放入队列(this.pullRequestQueue.put(pullRequest)),然后重新拉取
*/
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue())
//延迟执行拉取消息请求
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0)
//缓存的消息计数超过了阈值{},因此流控制也超过了阈值
log.warn(
"the cached message count exceeds the threshold , so do flow control, minOffset=, maxOffset=, count=, size= MiB, pullRequest=, flowControlTimes=",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
return;
/**
* TODO: 如果消息大小超过配置,则从消息大小进行流控
* 2. 判断queue缓存的消息大小是否超过100M(可以根据pullThresholdSizeForQueue参数配置)
* 如果超过100M,则先不去broker拉取消息,而是先暂停50ms,然后重新将对象放入队列中(this.pullRequestQueue.put(pullRequest)),然后重新拉取
*/
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue())
//延迟执行拉取消息请求
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0)
//缓存的消息大小超过了阈值{}MiB,流控制也是如此
log.warn(
"the cached message size exceeds the threshold MiB, so do flow control, minOffset=, maxOffset=, count=, size= MiB, pullRequest=, flowControlTimes=",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
return;
/**
* 3 顺序消费和并发消费的校验
*/
//如果不是顺序消费,即并发消费
if (!this.consumeOrderly)
//如果内存中消息的offset的最大跨度大于设置的阈值,默认2000
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan())
//延迟50ms发送拉取消息请求
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0)
log.warn(
"the queue's messages, span too long, so do flow control, minOffset=, maxOffset=, maxSpan=, pullRequest=, flowControlTimes=",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
return;
else
//顺序消费校验,如果已经锁定
if (processQueue.isLocked())
//如果此前没有锁定过,那么需要设置消费点位(默认false)
if (!pullRequest.isPreviouslyLocked())
long offset = -1L;
try
//获取该MessageQueue的下一个消息的消费偏移量offset
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
if (offset < 0)
throw new MQClientException(ResponseCode.SYSTEM_ERROR, "Unexpected offset " + offset);
catch (Exception e)
//延迟3s发送拉取消息请求
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: ", pullRequest, e);
return;
boolean brokerBusy = offset < pullRequest.getNextOffset();
//第一次拉取消息时,请修复来自代理的偏移量。pull请求:
log.info("the first time to pull message, so fix offset from broker. pullRequest: NewOffset: brokerBusy: ",
pullRequest, offset, brokerBusy);
if (brokerBusy)
//第一次拉取消息,但拉取请求偏移量大于代理消耗偏移量
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: NewOffset: ",
pullRequest, offset);
//设置previouslyLocked为true
pullRequest.setPreviouslyLocked(true);
//重设消费点位
pullRequest.setNextOffset(offset);
else
//如果没有被锁住,那么延迟3s发送拉取消息请求
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, ", pullRequest);
return;
//获取topic对应的SubscriptionData订阅关系
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
//如果订阅信息为null,则表示发现消费者的订阅失败
if (null == subscriptionData)
//延迟3s发送拉取消息请求
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, ", pullRequest);
return;
//起始时间
final long beginTimestamp = System.currentTimeMillis();
/**
* 4. 创建拉取消息的回调函数对象,当拉取消息的请求返回之后,将会指定回调函数
*/
//TODO: 构建消息处理的回调对象,它的非常重要的,等从broker拉取消息后(这里是从broker拉取消息成功后才执行的),会交给它来处理
PullCallback pullCallback = new PullCallback()
@Override
public void onSuccess(PullResult pullResult)
if (pullResult != null)
/**
* 拉取成功,开始处理从broker读取到的消息
* 将二进制内容转换成MessageExt对象,并根据TAG的值进行过滤
*/
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus())
//TODO: 发现了消息
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
//TODO: 更新nextOffset的值
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
/**
* TODO:如果没有消息则立即执行,立即拉取的意思是继续将PullRequest放入队列中
* 这样take()方法将不会再阻塞,然后继续从broker拉取消息,从而达到持续从broker拉取消息
*/
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty())
//拉取消息
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
else
//拉取的消息不为空
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
//TODO: 将本次读取到的所有信息(经过TAG/SQL过滤)保存到本地缓存队列processQueue中
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
/**
* TODO: 构建consumeRequest,将消息提交到线程池中,由ConsumerMessageService 进行消费
* 由于我们的是普通消息(不是顺序消息),所以由ConsumeMessageConcurrentlyService类来消费消息
* ConsumeMessageConcurrentlyService内部会创建一个线程池ThreadPoolExecutor,这个xcc非常重要,消息最终将提交到这个线程池中
*/
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
/**
* 上面是异步消费,然后这里是将PullRequest放入队列中,这样take()方法将不会阻塞
* 然后继续从broker中拉取消息,从而到达持续从broker中拉取消息
* 延迟pullInterval 时间再去拉取消息:
* 这里有一个 pullInterval参数,表示间隔多长时间在放入队列中(实际上就是间隔多长时间再去broker拉取消息)。当消费者消费速度比生产者快的时候,可以考虑设置这个值,这样可以避免大概率拉取到空消息。
*
*/
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0)
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
else
//立即拉取消息
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset)
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: firstMsgOffset: prevRequestOffset: ",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, ",
pullRequest.toString(), pullResult.toString(RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码一万字
基于RocketMQ release-4.9.3,深入的介绍了Broker处理DefaultMQPushConsumer发起的拉取消息请求源码。
此前我们学习了RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码。我们知道consumer在发送了拉取消息请求的时候,请求的Code为PULL_MESSAGE,broker端接收到请求之后,将会更根据不同的Code调用不同的处理器进行处理,而PULL_MESSAGE拉取消息的请求则是通过PullMessageProcessor处理的。
下面我们来看看Broker处理DefaultMQPushConsumer发起的拉取消息请求源码。
1 PullMessageProcessor处理拉取请求
PullMessageProcessor作为broker的拉取消息处理器,用于处理拉取消息的请求,他在BrokerController实例化的时候跟着实例化。
然后在registerProcessor方法中,将Code为PULL_MESSAGE的请求与其绑定。
随后在processRequestCommand方法中,会根据请求的Code选择不同的netty处理器进行处理,调用方法为asyncProcessRequest:
在asyncProcessRequest方法中,将会调用processRequest方法,该方法由各个处理器实现,这里就是PullMessageProcessor处理器处理器PULL_MESSAGE拉取消息请求的入口方法。
/**
* PullMessageProcessor的方法
* <p>
* 处理PULL_MESSAGE拉取消息请求
*/
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException
//调用另一个processRequest方法,第三个参数broker是否支持持挂起请求为true
return this.processRequest(ctx.channel(), request, true);
2 processRequest处理拉取消息请求
**该方法处理拉取消息的请求,包括构建过滤信息,拉取消息,拉取结果处理(判断直接返回响应还是挂起请求),上报消费点位等步骤,源码非常多,不易理解。**大概步骤为(详细步骤请看源码注释):
- 构建过滤信息。基于TAG会获取subscriptionData,基于classFilter还会获取consumerFilterData,最后根据它们构建MessageFilter对象。
- 如果有子订阅标记,即hasSubscriptionFlag = true,那么每次拉取消息都会重新构建subscriptionData和consumerFilterData,而不是使用缓存的数据。
- 一般hasSubscriptionFlag都是false,因为hasSubscriptionFlag为true需要consumer端将postSubscriptionWhenPull=true,并且订阅不是classFilter模式同时满足
- 通过DefaultMessageStore#getMessage方法拉取消息,并且进行过滤操作。这是拉取消息的核心方法,涉及到查找ConsumeQueue和CommitLog文件数据。返回拉取结果GetMessageResult。
- 对于拉取结果GetMessageResult进行处理,设置响应数据。
- 判断并设置下次拉取消息的建议broker是MASTER还是SLAVE。
- 如果getMessage方法返回的GetMessageResult的suggestPullingFromSlave属性为true,则设置responseHeader的suggestWhichBrokerId属性值为1,即建议下次从SLAVE拉取,否则设置为0,建议下次从MASTER拉取。
- 判断broker角色。如果是SLAVE,并且slaveReadEnable = false。那么设置responseHeader的suggestWhichBrokerId属性值为0,即建议下次从MASTER拉取。
- 如果slaveReadEnable = true,并且如果如果消费太慢了,那么下次重定向到另一台broker,id通过subscriptionGroupConfig的whichBrokerWhenConsumeSlowly指定,默认1,即SLAVE。否则id通过subscriptionGroupConfig的brokerId指定,默认0,即MASTER。如果slaveReadEnable = false,设置建议的brokerId为MASTER。
- 判断getMessageResult的状态码,并设置response的对应的响应码。
- 判断如果有消费钩子,那么执行消费钩子的consumeMessageBefore方法。
- 判断响应码,然后直接返回数据或者进行短轮询或者长轮询。
- 如果拉取消息成功,那么更新一些统计信息,然后从buffer中读取出消息转换为字节数组,存入response的body中。
- 如果没有读取到消息,如果broker允许挂起请求并且客户端支持请求挂起,则broker挂起该请求一段时间,中间如果有消息到达则会唤醒请求拉取消息并返回。
- 计算最长挂起时间,如果支持长轮询则默认最长挂起15s,否则使用短轮询,挂起最长1s。
- 构建一个PullRequest,通过pullRequestHoldService#suspendPullRequest方法提交PullRequest,该请求将会被挂起并异步处理。
iii. 如果读取的offset不正确,太大或者太小,发布offset移除事件。
- 拉取消息完毕之后,无论是否拉取到消息,只要broker支持挂起请求(新的拉取请求为true,但是已被suspend的请求将会是false),并且consumer支持提交消费进度,并且当前broker不是SLAVE角色,那么通过ConsumerOffsetManager#commitOffset方法提交消费进度偏移量。
/**
* PullMessageProcessor的方法
* <p>
* 处理拉取消息请求
*
* @param channel 通连接道
* @param request 请求
* @param brokerAllowSuspend broker是否支持挂起请求
* @return
* @throws RemotingCommandException
*/
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException
//起始时间
final long beginTimeMills = this.brokerController.getMessageStore().now();
//创建响应命令对象
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
//创建响应头
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
//解析请求头
final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
//设置请求id,通过id可以获取请求结果
response.setOpaque(request.getOpaque());
log.debug("receive PullMessage request command, ", request);
//当前broker是否可读,不可读则直接返回
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission()))
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
return response;
//获取当前consumerGroup对应的订阅信息
SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (null == subscriptionGroupConfig)
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
return response;
//判断是否可消费,不可消费则直接返回
if (!subscriptionGroupConfig.isConsumeEnable())
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
return response;
//是否支持请求挂起
final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
//是否提交消费进度
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
//是否存在子订阅,即TAG或者SQL92的设置,用于过滤消息
final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
//计算broker最长的挂起时间,默认15s,该参数是消费者传递的
final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
//获取topic配置
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig)
log.error("the topic not exist, consumer: ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
return response;
//topic是否可读,不可读则直接返回
if (!PermName.isReadable(topicConfig.getPerm()))
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
return response;
//校验请求中的队列id,如果小于0或者大于等于topic配置中的读队列数量,那么直接返回
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums())
String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
/*
* 1 构建过滤信息
* 真正的过滤消息操作还在后面,而且broker和consumer都会进行过滤
*/
SubscriptionData subscriptionData = null;
ConsumerFilterData consumerFilterData = null;
//如果有子订阅标记,那么每次拉取消息都会重新构建subscriptionData和consumerFilterData,而不是使用缓存的信息,一般都是false
//因为hasSubscriptionFlag为true需要consumer端将postSubscriptionWhenPull=true,并且订阅不是classFilter模式同时满足
if (hasSubscriptionFlag)
try
subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType());
if (!ExpressionType.isTagType(subscriptionData.getExpressionType()))
consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(), requestHeader.getExpressionType(), requestHeader.getSubVersion());
assert consumerFilterData != null;
catch (Exception e)
log.warn("Parse the consumer's subscription[] failed, group: ", requestHeader.getSubscription(), requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
return response;
else
//获取消费者组信息
ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
if (null == consumerGroupInfo)
log.warn("the consumer's group info not exist, group: ", requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
//如果不支持广播消费但是消费者消费模式是广播消费,则直接返回
if (!subscriptionGroupConfig.isConsumeBroadcastEnable() && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING)
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
return response;
//获取broker缓存的此consumerGroupInfo中关于此topic的订阅关系
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData)
log.warn("the consumer's subscription not exist, group: , topic:", requestHeader.getConsumerGroup(), requestHeader.getTopic());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
//比较订阅关系版本
if (subscriptionData.getSubVersion() < requestHeader.getSubVersion())
log.warn("The broker's subscription is not latest, group: ", requestHeader.getConsumerGroup(), subscriptionData.getSubString());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
response.setRemark("the consumer's subscription not latest");
return response;
//如果订阅关系表达式不是TAG类型,那么构建consumerFilterData
if (!ExpressionType.isTagType(subscriptionData.getExpressionType()))
consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(), requestHeader.getConsumerGroup());
if (consumerFilterData == null)
response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
return response;
if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion())
log.warn("The broker's consumer filter data is not latest, group: , topic: , serverV: , clientV: ", requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
response.setRemark("the consumer's consumer filter data not latest");
return response;
//如果订阅关系表达式不是TAG类型,并且enablePropertyFilter没有开启支持SQL92,那么抛出异常
//也就是说,如果消费者使用SQL92模式订阅,那么需要现在broker端设置enablePropertyFilter=true
if (!ExpressionType.isTagType(subscriptionData.getExpressionType()) && !this.brokerController.getBrokerConfig().isEnablePropertyFilter())
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
return response;
MessageFilter messageFilter;
//重试topic是否支持filter过滤,默认false,即重试topic是不支持过滤额
if (this.brokerController.getBrokerConfig().isFilterSupportRetry())
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
else
//创建普通的ExpressionMessageFilter,内部保存了消费者启动时通过心跳上报的订阅关系
//一般基于tag订阅的情况下,consumerFilterData是null,通过subscriptionData进行过滤
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
/*
* 2 通过DefaultMessageStore#getMessage方法批量拉取消息,并且进行过滤操作
*/
final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
/*
* 3 对于拉取结果GetMessageResult进行处理
*/
if (getMessageResult != null)
//设置拉去状态枚举名字
response.setRemark(getMessageResult.getStatus().name());
//设置下次拉取的consumeQueue的起始逻辑偏移量
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
//设置consumeQueue的最小、最大的逻辑偏移量maxOffset和minOffset
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
/*
* 3.1 判断并设置下次拉取消息的建议broker是MATER还是SLAVE
*/
//是否建议从SLAVE拉取消息
if (getMessageResult.isSuggestPullingFromSlave())
//设置建议的brokerId为从服务器的id 1
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
else
//否则,设置建议的brokerId为主服务器的id 0
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
//判断broker角色
switch (this.brokerController.getMessageStoreConfig().getBrokerRole())
case ASYNC_MASTER:
case SYNC_MASTER:
break;
case SLAVE:
//如果是SLAVE,并且从服务器不可读
if (!this.brokerController.getBrokerConfig().以上是关于RocketMq消费者拉取消息源码的主要内容,如果未能解决你的问题,请参考以下文章
RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码
RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码
RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码一万字
RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码一万字