RocketMQ源码(10)—Broker asyncSendMessage处理消息以及自动创建Topic

Posted 刘Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(10)—Broker asyncSendMessage处理消息以及自动创建Topic相关的知识,希望对你有一定的参考价值。

基于RocketMQ release-4.9.3,深入的介绍了Broker接收消息源码,以及自动创建Topic的源码。

此前我们学习RocketMQ的Broker接收Producer消息的入口源码:RocketMQ源码(9)—Broker接收消息入口源码,在文章的最后我们到了asyncSendMessage方法。

asyncSendMessage方法用来处理来自producer发送的消息,内部内容非常多,本次我们学习asyncSendMessage方法的整体流程,以及自动创建topic的源码

文章目录

1 asyncSendMessage异步处理单条消息

该方法是broker处理单条消息的通用入口方法,该方法非常重要,大概步骤为:

  1. 调用preSend方法创建响应的命令对象,包括自动创建topic的逻辑,随后创建响应头对象。
  2. 随后创建MessageExtBrokerInner对象,从请求中获取消息的属性并设置到对象属性中,例如消息体,topic等等。
  3. 判断如果是重试或者死信消息,则调用handleRetryAndDLQ方法处理重试和死信队列消息,如果已重试次数大于最大重试次数,那么替换topic为死信队列topic,消息会被发送至死信队列。
  4. 判断如果是事务准备消息,并且不会拒绝处理事务消息,则调用asyncPrepareMessage方法以异步的方式处理、存储事务准备消息;
  5. 否则表示普通消息,调用asyncPutMessage方法处理、存储普通消息。asyncPutMessage以异步方式将消息存储到存储器中,处理器可以处理下一个请求而不是等待结果,当结果完成时,以异步方式通知客户端。
  6. 最后调用handlePutMessageResultFuture方法处理消息存储的处理结果。
/**
 * SendMessageProcessor的方法
 * <p>
 * 处理单条消息
 */
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                            SendMessageContext mqtraceContext,
                                                            SendMessageRequestHeader requestHeader) 
    /*
     * 1 创建响应的命令对象,包括自动创建topic的逻辑
     */
    final RemotingCommand response = preSend(ctx, request, requestHeader);
    //获取响应头
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

    if (response.getCode() != -1) 
        return CompletableFuture.completedFuture(response);
    
    //获取消息体
    final byte[] body = request.getBody();
    //获取队列id
    int queueIdInt = requestHeader.getQueueId();
    //从broker的topicConfigTable缓存中根据topicName获取TopicConfig
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    //如果队列id小于0,则随机选择一个写队列索引作为id
    if (queueIdInt < 0) 
        queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
    
    //构建消息对象,保存着要存入commitLog的数据
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    //设置topic
    msgInner.setTopic(requestHeader.getTopic());
    //设置队列id
    msgInner.setQueueId(queueIdInt);
    /*
     * 2 处理重试和死信队列消息,将会对死信消息替换为死信topic
     */
    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) 
        return CompletableFuture.completedFuture(response);
    
    /*
     * 设置一系列属性
     */
    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    //设置到properties属性中
    MessageAccessor.setProperties(msgInner, origProps);
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
    //WAIT属性表示 消息发送时是否等消息存储完成后再返回
    if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) 
        // There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.
        // It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.
        //不需要存储"WAIT=true"属性,从propertiesString中移除它,为每个消息节省9个字节。
        String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
        //将没有WAIT属性的origProps存入msgInner的propertiesString属性
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        // Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later
        //将WAIT属性重新存入origProps集合中,因为msgInner.isWaitStoreMsgOK()稍后将被调用
        origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
     else 
        //将没有WAIT属性的origProps存入msgInner的propertiesString属性
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    

    CompletableFuture<PutMessageResult> putMessageResult = null;
    /*
     * 处理事务消息逻辑
     */
    //TRAN_MSG属性值为true,表示为事务消息
    String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    //处理事务消息
    if (transFlag != null && Boolean.parseBoolean(transFlag)) 
        //判断是否需要拒绝事务消息,如果需要拒绝,则返回NO_PERMISSION异常
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) 
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                            + "] sending transaction message is forbidden");
            return CompletableFuture.completedFuture(response);
        
        //调用asyncPrepareMessage方法以异步的方式处理、存储事务准备消息,底层仍是asyncPutMessage方法
        putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
     else 
        //不是事务消息,那么调用asyncPutMessage方法处理,存储消息
        //以异步方式将消息存储到存储器中,处理器可以处理下一个请求而不是等待结果,当结果完成时,以异步方式通知客户端
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    
    //处理消息存放的结果
    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);

2 preSend准备响应命令对象

该方法用于创建响应的命令对象,其中还包括topic的校验,以及自动创建topic的逻辑。

该方法中将会创建一个RemotingCommand对象,并且设置唯一id为请求的id。除此之外还会校验如果当前时间小于该broker的起始服务时间,那么broker会返回一个SYSTEM_ERROR,表示现在broker还不能提供服务。

在最后,会调用msgCheck方法进行一系列的校验,包括自动创建topic的逻辑。

/**
 * SendMessageProcessor的方法
 * <p>
 * 准备响应数据
 */
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,
                                SendMessageRequestHeader requestHeader) 
    //创建响应命令对象
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    //设置唯一id为请求id
    response.setOpaque(request.getOpaque());
    //添加扩展字段属性"MSG_REGION"、"TRACE_ON"
    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("Receive SendMessage request command ", request);
    //获取配置的broker的处理请求的起始服务时间,默认为0
    final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    //如果当前时间小于起始时间,那么broker会返回一个SYSTEM_ERROR,表示现在broker还不能提供服务
    if (this.brokerController.getMessageStore().now() < startTimestamp) 
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));
        return response;
    
    //设置code为-1
    response.setCode(-1);
    /*
     * 消息校验,包括自动创建topic的逻辑
     */
    super.msgCheck(ctx, requestHeader, response);
    if (response.getCode() != -1) 
        return response;
    

    return response;

2.1 msgCheck检查并自动创建topic

该方法进行一系列的消息校验,并且会尝试自动创建topic。大概步骤为:

  1. 校验如果当前broker没有写的权限,那么broker会返回一个NO_PERMISSION异常,sending message is forbidden,禁止向该broker发送消息。
  2. 校验topic不能为空,必须属于合法字符regex: ^[%|a-zA-Z0-9_-]+$,且长度不超过127个字符。
  3. 校验如果当前topic是不为允许使用的系统topic,那么抛出异常,默认不能为SCHEDULE_TOPIC_XXXX。
  4. 随后从broker的topicConfigTable缓存中根据topicName获取TopicConfig。
    1. 如果不存在该topic信息,比如第一次发送消息,那么首先调用createTopicInSendMessageMethod方法尝试创建普通topic,如果失败了,则判断是否是重试topic,即topic名是否以%RETRY%开头,如果是的话则尝试创建重试topic,如果还是创建失败,则返回TOPIC_NOT_EXIST异常信息。
  5. 如果找到或者创建了topic,则校验queutId 不能大于等于该broker的读或写的最大queueId。
/**
 * AbstractSendMessageProcessor的方法
 * <p>
 * 消息校验,包括自动创建topic的逻辑
 */
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
                                   final SendMessageRequestHeader requestHeader, final RemotingCommand response) 
    //如果当前broker没有写的权限,那么broker会返回一个NO_PERMISSION异常,sending message is forbidden,禁止向该broker发送消息
    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
            && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) 
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending message is forbidden");
        return response;
    
    //校验topic不能为空,必须属于合法字符regex: ^[%|a-zA-Z0-9_-]+$,且长度不超过127个字符
    if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) 
        return response;
    
    //校验如果当前topic是不为允许使用的系统topic,那么抛出异常,默认不能为SCHEDULE_TOPIC_XXXX
    if (TopicValidator.isNotAllowedSendTopic(requestHeader.getTopic(), response)) 
        return response;
    
    //从broker的topicConfigTable缓存中根据topicName获取TopicConfig
    TopicConfig topicConfig =
            this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    //如果不存在该topic信息
    if (null == topicConfig) 
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) 
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) 
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
             else 
                topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
            
        

        log.warn("the topic  not exist, producer: ", requestHeader.getTopic(), ctx.channel().remoteAddress());
        /*
         * 尝试创建普通topic
         */
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
                requestHeader.getTopic(),
                requestHeader.getDefaultTopic(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
        /*
         * 尝试创建重试topic
         */
        if (null == topicConfig) 
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) 
                topicConfig =
                        this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                                requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
                                topicSysFlag);
            
        

        if (null == topicConfig) 
            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
            response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
                    + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
            return response;
        
    
    //校验queutId 不能大于等于该broker的读或写的最大数量
    int queueIdInt = requestHeader.getQueueId();
    int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
    if (queueIdInt >= idValid) 
        String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
                queueIdInt,
                topicConfig.toString(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

        log.warn(errorInfo);
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(errorInfo);

        return response;
    
    return response;

2.1.1 createTopicInSendMessageMethod创建普通topic

该方法尝试创建一个新的topic,大概步骤为:

  1. 首先需要获取锁防止并发创建相同的topic,获得锁之再次尝试从topicConfigTable获取topic信息,如果获取到了,那么直接返回。如果还是没有,那么创建topic。
  2. 获取默认topic的信息,用于作为模板创建新topic,默认的默认topic实际上就是TBW102,其有8个读写队列,权限为读写并且可继承,即7。
  3. 如果默认topic就是TBW102,并且如果broker配置不支持自动创建topic ,即autoCreateTopicEnable为false,那么设置权限为可读写,不可继承,即6。
  4. 如果默认topic配置的权限包括可继承,那么从默认topic继承属性创建新topic。
    1. 新建一个TopicConfig对象,选择默认队列数量与默认topic写队列数中最小的值作为新topic的读写队列数量,默认为4。设置权限,去除可继承权限等操作。
  5. 如果topic不为null,说明创建了新topic。将新的topic信息存入topicConfigTable缓存中,生成下一个数据版本,标识位置为true,随后调用persist方法将topic配置持久化到配置文件 user.home/store/config/topics.json中。
  6. 最后解锁,然后判断如果创建了新topic,那么马上调用registerBrokerAll方法向nameServer注册当前broker的新配置路由信息。
  /**
   * TopicConfigManager的方法
   * <p>
   * 创建普通topic,并持久化至配置文件 user.home/store/config/topics.json中
   *
   * @param topic                       待创建topic
   * @param defaultTopic                默认topic,用于作为模板创建新topic
   * @param remoteAddress               远程地址
   * @param clientDefaultTopicQueueNums 自动创建服务器不存在的topic时,默认创建的队列数,默认为4
*                                    可通过生产者DefaultMQProducer的defaultTopicQueueNums属性进行配置
   * @param topicSysFlag                topic标识
   * @return topic配置
   */
  public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
                                                    final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) 
      TopicConfig topicConfig = null;
      boolean createNew = false;

      try 
          //需要加锁防止并发创建相同的topic
          if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) 
              try 
                  //再次尝试从topicConfigTable获取topic信息,如果获取到了,那么直接返回
                  topicConfig = this.topicConfigTable.get(topic);
                  if (topicConfig != null)
                      return topicConfig;
                  //获取默认topic的信息,用于作为模板创建新topic,默认的默认topic实际上就是TBW102,其有8个读写队列,权限为读写并且可继承,即7
                  TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
                  if (defaultTopicConfig != null) 
                   //如果默认topic就是TBW102
                      if (defaultTopic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) 
                       //如果broker配置不支持自动创建topic,那么设置权限为可读写,不可继承,即6
                          if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) 
                              defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
                          
                      
                      //如果默认topic配置的权限包括可继承,那么从默认topic继承属性
                      if以上是关于RocketMQ源码(10)—Broker asyncSendMessage处理消息以及自动创建Topic的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ源码系列 broker启动流程源码解析

RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字

RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字

RocketMQ Broker消息处理流程及部分源码解析

RocketMQ源码—Broker启动流程源码解析一万字

RocketMQ源码—Broker与NameServer的心跳服务源码