8RocketMQ 源码解析之消息发送

Posted carl-zhao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了8RocketMQ 源码解析之消息发送相关的知识,希望对你有一定的参考价值。

当我们启动了元数据管理 NameServer 以及 消息管理 Broker。下面我们就可以进行消息发送了。RocketMQ 支持三种消息发送方式:

  • 同步消息发送(sync):当 Producer 发送消息到 Broker 时会同步等待消息处理结果
  • 异步消息发送(async):当 Producer 发送消息到 Broker 时会指定一个消息发送成功的回调函数,调用消息发送后立即返回不会阻塞。消息发送成功或者失败会在一个新的线程中进行处理。
  • 单向消息发送(oneway):当 Producer 发送消息到 Broker 时直接返回,只管把消息发送出去,并不关心 Broker 的响应结果。

1、发送消息 Demo

首先我们来看一下 RocketMQ 的发送消息 Demo 的代码,如下所示:

public class Producer 
    
    public static void main(String[] args) throws MQClientException, InterruptedException 
        
        DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
        producer.setNamesrvAddr ("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 1000; i++) 
            try 
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
             catch (Exception e) 
                e.printStackTrace();
                Thread.sleep(1000);
            
        
        
        producer.shutdown();
    

其实上面的代码逻辑很清晰:

  • 首先创建 DefaultMQProducer 对象,并且设置存储 RocketMQ 元数据的 NameServer 地址,然后开启 DefaultMQProducer 服务
  • 根据业务请求构建 RocketMQ 的 Message,发送消息并查看发送结果
  • 关闭 DefaultMQProducer 类,并且释放资源

2、DefaultMQProducer 启动

生产者的启动,我们应该看 DefaultMQProducerImpl#start 这个方法。

DefaultMQProducerImpl#start

    public void start(final boolean startFactory) throws MQClientException 
        switch (this.serviceState) 
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) 
                    this.defaultMQProducer.changeInstanceNameToPID();
                

                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) 
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                

                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                if (startFactory) 
                    mQClientFactory.start();
                

                log.info("the producer [] start OK. sendMessageWithVIPChannel=", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        

        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

        this.startScheduledTask();
    

生产者的启动逻辑为:

  • 检查 producerGroup 是否符合要求,并且修改生产者的 instanceName 为 PID
  • 创建 MQClientInstance 实例,MQClientManager 这个单例对象来管理 MQClientInstance 实例,它的内部有一个 Map 用于维护 clientId 与 MQClientInstance 实例的映射关系。clientId 的生成规则为:客户端IP + instanceName + unitName(可选)。上一步是把生产者的 instanceName 修改为了 PID。这样就可以在同一台机器上面启动多台 RocketMQ 实例了。
  • 向 MQClientInstance 中注册生产者,方便后续网络请求
  • 启动 MQClientInstance 实例,这里主要是创建 NettyRemotingClient 这个客户端网络连接与 NameServer 与 Broker 进行通信。

3、Topic 路由机制

在我们讲消息发送之前,我们首先看一下 RocketMQ 的 Topic 路由机制。当 RocketMQ 生产者第一次发送消息的时候,它首先会去 NameServer 查询业务方已经创建好的 Topic 并且把它保存到内存当中。然后通过定时任务每隔 30 秒去 NameServer 拉取最新的 Topic 信息,这样就实现了 Producer 对 Topic 配置信息的动态感知。

RocketMQ 支持自动创建 Topic:就是业务方如果没有使用命令行创建 Topic,生产者可以直接发送到没有创建的 Topic 里面,RocketMQ 的 Broker 会自动创建这个 Broker。自动创建 Topic 的时序图如下:

下面我们来理一下 RocketMQ 是如何支持自动创建 Topic 的:

  • 首先在 Broker 配置 autoCreateTopicEnable = true (默认) 支持自动创建 Topic,然后会创建 TBW102 这个自动创建 Topic 的 Topic,并且上报到 NameServer。
  • 然后 Producer 发送消息到未创建的 Topic 当中,这个时候就会查询 TBW102 这个默认 Topic 进行发送。
  • 在 Broker 如果不存在该 topic 的路由信息,则尝试查找默认topic的路由信息,如果能找到,则根据请求中的队列数,创建对应的 topic 路由信息。注意:此时还存在于 Broker 的内存中
  • Broker 每隔 30 秒会上报自己的 Topic 路由信息到 NameServer,然后 Producer 会更新 NameServer 的 Topic 路由信息到本地缓存当中。
  • Producer 更新 Topic 路由信息到本地缓存中后,后续再往之前未创建的这个 Topic 发送消息的时候,就会直接从本地缓存中获取这个 Topic 路由信息了。

注意:RocketMQ中的路由消息是持久化在Broker中的,NameServer中的路由
信息来自Broker的心跳包并存储在内存中。

4、消息发送

从上面的 demo 中可以发现,当我们需要消息发送的时候必须构建出来 RocketMQ 里面定义的消息对象,下面我们首先来看一下这个消息里面有哪些内容:

Message.java

public class Message implements Serializable 

    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;

	...

  • topic:topic 就是消息主题,也就是消息要发往哪个地方。
  • flag:消息Flag,RocketMQ暂不做处理
  • body:消息体,也就是我们需要传递的业务数据
  • transactionId:事务消息的事务ID
  • properties:消息扩展参数,RocketMQ 默认使用了:TAGS – 消息TAG,用于消息过滤;KEYS – Message索引键,多个用空格隔开,RocketMQ可以根据这些key快速检索到消息;WAIT – 消息发送时是否等消息存储完成后再返回;delayTimeLevel – 消息延迟级别,用于定时消息或消息重试。

现在需要发送的消息已经构建好了,下面我们来看一下生产的消息发送过程。

4.1 消息校验

在消息发送之前首先会验证一下消息信息。校验逻辑如下:

  • 消息主题校验:消息主题名称不能为空;消息主题格式名称、长度是否合法;不允许往系统主题中发送消息
  • 消息体校验:消息体不能为空;消息体长度是否合法;

4.2 查询 Topic 路由

在消息真正发送之前,我们需要获取 Topic 路由信息,这样我们才知道我们往哪个具体的 Broker 发送消息。

DefaultMQProducerImpl#tryToFindTopicPublishInfo

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) 
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) 
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) 
            return topicPublishInfo;
         else 
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        
    

可以参考一下第三小节的 Topic 的路由机制,RocketMQ 的查询 Topic 的路由优先级如下:

  • 本地缓存的 Topic 名称的路由信息
  • 根据传入的 Topic 名称去 NameServer 查询 Topic 路由信息
  • 根据默认的 Topic 名称去 NameServer 查询 Topic 路由信息
  • 如果没有找到 Topic 的路由信息,则抛出没有找到 Topic 异常

4.3 选择消息队列

根据路由信息选择消息队列,返回的消息队列按照broker序号进行排序。消息发送端采用重试机制,由 retryTimesWhenSendFailed 指定同步方式重试次数,异步重试机制在收到消息发送结果执行回调之前进行重试,由retryTimesWhenSendAsyncFailed 指定异常重试次数。接下来就是循环执行,选择消息队列、发送消息,发送成功则返回,收到异常则重试。

4.4 发送消息到 Broker

发送消息到 Broker 的逻辑在 DefaultMQProducerImpl#sendKernelImpl 这个方法当中:

DefaultMQProducerImpl#sendKernelImpl

    private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException 
        long beginStartTime = System.currentTimeMillis();
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) 
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        

        SendMessageContext context = null;
        if (brokerAddr != null) 
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try 
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) 
                    MessageClientIDSetter.setUniqID(msg);
                

                boolean topicWithNamespace = false;
                if (null != this.mQClientFactory.getClientConfig().getNamespace()) 
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                

                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) 
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                

                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) 
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                

                if (hasCheckForbiddenHook()) 
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                

                if (this.hasSendMessageHook()) 
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    context.setNamespace(this.defaultMQProducer.getNamespace());
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) 
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) 
                        context.setMsgType(MessageType.Delay_Msg);
                    
                    this.executeSendMessageHookBefore(context);
                

                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) 
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) 
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) 
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    
                

                SendResult sendResult = null;
                switch (communicationMode) 
                    case ASYNC:
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        if (msgBodyCompressed) 
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            msg.setBody(prevBody);
                        

                        if (topicWithNamespace) 
                            if (!messageCloned) 
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                messageCloned = true;
                            
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                        

                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) 
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout")Azure SB 队列将消息发送到禁用的死信队列

正确计算发送到死信 SQS 队列的消息数

RocketMQ源码(23)—DefaultMQPushConsumer消费者重试消息和死信消息源码

RabbitMq高级特性之死信队列 通俗易懂 超详细 内含案例

RocketMQ使用之消息保证,重复读,积压,顺序,过滤,延时,事务,死信

RabbitMQ之消息可靠性死信交换机惰性队列及集群