RocketMQ源码—Producer生产者启动源码一万字

Posted 刘Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码—Producer生产者启动源码一万字相关的知识,希望对你有一定的参考价值。

详细介绍了RocketMQ的客户端Producer生产者启动的源码。

Nameserver和Broker启动之后,RocketMQ就可以使用了。我们先开看看客户端生产者的启动流程源码。源码版本为4.9.3。

文章目录

客户端常用的生产者类是DefaultMQProducer,此类的简单生产者案例如下,在RocketMQ源码的example模块下的org/apache/rocketmq/example/quickstart包中可以找到该快速案例。

public class Producer 
    public static void main(String[] args) throws MQClientException, InterruptedException 
        
        /*
        * Instantiate with a producer group name.
        */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        /*
        * Launch the instance.
        */
        producer.start();
        
        for (int i = 0; i < 1000; i++) 
            try 
                
                /*
                * Create a message instance, specifying topic, tag and message body.
                */
                Message msg = new Message("TopicTest" /* Topic */,
                                          "TagA" /* Tag */,
                                          ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                                         );
                
                /*
                * Call send message to deliver message to one of brokers.
                */
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
             catch (Exception e) 
                e.printStackTrace();
                Thread.sleep(1000);
            
        
        
        /*
        * Shut down once the producer instance is not longer in use.
        */
        producer.shutdown();
    

我们本次分析RocketMQ生产者启动的源码。实际上就是分析DefaultMQProducer的构造器以及start方法的源码。

1 创建DefaultMQProducer实例

DefaultMQProducer的构造器有很多,但最终都是调用下面三个参数的构造函数:

/**
 * Constructor specifying namespace, producer group and RPC hook.
 * 指定命名空间、生产者组和RPC钩子的构造函数。
 *
 * @param namespace Namespace for this MQ Producer instance.
 * @param producerGroup Producer group, see the name-sake field.
 * @param rpcHook RPC hook to execute per each remoting command execution.
 */
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) 
    //命名空间
    this.namespace = namespace;
    //生产者组
    this.producerGroup = producerGroup;
    //根据RPC钩子创建DefaultMQProducerImpl实例,负责发送消息
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);

这个构造器是指定了命名空间、生产者组和RPC钩子的构造器,其内部创建了一个DefaultMQProducerImpl实例,DefaultMQProducer可以看作是DefaultMQProducerImpl的包装类,开放给开发人员使用,DefaultMQProducer中的几乎所有的方法内部都是由DefaultMQProducerImpl实现的。这是门面模式设计模式。

下面是DefaultMQProducerImpl的构造器,也很简单。主要是初始化了一个异步发送消息的线程池,核心线程和最大线程数量都是当前服务器的可用线程数,线程池队列采用LinkedBlockingQueue,大小为50000。

/**
 * @param defaultMQProducer defaultMQProducer
 * @param rpcHook           rpc钩子
 */
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) 
    //保存defaultMQProducer和rpcHook
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;
    /*
     * 异步发送消息的线程池队列
     */
    this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
    /*
     * 默认的异步发送消息的线程池
     * 核心线程和最大线程数量都是当前服务器的可用线程数
     */
    this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.asyncSenderThreadPoolQueue,
            new ThreadFactory() 
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) 
                    return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
                
            );

2 start启动生产者

DefaultMQProducer的构造器实际上没做什么太多的操作,主要是start方法内部会执行很多初始化操作,因此使用时,我们需要在发送或者查询消息之前调用该方法。

/**
 * 启动生产者实例
 * 为了准备这个实例,需要执行许多内部初始化过程,因此,必须在发送或查询消息之前调用这个方法。
 */
@Override
public void start() throws MQClientException 
    //根据namespace和producerGroup设置生产者组
    this.setProducerGroup(withNamespace(this.producerGroup));
    //默认生产者实现启动
    this.defaultMQProducerImpl.start();
    //消息轨迹跟踪服务,默认null
    if (null != traceDispatcher) 
        try 
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
         catch (MQClientException e) 
            log.warn("trace dispatcher start failed ", e);
        
    

主要是defaultMQProducerImpl#start方法,该方法实现生产者的启动。主要步骤有如下几步:

  1. 调用checkConfig方法检查生产者的ProducerGroup是否符合规范,如果ProducerGroup为空,或者长度大于255个字符,或者包含非法字符(正常的匹配模式为 ^[%|a-zA-Z0-9_-]+$),或者生产者组名为默认组名DEFAULT_PRODUCER,满足以上任意条件都校验不通过抛出异常。
  2. 调用getOrCreateMQClientInstance方法,然后根据clientId获取或者创建CreateMQClientInstance实例,并赋给mQClientFactory变量。
  3. 将当前生产者注册到MQClientInstance实例的producerTable属性中。
  4. 添加一个默认topic “TBW102”,将会在isAutoCreateTopicEnable属性开启时在broker上自动创建,RocketMQ会基于该Topic的配置创建新的Topic。
  5. 调用mQClientFactory#start方法启动CreateMQClientInstance客户端通信实例,初始化netty服务、各种定时任务、拉取消息服务、rebalanceService服务等等。
  6. 主动调用一次sendHeartbeatToAllBrokerWithLock发送心跳信息给所有broker。
  7. 启动一个定时任务,移除超时的request方法的请求,并执行异常回调,任务间隔1s。
/**
 * DefaultMQProducerImpl的启动方法
 */
public void start() throws MQClientException 
    this.start(true);


/**
 * DefaultMQProducerImpl的启动方法
 *
 * @param startFactory
 * @throws MQClientException
 */
public void start(final boolean startFactory) throws MQClientException 
    //根据服务状态选择走不同的代码分支
    switch (this.serviceState) 
        /**
         * 服务仅仅创建,而不是启动状态,那么启动服务
         */
        case CREATE_JUST:
            //首先修改服务状态为服务启动失败,如果最终启动成功则再修改为RUNNING
            this.serviceState = ServiceState.START_FAILED;
            /*
             * 1 检查生产者的配置信息
             * 主要是检查ProducerGroup是否符合规范,
             * 如果ProducerGroup为空,或者长度大于255个字符,或者包含非法字符(正常的匹配模式为 ^[%|a-zA-Z0-9_-]+$),或者生产者组名为默认组名DEFAULT_PRODUCER
             * 满足以上任意条件都校验不通过抛出异常。
             */
            this.checkConfig();
            //如果ProducerGroup不是CLIENT_INNER_PRODUCER,那么将修改当前的instanceName为当前进程pid,PID就是服务的进程号。
            //CLIENT_INNER_PRODUCER是客户端内部的生产者组名,该生产者用于发送消息回退请求
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) 
                this.defaultMQProducer.changeInstanceNameToPID();
            
            /*
             * 2 获取MQClientManager实例,然后根据clientId获取或者创建CreateMQClientInstance实例,并赋给mQClientFactory变量
             */
            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            /*
             * 3 将当前生产者注册到MQClientInstance实例的producerTable属性中
             */
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            //如果注册失败,那么设置服务属性为CREATE_JUST,并抛出异常
            if (!registerOK) 
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
            
            //添加一个默认topic “TBW102”,将会在isAutoCreateTopicEnable属性开启时在broker上自动创建,RocketMQ会基于该Topic的配置创建新的Topic
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
            /*
             * 4 启动CreateMQClientInstance客户端通信实例
             * netty服务、各种定时任务、拉取消息服务、rebalanceService服务
             */
            if (startFactory) 
                mQClientFactory.start();
            

            log.info("the producer [] start OK. sendMessageWithVIPChannel=", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
            //服务状态改为RUNNING
            this.serviceState = ServiceState.RUNNING;
            break;
        /**
         * 服务状态是其他的,那么抛出异常,即start方法仅能调用一次
         */
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
        default:
            break;
    
    /*
     * 5 发送心跳信息给所有broker
     */
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    /*
     * 6 启动一个定时任务,移除超时的请求,并执行异常回调,任务间隔1s
     */
    RequestFutureHolder.getInstance().startScheduledTask(this);


2.1 getOrCreateMQClientInstance获取或者创建MQClientInstance

该方法会先生成clientId,格式为 clientIP@instanceName@unitName。然后从本地缓存factoryTable中,查找该clientId的MQClientInstance实例。如果缓存中没有找到,则创建实例并存入缓存中。

MQClientInstance封装了RocketMQ底层网络处理API,Producer、Consumer都会使用到这个类,是Producer、Consumer与NameServer、Broker 打交道的网络通道。因此,同一个clientId对应同一个MQClientInstance实例就可以了,即同一个应用中的多个producer和consumer使用同一个MQClientInstance实例即可。

/**
 * MQClientManager的方法
 *
 * @param clientConfig 生产者客户端配置类
 * @param rpcHook      rpc钩子
 * @return MQClientInstance
 */
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) 
    /*
     * 构建clientId,格式为 clientIP@instanceName@unitName
     */
    String clientId = clientConfig.buildMQClientId();
    //从本地缓存factoryTable中,查找该clientId的MQClientInstance实例
    MQClientInstance instance = this.factoryTable.get(clientId);
    //如果不存在则创建并存入factoryTable
    if (null == instance) 
        instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                        this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
        MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
        if (prev != null) 
            instance = prev;
            log.warn("Returned Previous MQClientInstance for clientId:[]", clientId);
         else 
            log.info("Created new MQClientInstance for clientId:[]", clientId);
        
    

    return instance;

2.1.1 创建MQClientInstance

MQClientInstance封装了RocketMQ底层网络处理API,Producer、Consumer都会使用到这个类,是Producer、Consumer与NameServer、Broker 打交道的网络通道。

创建MQClientInstance的时候,会初始化netty客户端,各种服务实例等等。

public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) 
    this.clientConfig = clientConfig;
    this.instanceIndex = instanceIndex;
    //创建netty客户端配置类实例
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    //客户端请求处理器
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    //创建客户端远程通信API实现类的实例,内部持有一个remotingClient
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
    //更新namesrvAddr,即根据“;”将namesrvAddr字符串拆分为namesrvAddrList集合
    if (this.clientConfig.getNamesrvAddr() != null) 
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: ", this.clientConfig.getNamesrvAddr());
    
    //客户端id
    this.clientId = clientId;
    //MQ的admin控制台操作的实现
    this.mQAdminImpl = new MQAdminImpl(this);
    //push模式下,拉取消息的服务
    this.pullMessageService = new PullMessageService(this);
    //消息消费的负载均衡服务
    this.rebalanceService = new RebalanceService(this);
    //客户端内部的生产者,该生产者用于发送消息回退请求
    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);
    //消费者状态管理器
    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

    log.info("Created a new client Instance, InstanceIndex:, ClientID:, ClientConfig:, ClientVersion:, SerializerType:",
            this.instanceIndex,
            this.clientId,
            this.clientConfig,
            MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());

2.2 registerProducer注册生产者

registerProducer方法尝试将当前生产者组和生产者实例的映射关系加入到MQClientInstance内部的producerTable属性中。如果此前已存在相同生产者组的数据,那么不会再次添加并返回false。

/**
 * MQClientInstance的方法
 *
 * @param group    生产者组
 * @param producer 生产者实例
 * @return 是否加入成功
 */
public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) 
    //任何一个为null直接返回false
    if (null == group || null == producer) 
        return false;
    
    //如果生产者组不存在,则添加到producerTable集合中,并返回null,否则返回已存在的producer
    MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
    //如果已存入相同的生产者组的生产者,则返回false
    if (prev != null) 
        log.warn("the producer group[] exist already.", group);
        return false;
    

    return true;

3 start启动MQClientInstance

该方法是producer启动的核心方法。启动CreateMQClientInstance客户端通信实例,将会初始化netty服务、各种定时任务、拉取消息服务、rebalanceService服务、内部的生产者服务等等。

3.1 mQClientAPIImpl#start启动netty客户端

该方法创建一个netty客户端,以及两个定时任务和netty事件监听器,注意并没有真正执行netty客户端的connect连接操作。

/**
 * MQClientAPIImpl的方法
 */
public void start() 
    //调用NettyRemotingClient#start
    this.remotingClient.start();

mQClientAPIImpl#start内部就是调用的NettyRemotingClient#start方法。NettyRemotingClient属于remoting包下面的类,该类被其他模块例如broker共同使用。

/**
 * NettyRemotingClient的方法
 * 创建netty客户端,并没有真正启动
 */
@Override
public void start() 
    //创建默认事件处理器组,默认4个线程,线程名以NettyClientWorkerThread_为前缀。
    //主要用于执行在真正执行业务逻辑之前需要进行的SSL验证、编解码、空闲检查、网络连接管理等操作
    //其工作时间位于IO线程组之后,process线程组之前
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() 

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) 
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                
            );
    /*
     * 初始化netty客户端
     */
    //eventLoopGroupWorker线程组,默认一个线程
    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NiosocketChannel.class)
            //对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关
            .option(ChannelOption.TCP_NODELAY, true)
            //对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态
            .option(ChannelOption.SO_KEEPALIVE, false)
            //用来设置连接超时时长,单位是毫秒,默认3000
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .handler(new ChannelInitializer<SocketChannel>() 
                @Override
                public void initChannel以上是关于RocketMQ源码—Producer生产者启动源码一万字的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ源码—Producer发送消息源码—发送消息的总体流程一万字

RocketMQ源码—Producer发送消息的总体流程一万字

RocketMQ源码学习- 1. 入门

rocketmq源码心得

打怪升级rocketMqproducer源码分析

RocketMQ源码(15)—消费者DefaultMQPushConsumer启动主要流程源码