RocketMQ源码(10)—Broker asyncSendMessage处理消息以及自动创建Topic
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(10)—Broker asyncSendMessage处理消息以及自动创建Topic相关的知识,希望对你有一定的参考价值。
基于RocketMQ release-4.9.3,深入的介绍了Broker接收消息源码,以及自动创建Topic的源码。
此前我们学习RocketMQ的Broker接收Producer消息的入口源码:RocketMQ源码(9)—Broker接收消息入口源码,在文章的最后我们到了asyncSendMessage方法。
asyncSendMessage方法用来处理来自producer发送的消息,内部内容非常多,本次我们学习asyncSendMessage方法的整体流程,以及自动创建topic的源码。
文章目录
1 asyncSendMessage异步处理单条消息
该方法是broker处理单条消息的通用入口方法,该方法非常重要,大概步骤为:
- 调用preSend方法创建响应的命令对象,包括自动创建topic的逻辑,随后创建响应头对象。
- 随后创建MessageExtBrokerInner对象,从请求中获取消息的属性并设置到对象属性中,例如消息体,topic等等。
- 判断如果是重试或者死信消息,则调用handleRetryAndDLQ方法处理重试和死信队列消息,如果已重试次数大于最大重试次数,那么替换topic为死信队列topic,消息会被发送至死信队列。
- 判断如果是事务准备消息,并且不会拒绝处理事务消息,则调用asyncPrepareMessage方法以异步的方式处理、存储事务准备消息;
- 否则表示普通消息,调用asyncPutMessage方法处理、存储普通消息。asyncPutMessage以异步方式将消息存储到存储器中,处理器可以处理下一个请求而不是等待结果,当结果完成时,以异步方式通知客户端。
- 最后调用handlePutMessageResultFuture方法处理消息存储的处理结果。
/**
* SendMessageProcessor的方法
* <p>
* 处理单条消息
*/
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader)
/*
* 1 创建响应的命令对象,包括自动创建topic的逻辑
*/
final RemotingCommand response = preSend(ctx, request, requestHeader);
//获取响应头
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
if (response.getCode() != -1)
return CompletableFuture.completedFuture(response);
//获取消息体
final byte[] body = request.getBody();
//获取队列id
int queueIdInt = requestHeader.getQueueId();
//从broker的topicConfigTable缓存中根据topicName获取TopicConfig
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
//如果队列id小于0,则随机选择一个写队列索引作为id
if (queueIdInt < 0)
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
//构建消息对象,保存着要存入commitLog的数据
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
//设置topic
msgInner.setTopic(requestHeader.getTopic());
//设置队列id
msgInner.setQueueId(queueIdInt);
/*
* 2 处理重试和死信队列消息,将会对死信消息替换为死信topic
*/
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig))
return CompletableFuture.completedFuture(response);
/*
* 设置一系列属性
*/
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
//设置到properties属性中
MessageAccessor.setProperties(msgInner, origProps);
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
//WAIT属性表示 消息发送时是否等消息存储完成后再返回
if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK))
// There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.
// It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.
//不需要存储"WAIT=true"属性,从propertiesString中移除它,为每个消息节省9个字节。
String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
//将没有WAIT属性的origProps存入msgInner的propertiesString属性
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
// Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later
//将WAIT属性重新存入origProps集合中,因为msgInner.isWaitStoreMsgOK()稍后将被调用
origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
else
//将没有WAIT属性的origProps存入msgInner的propertiesString属性
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
CompletableFuture<PutMessageResult> putMessageResult = null;
/*
* 处理事务消息逻辑
*/
//TRAN_MSG属性值为true,表示为事务消息
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
//处理事务消息
if (transFlag != null && Boolean.parseBoolean(transFlag))
//判断是否需要拒绝事务消息,如果需要拒绝,则返回NO_PERMISSION异常
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage())
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
//调用asyncPrepareMessage方法以异步的方式处理、存储事务准备消息,底层仍是asyncPutMessage方法
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
else
//不是事务消息,那么调用asyncPutMessage方法处理,存储消息
//以异步方式将消息存储到存储器中,处理器可以处理下一个请求而不是等待结果,当结果完成时,以异步方式通知客户端
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
//处理消息存放的结果
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
2 preSend准备响应命令对象
该方法用于创建响应的命令对象,其中还包括topic的校验,以及自动创建topic的逻辑。
该方法中将会创建一个RemotingCommand对象,并且设置唯一id为请求的id。除此之外还会校验如果当前时间小于该broker的起始服务时间,那么broker会返回一个SYSTEM_ERROR,表示现在broker还不能提供服务。
在最后,会调用msgCheck方法进行一系列的校验,包括自动创建topic的逻辑。
/**
* SendMessageProcessor的方法
* <p>
* 准备响应数据
*/
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageRequestHeader requestHeader)
//创建响应命令对象
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
//设置唯一id为请求id
response.setOpaque(request.getOpaque());
//添加扩展字段属性"MSG_REGION"、"TRACE_ON"
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
log.debug("Receive SendMessage request command ", request);
//获取配置的broker的处理请求的起始服务时间,默认为0
final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
//如果当前时间小于起始时间,那么broker会返回一个SYSTEM_ERROR,表示现在broker还不能提供服务
if (this.brokerController.getMessageStore().now() < startTimestamp)
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));
return response;
//设置code为-1
response.setCode(-1);
/*
* 消息校验,包括自动创建topic的逻辑
*/
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1)
return response;
return response;
2.1 msgCheck检查并自动创建topic
该方法进行一系列的消息校验,并且会尝试自动创建topic。大概步骤为:
- 校验如果当前broker没有写的权限,那么broker会返回一个NO_PERMISSION异常,sending message is forbidden,禁止向该broker发送消息。
- 校验topic不能为空,必须属于合法字符regex: ^[%|a-zA-Z0-9_-]+$,且长度不超过127个字符。
- 校验如果当前topic是不为允许使用的系统topic,那么抛出异常,默认不能为SCHEDULE_TOPIC_XXXX。
- 随后从broker的topicConfigTable缓存中根据topicName获取TopicConfig。
- 如果不存在该topic信息,比如第一次发送消息,那么首先调用createTopicInSendMessageMethod方法尝试创建普通topic,如果失败了,则判断是否是重试topic,即topic名是否以%RETRY%开头,如果是的话则尝试创建重试topic,如果还是创建失败,则返回TOPIC_NOT_EXIST异常信息。
- 如果找到或者创建了topic,则校验queutId 不能大于等于该broker的读或写的最大queueId。
/**
* AbstractSendMessageProcessor的方法
* <p>
* 消息校验,包括自动创建topic的逻辑
*/
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
final SendMessageRequestHeader requestHeader, final RemotingCommand response)
//如果当前broker没有写的权限,那么broker会返回一个NO_PERMISSION异常,sending message is forbidden,禁止向该broker发送消息
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
&& this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic()))
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending message is forbidden");
return response;
//校验topic不能为空,必须属于合法字符regex: ^[%|a-zA-Z0-9_-]+$,且长度不超过127个字符
if (!TopicValidator.validateTopic(requestHeader.getTopic(), response))
return response;
//校验如果当前topic是不为允许使用的系统topic,那么抛出异常,默认不能为SCHEDULE_TOPIC_XXXX
if (TopicValidator.isNotAllowedSendTopic(requestHeader.getTopic(), response))
return response;
//从broker的topicConfigTable缓存中根据topicName获取TopicConfig
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
//如果不存在该topic信息
if (null == topicConfig)
int topicSysFlag = 0;
if (requestHeader.isUnitMode())
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
else
topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
log.warn("the topic not exist, producer: ", requestHeader.getTopic(), ctx.channel().remoteAddress());
/*
* 尝试创建普通topic
*/
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
/*
* 尝试创建重试topic
*/
if (null == topicConfig)
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))
topicConfig =
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
topicSysFlag);
if (null == topicConfig)
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
//校验queutId 不能大于等于该broker的读或写的最大数量
int queueIdInt = requestHeader.getQueueId();
int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
if (queueIdInt >= idValid)
String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
queueIdInt,
topicConfig.toString(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorInfo);
return response;
return response;
2.1.1 createTopicInSendMessageMethod创建普通topic
该方法尝试创建一个新的topic,大概步骤为:
- 首先需要获取锁防止并发创建相同的topic,获得锁之再次尝试从topicConfigTable获取topic信息,如果获取到了,那么直接返回。如果还是没有,那么创建topic。
- 获取默认topic的信息,用于作为模板创建新topic,默认的默认topic实际上就是TBW102,其有8个读写队列,权限为读写并且可继承,即7。
- 如果默认topic就是TBW102,并且如果broker配置不支持自动创建topic ,即autoCreateTopicEnable为false,那么设置权限为可读写,不可继承,即6。
- 如果默认topic配置的权限包括可继承,那么从默认topic继承属性创建新topic。
- 新建一个TopicConfig对象,选择默认队列数量与默认topic写队列数中最小的值作为新topic的读写队列数量,默认为4。设置权限,去除可继承权限等操作。
- 如果topic不为null,说明创建了新topic。将新的topic信息存入topicConfigTable缓存中,生成下一个数据版本,标识位置为true,随后调用persist方法将topic配置持久化到配置文件 user.home/store/config/topics.json中。
- 最后解锁,然后判断如果创建了新topic,那么马上调用registerBrokerAll方法向nameServer注册当前broker的新配置路由信息。
/**
* TopicConfigManager的方法
* <p>
* 创建普通topic,并持久化至配置文件 user.home/store/config/topics.json中
*
* @param topic 待创建topic
* @param defaultTopic 默认topic,用于作为模板创建新topic
* @param remoteAddress 远程地址
* @param clientDefaultTopicQueueNums 自动创建服务器不存在的topic时,默认创建的队列数,默认为4
* 可通过生产者DefaultMQProducer的defaultTopicQueueNums属性进行配置
* @param topicSysFlag topic标识
* @return topic配置
*/
public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag)
TopicConfig topicConfig = null;
boolean createNew = false;
try
//需要加锁防止并发创建相同的topic
if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
try
//再次尝试从topicConfigTable获取topic信息,如果获取到了,那么直接返回
topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null)
return topicConfig;
//获取默认topic的信息,用于作为模板创建新topic,默认的默认topic实际上就是TBW102,其有8个读写队列,权限为读写并且可继承,即7
TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
if (defaultTopicConfig != null)
//如果默认topic就是TBW102
if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC))
//如果broker配置不支持自动创建topic,那么设置权限为可读写,不可继承,即6
if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable())
defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
//如果默认topic配置的权限包括可继承,那么从默认topic继承属性
if以上是关于RocketMQ源码(10)—Broker asyncSendMessage处理消息以及自动创建Topic的主要内容,如果未能解决你的问题,请参考以下文章
RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字