RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码一万字
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码一万字相关的知识,希望对你有一定的参考价值。
基于RocketMQ release-4.9.3,深入的介绍了Broker处理DefaultMQPushConsumer发起的拉取消息请求源码。
此前我们学习了RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码。我们知道consumer在发送了拉取消息请求的时候,请求的Code为PULL_MESSAGE,broker端接收到请求之后,将会更根据不同的Code调用不同的处理器进行处理,而PULL_MESSAGE拉取消息的请求则是通过PullMessageProcessor处理的。
下面我们来看看Broker处理DefaultMQPushConsumer发起的拉取消息请求源码。
1 PullMessageProcessor处理拉取请求
PullMessageProcessor作为broker的拉取消息处理器,用于处理拉取消息的请求,他在BrokerController实例化的时候跟着实例化。
然后在registerProcessor方法中,将Code为PULL_MESSAGE的请求与其绑定。
随后在processRequestCommand方法中,会根据请求的Code选择不同的netty处理器进行处理,调用方法为asyncProcessRequest:
在asyncProcessRequest方法中,将会调用processRequest方法,该方法由各个处理器实现,这里就是PullMessageProcessor处理器处理器PULL_MESSAGE拉取消息请求的入口方法。
/**
* PullMessageProcessor的方法
* <p>
* 处理PULL_MESSAGE拉取消息请求
*/
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException
//调用另一个processRequest方法,第三个参数broker是否支持持挂起请求为true
return this.processRequest(ctx.channel(), request, true);
2 processRequest处理拉取消息请求
**该方法处理拉取消息的请求,包括构建过滤信息,拉取消息,拉取结果处理(判断直接返回响应还是挂起请求),上报消费点位等步骤,源码非常多,不易理解。**大概步骤为(详细步骤请看源码注释):
- 构建过滤信息。基于TAG会获取subscriptionData,基于classFilter还会获取consumerFilterData,最后根据它们构建MessageFilter对象。
- 如果有子订阅标记,即hasSubscriptionFlag = true,那么每次拉取消息都会重新构建subscriptionData和consumerFilterData,而不是使用缓存的数据。
- 一般hasSubscriptionFlag都是false,因为hasSubscriptionFlag为true需要consumer端将postSubscriptionWhenPull=true,并且订阅不是classFilter模式同时满足
- 通过DefaultMessageStore#getMessage方法拉取消息,并且进行过滤操作。这是拉取消息的核心方法,涉及到查找ConsumeQueue和CommitLog文件数据。返回拉取结果GetMessageResult。
- 对于拉取结果GetMessageResult进行处理,设置响应数据。
- 判断并设置下次拉取消息的建议broker是MASTER还是SLAVE。
- 如果getMessage方法返回的GetMessageResult的suggestPullingFromSlave属性为true,则设置responseHeader的suggestWhichBrokerId属性值为1,即建议下次从SLAVE拉取,否则设置为0,建议下次从MASTER拉取。
- 判断broker角色。如果是SLAVE,并且slaveReadEnable = false。那么设置responseHeader的suggestWhichBrokerId属性值为0,即建议下次从MASTER拉取。
- 如果slaveReadEnable = true,并且如果如果消费太慢了,那么下次重定向到另一台broker,id通过subscriptionGroupConfig的whichBrokerWhenConsumeSlowly指定,默认1,即SLAVE。否则id通过subscriptionGroupConfig的brokerId指定,默认0,即MASTER。如果slaveReadEnable = false,设置建议的brokerId为MASTER。
- 判断getMessageResult的状态码,并设置response的对应的响应码。
- 判断如果有消费钩子,那么执行消费钩子的consumeMessageBefore方法。
- 判断响应码,然后直接返回数据或者进行短轮询或者长轮询。
- 如果拉取消息成功,那么更新一些统计信息,然后从buffer中读取出消息转换为字节数组,存入response的body中。
- 如果没有读取到消息,如果broker允许挂起请求并且客户端支持请求挂起,则broker挂起该请求一段时间,中间如果有消息到达则会唤醒请求拉取消息并返回。
- 计算最长挂起时间,如果支持长轮询则默认最长挂起15s,否则使用短轮询,挂起最长1s。
- 构建一个PullRequest,通过pullRequestHoldService#suspendPullRequest方法提交PullRequest,该请求将会被挂起并异步处理。
iii. 如果读取的offset不正确,太大或者太小,发布offset移除事件。
- 判断并设置下次拉取消息的建议broker是MASTER还是SLAVE。
- 拉取消息完毕之后,无论是否拉取到消息,只要broker支持挂起请求(新的拉取请求为true,但是已被suspend的请求将会是false),并且consumer支持提交消费进度,并且当前broker不是SLAVE角色,那么通过ConsumerOffsetManager#commitOffset方法提交消费进度偏移量。
/**
* PullMessageProcessor的方法
* <p>
* 处理拉取消息请求
*
* @param channel 通连接道
* @param request 请求
* @param brokerAllowSuspend broker是否支持挂起请求
* @return
* @throws RemotingCommandException
*/
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException
//起始时间
final long beginTimeMills = this.brokerController.getMessageStore().now();
//创建响应命令对象
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
//创建响应头
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
//解析请求头
final PullMessageRequestHeader requestHeader = (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
//设置请求id,通过id可以获取请求结果
response.setOpaque(request.getOpaque());
log.debug("receive PullMessage request command, ", request);
//当前broker是否可读,不可读则直接返回
if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission()))
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
return response;
//获取当前consumerGroup对应的订阅信息
SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (null == subscriptionGroupConfig)
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
return response;
//判断是否可消费,不可消费则直接返回
if (!subscriptionGroupConfig.isConsumeEnable())
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
return response;
//是否支持请求挂起
final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
//是否提交消费进度
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
//是否存在子订阅,即TAG或者SQL92的设置,用于过滤消息
final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
//计算broker最长的挂起时间,默认15s,该参数是消费者传递的
final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
//获取topic配置
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig)
log.error("the topic not exist, consumer: ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
return response;
//topic是否可读,不可读则直接返回
if (!PermName.isReadable(topicConfig.getPerm()))
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
return response;
//校验请求中的队列id,如果小于0或者大于等于topic配置中的读队列数量,那么直接返回
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums())
String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
/*
* 1 构建过滤信息
* 真正的过滤消息操作还在后面,而且broker和consumer都会进行过滤
*/
SubscriptionData subscriptionData = null;
ConsumerFilterData consumerFilterData = null;
//如果有子订阅标记,那么每次拉取消息都会重新构建subscriptionData和consumerFilterData,而不是使用缓存的信息,一般都是false
//因为hasSubscriptionFlag为true需要consumer端将postSubscriptionWhenPull=true,并且订阅不是classFilter模式同时满足
if (hasSubscriptionFlag)
try
subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType());
if (!ExpressionType.isTagType(subscriptionData.getExpressionType()))
consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(), requestHeader.getExpressionType(), requestHeader.getSubVersion());
assert consumerFilterData != null;
catch (Exception e)
log.warn("Parse the consumer's subscription[] failed, group: ", requestHeader.getSubscription(), requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
return response;
else
//获取消费者组信息
ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
if (null == consumerGroupInfo)
log.warn("the consumer's group info not exist, group: ", requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
//如果不支持广播消费但是消费者消费模式是广播消费,则直接返回
if (!subscriptionGroupConfig.isConsumeBroadcastEnable() && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING)
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
return response;
//获取broker缓存的此consumerGroupInfo中关于此topic的订阅关系
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData)
log.warn("the consumer's subscription not exist, group: , topic:", requestHeader.getConsumerGroup(), requestHeader.getTopic());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
return response;
//比较订阅关系版本
if (subscriptionData.getSubVersion() < requestHeader.getSubVersion())
log.warn("The broker's subscription is not latest, group: ", requestHeader.getConsumerGroup(), subscriptionData.getSubString());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
response.setRemark("the consumer's subscription not latest");
return response;
//如果订阅关系表达式不是TAG类型,那么构建consumerFilterData
if (!ExpressionType.isTagType(subscriptionData.getExpressionType()))
consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(), requestHeader.getConsumerGroup());
if (consumerFilterData == null)
response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
return response;
if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion())
log.warn("The broker's consumer filter data is not latest, group: , topic: , serverV: , clientV: ", requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
response.setRemark("the consumer's consumer filter data not latest");
return response;
//如果订阅关系表达式不是TAG类型,并且enablePropertyFilter没有开启支持SQL92,那么抛出异常
//也就是说,如果消费者使用SQL92模式订阅,那么需要现在broker端设置enablePropertyFilter=true
if (!ExpressionType.isTagType(subscriptionData.getExpressionType()) && !this.brokerController.getBrokerConfig().isEnablePropertyFilter())
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
return response;
MessageFilter messageFilter;
//重试topic是否支持filter过滤,默认false,即重试topic是不支持过滤额
if (this.brokerController.getBrokerConfig().isFilterSupportRetry())
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
else
//创建普通的ExpressionMessageFilter,内部保存了消费者启动时通过心跳上报的订阅关系
//一般基于tag订阅的情况下,consumerFilterData是null,通过subscriptionData进行过滤
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
/*
* 2 通过DefaultMessageStore#getMessage方法批量拉取消息,并且进行过滤操作
*/
final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
/*
* 3 对于拉取结果GetMessageResult进行处理
*/
if (getMessageResult != null)
//设置拉去状态枚举名字
response.setRemark(getMessageResult.getStatus().name());
//设置下次拉取的consumeQueue的起始逻辑偏移量
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
//设置consumeQueue的最小、最大的逻辑偏移量maxOffset和minOffset
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
/*
* 3.1 判断并设置下次拉取消息的建议broker是MATER还是SLAVE
*/
//是否建议从SLAVE拉取消息
if (getMessageResult.isSuggestPullingFromSlave())
//设置建议的brokerId为从服务器的id 1
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
else
//否则,设置建议的brokerId为主服务器的id 0
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
//判断broker角色
switch (this.brokerController.getMessageStoreConfig().getBrokerRole())
case ASYNC_MASTER:
case SYNC_MASTER:
break;
case SLAVE:
//如果是SLAVE,并且从服务器不可读
if (!this.brokerController.getBrokerConfig().以上是关于RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码一万字的主要内容,如果未能解决你的问题,请参考以下文章
RocketMQ源码(10)—Broker asyncSendMessage处理消息以及自动创建Topic
RocketMQ源码(10)—Broker asyncSendMessage处理消息以及自动创建Topic
RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码
RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码