RocketMq producer 发送一条消息所经过的流程

Posted iscys

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMq producer 发送一条消息所经过的流程相关的知识,希望对你有一定的参考价值。

 前言:

  RocketMq producer 在发送一条消息时候,从 producer --nameSrv -- Broker  中间经过了什么样子的数据交互

开始:

如下是 Producer 发送消息的一个demo例子:

    //1. 初始化 mq producer
        DefaultMQProducer mqProducer =new DefaultMQProducer("iscys-test");
        //2.设置nameServer 地址
        mqProducer.setNamesrvAddr("localhost:9876");
        //3. 开启mq producer,这一步是必须的,会做一些连接初始化检测工作
        mqProducer.start();
        //4.创建 Message
        Message msg = new Message("test-topis", "iscys-test".getBytes());
        //5.发送消息,设置回调,消息发送成功会回调函数
        mqProducer.send(msg, new SendCallback() {

            @Override
            public void onSuccess(SendResult sendResult) {
                //在消息发送成功之后,我们收到broker的响应通知后,会进行回调
                System.out.println("send success");
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("send fail");

            }
        });

构建发送消息:

 public void send(Message msg, SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        try {
            //默认异步发送,超时3s
            this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
        } catch (MQBrokerException e) {
            throw new MQClientException("unknownn exception", e);
        }
    }

从NameSrv 中获取topic 配置的相关信息,比如 broker 地址,队列数 之类的。

 private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);

        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        //1.尝试取获取从NameSrv 中获取topic 相关信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                //2.选择一个消息队列,默认为4个,在创建新的Topic时候
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        //3.发送消息
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:

主要看一下如上代码第一步 尝试获取Topic 信息  tryToFindTopicPublishInfo

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        //1.从 topicPublishInfoTable 从尝试从Map中获取,如果没有获取到,请求NameSrv
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //2.从NameSrv 中拉取topic 信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
       
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            //3.说明获取到TOPIC 的信息
            return topicPublishInfo;
        } else {
            //4.如果第2步执行后 NameSrv 中没有topic 信息,获取默认的TBW102 topic 的信息,这个是肯定能获取到的
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

 

   1. 会先从 topicPublishInfoTable 缓存中获取topic 配置信息

   2.缓存没有,就从NameSrv 中拉取。

   3.如果获取到了,则返回。

   4.NameSrv 没有得到相关到topic 信息,说明是新到topic ,则就请求获取TBW102 topic 配置信息,这个肯定能获取到,封装使用TBW102的配置。

 

请求NameSrv 非默认的topic

 public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        //1.非默认的topic ,默认Topic 为TBW102
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }

执行从NameSrv 获取topic 请求

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // 1.如果请求的是默认的Topic 请求会走到这里
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        // 2.新的Topic 会先从NameSrv 中获取一遍,如果NameSrv 中没有获取到,会抛出异常
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }

    1. 从NameSrv 中获取到 TBW102 的topic 信息,这个一般都是有的。

    2. 新的topic 会从NameSrv 中获取信息,如果不存在,返回false。

 

获取到topic信息后封装成 TopicPublishInfo:

public class TopicPublishInfo {
    private boolean orderTopic = false;
    //用来检测Topic 在Broker 真实存在的,不存在false
    private boolean haveTopicRouterInfo = false;
    //消息队列的
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    //请求NameSrv 返回的TOPIC 具体信息 
    private TopicRouteData topicRouteData;

 

以上是关于RocketMq producer 发送一条消息所经过的流程的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ Producer发送消息过程

RocketMQ源码—Producer发送消息源码—发送消息的总体流程一万字

RocketMQ源码—Producer发送消息源码—单向同步异步发送消息一万字

RocketMQ源码—Producer发送消息的总体流程一万字

RocketMQ源码 — 八 RocketMQ消息重试

RocketMQ源码—Producer发送单向同步异步消息源码一万字