RocketMQ源码—Producer发送消息的总体流程一万字

Posted 刘Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码—Producer发送消息的总体流程一万字相关的知识,希望对你有一定的参考价值。

基于RocketMQ 4.9.3,详细的介绍了Producer发送消息的总体流程的源码,包括生产者重试机制、生产者故障转移机制、VIP通道等知识都会一一介绍。

文章目录

下面是一个最简单的producer的使用案例:

public class Producer 
    public static void main(String[] args) throws MQClientException, InterruptedException 

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        /*
         * Launch the instance.
         */
        producer.start();

        try 

            /*
             * Create a message instance, specifying topic, tag and message body.
             */
            Message msg = new Message("Topic1" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );

            /*
             * Call send message to deliver message to one of brokers.
             */
            SendResult sendResult = producer.send(msg);

            System.out.printf("%s%n", sendResult);
         catch (Exception e) 
            e.printStackTrace();
            Thread.sleep(1000);
        

        //producer.shutdown();
    

可以看到producer通过调用send方法发送消息,实际上RocketMQ的producer发送消息的模式可以分为三种:

  1. 单向发送:把消息发向Broker服务器,而不用管消息是否成功发送到Broker服务器,只管发送,不管结果。
  2. 同步发送:把消息发送给Broker服务器,如果消息成功发送给Broker服务器,能得到Broker服务器的响应结果。
  3. 异步发送:把消息发送给Broker服务器,如果消息成功发送给Broker服务器,能得到Broker服务器的响应结果。因为是异步发送,发送完消息以后,不用等待,等到Broker服务器的响应调用回调。

DefaultMQProducer提供了更多的send的重载方法,来实现上面三种发送模式:

模式方法描述
同步SendResult send(Collection msgs)同步批量发送消息
SendResult send(Collection msgs, long timeout)同步批量发送消息
SendResult send(Collection msgs, MessageQueue messageQueue)向指定的消息队列同步批量发送消息
SendResult send(Collection msgs, MessageQueue messageQueue, long timeout)向指定的消息队列同步批量发送消息,并指定超时时间
SendResult send(Message msg)同步单条发送消息
SendResult send(Message msg, long timeout)同步发送单条消息,并指定超时时间
SendResult send(Message msg, MessageQueue mq)向指定的消息队列同步发送单条消息
SendResult send(Message msg, MessageQueue mq, long timeout)向指定的消息队列同步单条发送消息,并指定超时时间
SendResult send(Message msg, MessageQueueSelector selector, Object arg)向消息队列同步单条发送消息,并指定发送队列选择器
SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)向消息队列同步单条发送消息,并指定发送队列选择器与超时时间
异步void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)向指定的消息队列异步单条发送消息
void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)向指定的消息队列异步单条发送消息,并指定超时时间
void send(Message msg, SendCallback sendCallback)异步发送消息
void send(Message msg, SendCallback sendCallback, long timeout)异步发送消息,并指定回调方法和超时时间
void send(Message msg, MessageQueue mq, SendCallback sendCallback)向指定的消息队列异步单条发送消息,并指定回调方法
void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)向指定的消息队列异步单条发送消息,并指定回调方法和超时时间
单向void sendOneway(Message msg)单向发送消息,不等待broker响应
void sendOneway(Message msg, MessageQueue mq)单向发送消息到指定队列,不等待broker响应
void sendOneway(Message msg, MessageQueueSelector selector, Object arg)单向发送消息到队列选择器的选中的队列,不等待broker响应

上次我们分析了producer的启动流程源码,这次我们分析producer发送消息的源码。

1 send源码入口

DefaultMQProducer#send方法作为源码分析的入口方法,该方法被使用者直接调用。其内部调用defaultMQProducerImpl#send方法发送消息。

1.1 同步消息

public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException 
    //根据namespace设置topic
    msg.setTopic(withNamespace(msg.getTopic()));
    //调用defaultMQProducerImpl#send发送消息
    return this.defaultMQProducerImpl.send(msg);

该方法内部调用defaultMQProducerImpl#send发送消息。

public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException 
    //调用另一个send方法,设置超时时间参数,默认3000ms
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());

该方法内部又调用另一个send方法,设置超时时间参数,默认3000ms。

/**
 * DefaultMQProducerImpl的方法
 *
 * @param msg     消息
 * @param timeout 超时时间,毫秒值
 */
public SendResult send(Message msg,
                       long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException 
    //调用另一个sendDefaultImpl方法,设置消息发送模式为SYNC,即同步;设置回调函数为null
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);

该方法内部又调用另一个sendDefaultImpl方法,设置消息发送模式为SYNC,即同步;设置回调函数为null。

1.2 单向消息

单向消息使用sendOneway发送。

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException
 
    //根据namespace设置topic
    msg.setTopic(withNamespace(msg.getTopic()));
    //调用defaultMQProducerImpl#sendOneway发送消息
    this.defaultMQProducerImpl.sendOneway(msg);

该方法内部调用defaultMQProducerImpl#sendOneway。

public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException 
    try 
        //调用sendDefaultImpl方法,设置消息发送模式为ONEWAY,即单向;设置回调函数为null;设置超时时间参数,默认3000ms
        this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
     catch (MQBrokerException e) 
        throw new MQClientException("unknown exception", e);
    

最终调用sendDefaultImpl方法,设置消息发送模式为ONEWAY,即单向;设置回调函数为null;设置超时时间参数,默认3000ms。

1.3 异步消息

异步消息使用带有callback函数的send方法发送。

public void send(Message msg,                 SendCallback sendCallback) throws MQClientException,
 RemotingException, InterruptedException 
    //根据namespace设置topic
    msg.setTopic(withNamespace(msg.getTopic()));
    //调用defaultMQProducerImpl#send发送消息,带有sendCallback参数
    this.defaultMQProducerImpl.send(msg, sendCallback);

该方法内部调用defaultMQProducerImpl#send方法发送消息,带有sendCallback参数。

public void send(Message msg,                 SendCallback sendCallback) throws MQClientException, 
RemotingException, InterruptedException 
    //该方法内部又调用另一个send方法,设置超时时间参数,默认3000ms。
    send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());

该方法内部又调用另一个send方法,设置超时时间参数,默认3000ms。

public void send(final Message msg, final SendCallback sendCallback, final long timeout)
        throws MQClientException, RemotingException, InterruptedException 
    //调用起始时间
    final long beginStartTime = System.currentTimeMillis();
    //获取异步发送执行器线程池
    ExecutorService executor = this.getAsyncSenderExecutor();
    try 
        /*
         * 使用线程池异步的执行sendDefaultImpl方法,即异步发送消息
         */
        executor.submit(new Runnable() 
            @Override
            public void run() 
                /*
                 * 发送之前计算超时时间,如果超时则不发送,直接执行回调函数onException方法
                 */
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout > costTime) 
                    try 
                        //调用sendDefaultImpl方法执行发送操作
                        sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
                     catch (Exception e) 
                        //抛出异常,执行回调函数onException方法
                        sendCallback.onException(e);
                    
                 else 
                    //超时,执行回调函数onException方法
                    sendCallback.onException(
                            new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                
            

        );
     catch (RejectedExecutionException e) 
        throw new MQClientException("executor rejected ", e);
    


该方法内部会获取获取异步发送执行器线程池,使用线程池异步的执行sendDefaultImpl方法,即异步发送消息。

发送之前计算超时时间,如果超时则不发送,直接执行回调函数onException方法。

2 sendDefaultImpl发送消息实现

该方法位于DefaultMQProducerImpl中,无论是同步消息、异步消息还是单向消息,最终都是调用该方法实现发送消息的逻辑的,因此该方法是真正的发送消息的方法入口。

该方法的大概步骤为:

  1. 调用makeSureStateOK方法,确定此producer的服务状态正常,如果服务状态不是RUNNING,那么抛出异常。
  2. 调用checkMessage方法,校验消息的合法性。
  3. 调用tryToFindTopicPublishInfo方法,尝试查找消息的一个topic路由,用以发送消息。
  4. 计算循环发送消息的总次数timesTotal,默认情况下,同步模式为3,即默认允许重试2次,可更改重试次数;其他模式为1,即不允许重试,不可更改。实际上异步发送消息也会重试,最多两次,只不过不是通过这里的逻辑重试的。
  5. 调用selectOneMessageQueue方法,选择一个消息队列MessageQueue,该犯法支持失败故障转移。
  6. 调用sendKernelImpl方法发送消息,异步、同步、单向发送消息的模式都是通过该方法实现的。
  7. 调用updateFaultItem方法,更新本地错误表缓存数据,用于延迟时间的故障转移的功能。
  8. 根据发送模式执行不同的处理,如果是异步或者单向模式则直接返回,如果是同步模式,如果开启了retryAnotherBrokerWhenNotStoreOK开关,那么如果返回值不是返回SEND_OK状态,则仍然会执行重试发送。
  9. 此过程中,如果抛出了RemotingException、MQClientException、以及部分MQBrokerException异常时,那么会进行重试,如果抛出了InterruptedException,或者因为超时则不再重试。
/**
 * DefaultMQProducerImpl的方法
 *
 * @param msg               方法
 * @param communicationMode 通信模式
 * @param sendCallback      回调方法
 * @param timeout           超时时间
 */
private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException 
    /*
     * 1 确定此producer的服务状态正常,如果服务状态不是RUNNING,那么抛出异常
     */
    this.makeSureStateOK();
    /*
     * 2 校验消息的合法性
     */
    Validators.checkMessage(msg, this.defaultMQProducer);
    //生成本次调用id
    final long invokeID = random.nextLong();
    //开始时间戳
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    //结束时间戳
    long endTimestamp = beginTimestampFirst;
    /*
     * 3 尝试查找消息的一个topic路由,用以发送消息
     */
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    //找到有效的topic信息
    if (topicPublishInfo != null && topicPublishInfo.ok()) 
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        /*
         * 4 计算发送消息的总次数
         * 同步模式为3,即默认允许重试2次,可更改重试次数;其他模式为1,即不允许重试,不可更改
         */
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        //记录每一次重试时候发送消息目标Broker名字的数组
        String[] brokersSent = new String[timesTotal];
        /*
         * 在循环中,发送消息,包含消息重试的逻辑,总次数默认不超过3
         */
        for (; times < timesTotal; times++) 
            //上次使用过的broker,可以为空,表示第一次选择
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            /*
             * 5 选择一个消息队列MessageQueue
             */
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) 
                mq = mqSelected;
                //设置brokerName
                brokersSent[times] = mq.getBrokerName();
                try 
                    //调用的开始时间
                    beginTimestampPrev = System.currentTimeMillis();
                    //如果还有可调用次数,那么
                    if (times > 0) 
                        //在重新发送期间用名称空间重置topic
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    
                    //现在调用的开始时间 减去 开始时间,判断时候在调用发起之前就超时了
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    //如果已经超时了,那么直接结束循环,不再发送
                    //即超时的时候,即使还剩下重试次数,也不会再继续重试
                    if (timeout < costTime) 
                        callTimeout = true;
                        break;
                    
                    /*
                     * 6 异步、同步、单向发送消息
                     */
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    //方法调用结束时间戳
                    endTimestamp = System.currentTimeMillis();
                    /*
                     * 7 更新本地错误表缓存数据,用于延迟时间的故障转移的功能
                     */
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    /*
                     * 8 根据发送模式执行不同的处理
                     */
                    switch (communicationMode) 
                        //异步和单向模式直接返回null
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            //同步模式,如果开启了retryAnotherBrokerWhenNotStoreOK开关,那么如果不是返回SEND_OK状态,则仍然会执行重试发送
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) 
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) 
                                    continue;
                                
                            
                            //如果发送成功,则返回
                            return sendResult;
                        default:
                            break;
                    
                 catch (RemotingException e) 
                    //RemotingException异常,会执行重试
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                 catch (MQClientException e) 
                    //MQClientException异常,会执行重试
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                 catch (MQBrokerException e) 
                    //MQBrokerException异常
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    //如果返回的状态码属于一下几种,则支持重试:
                    //ResponseCode.TOPIC_NOT_EXIST,
                    //ResponseCode.SERVICE_NOT_AVAILABLE,
                    //ResponseCode.SYSTEM_ERROR,
                    //ResponseCode.NO_PERMISSION,
                    //ResponseCode.NO_BUYER_ID,
                    //ResponseCode.NOT_IN_CURRENT_UNIT

                    if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) 
                        continue;
                     else 
                        //其他状态码不支持重试,如果有结果则返回,否则直接抛出异常
                        if (sendResult != null) 
                            return sendResult;
                        

                        throw e;
                    
                 catch (InterruptedException e) 
                    //InterruptedException异常,不会执行重试
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());

                    log.warn(以上是关于RocketMQ源码—Producer发送消息的总体流程一万字的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ源码—Producer发送消息的总体流程一万字

RocketMQ源码—Producer发送单向同步异步消息源码一万字

RocketMQ源码 — 八 RocketMQ消息重试

8RocketMQ 源码解析之消息发送

8RocketMQ 源码解析之消息发送

RocketMQ源码学习- 1. 入门