RocketMQ源码—Producer生产者启动源码一万字
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码—Producer生产者启动源码一万字相关的知识,希望对你有一定的参考价值。
基于RocketMQ 4.9.3,详细介绍了RocketMQ的客户端Producer生产者启动的源码。
Nameserver和Broker启动之后,RocketMQ就可以使用了。我们先开看看客户端生产者的启动流程源码。源码版本为4.9.3。
文章目录
- 1 创建DefaultMQProducer实例
- 2 start启动生产者
- 3 start启动MQClientInstance
- 4 总结
客户端常用的生产者类是DefaultMQProducer,此类的简单生产者案例如下,在RocketMQ源码的example模块下的org/apache/rocketmq/example/quickstart包中可以找到该快速案例。
public class Producer
public static void main(String[] args) throws MQClientException, InterruptedException
/*
* Instantiate with a producer group name.
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
/*
* Launch the instance.
*/
producer.start();
for (int i = 0; i < 1000; i++)
try
/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
catch (Exception e)
e.printStackTrace();
Thread.sleep(1000);
/*
* Shut down once the producer instance is not longer in use.
*/
producer.shutdown();
我们本次分析RocketMQ生产者启动的源码。实际上就是分析DefaultMQProducer的构造器以及start方法的源码。
1 创建DefaultMQProducer实例
DefaultMQProducer的构造器有很多,但最终都是调用下面三个参数的构造函数:
/**
* Constructor specifying namespace, producer group and RPC hook.
* 指定命名空间、生产者组和RPC钩子的构造函数。
*
* @param namespace Namespace for this MQ Producer instance.
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
*/
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook)
//命名空间
this.namespace = namespace;
//生产者组
this.producerGroup = producerGroup;
//根据RPC钩子创建DefaultMQProducerImpl实例,负责发送消息
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
这个构造器是指定了命名空间、生产者组和RPC钩子的构造器,其内部创建了一个DefaultMQProducerImpl实例,DefaultMQProducer可以看作是DefaultMQProducerImpl的包装类,开放给开发人员使用,DefaultMQProducer中的几乎所有的方法内部都是由DefaultMQProducerImpl实现的。这是门面模式设计模式。
下面是DefaultMQProducerImpl的构造器,也很简单。主要是初始化了一个异步发送消息的线程池,核心线程和最大线程数量都是当前服务器的可用线程数,线程池队列采用LinkedBlockingQueue,大小为50000。
/**
* @param defaultMQProducer defaultMQProducer
* @param rpcHook rpc钩子
*/
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook)
//保存defaultMQProducer和rpcHook
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
/*
* 异步发送消息的线程池队列
*/
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
/*
* 默认的异步发送消息的线程池
* 核心线程和最大线程数量都是当前服务器的可用线程数
*/
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory()
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r)
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
);
2 start启动生产者
DefaultMQProducer的构造器实际上没做什么太多的操作,主要是start方法内部会执行很多初始化操作,因此使用时,我们需要在发送或者查询消息之前调用该方法。
/**
* 启动生产者实例
* 为了准备这个实例,需要执行许多内部初始化过程,因此,必须在发送或查询消息之前调用这个方法。
*/
@Override
public void start() throws MQClientException
//根据namespace和producerGroup设置生产者组
this.setProducerGroup(withNamespace(this.producerGroup));
//默认生产者实现启动
this.defaultMQProducerImpl.start();
//消息轨迹跟踪服务,默认null
if (null != traceDispatcher)
try
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
catch (MQClientException e)
log.warn("trace dispatcher start failed ", e);
主要是defaultMQProducerImpl#start方法,该方法实现生产者的启动。主要步骤有如下几步:
- 调用checkConfig方法检查生产者的ProducerGroup是否符合规范,如果ProducerGroup为空,或者长度大于255个字符,或者包含非法字符(正常的匹配模式为
^[%|a-zA-Z0-9_-]+$)
,或者生产者组名为默认组名DEFAULT_PRODUCER,满足以上任意条件都校验不通过抛出异常。 - 调用getOrCreateMQClientInstance方法,然后根据clientId获取或者创建CreateMQClientInstance实例,并赋给mQClientFactory变量。
- 将当前生产者注册到MQClientInstance实例的producerTable属性中。
- 添加一个默认topic “TBW102”,将会在isAutoCreateTopicEnable属性开启时在broker上自动创建,RocketMQ会基于该Topic的配置创建新的Topic。
- 调用mQClientFactory#start方法启动CreateMQClientInstance客户端通信实例,初始化netty服务、各种定时任务、拉取消息服务、rebalanceService服务等等。
- 主动调用一次sendHeartbeatToAllBrokerWithLock发送心跳信息给所有broker。
- 启动一个定时任务,移除超时的request方法的请求,并执行异常回调,任务间隔1s。
/**
* DefaultMQProducerImpl的启动方法
*/
public void start() throws MQClientException
this.start(true);
/**
* DefaultMQProducerImpl的启动方法
*
* @param startFactory
* @throws MQClientException
*/
public void start(final boolean startFactory) throws MQClientException
//根据服务状态选择走不同的代码分支
switch (this.serviceState)
/**
* 服务仅仅创建,而不是启动状态,那么启动服务
*/
case CREATE_JUST:
//首先修改服务状态为服务启动失败,如果最终启动成功则再修改为RUNNING
this.serviceState = ServiceState.START_FAILED;
/*
* 1 检查生产者的配置信息
* 主要是检查ProducerGroup是否符合规范,
* 如果ProducerGroup为空,或者长度大于255个字符,或者包含非法字符(正常的匹配模式为 ^[%|a-zA-Z0-9_-]+$),或者生产者组名为默认组名DEFAULT_PRODUCER
* 满足以上任意条件都校验不通过抛出异常。
*/
this.checkConfig();
//如果ProducerGroup不是CLIENT_INNER_PRODUCER,那么将修改当前的instanceName为当前进程pid,PID就是服务的进程号。
//CLIENT_INNER_PRODUCER是客户端内部的生产者组名,该生产者用于发送消息回退请求
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP))
this.defaultMQProducer.changeInstanceNameToPID();
/*
* 2 获取MQClientManager实例,然后根据clientId获取或者创建CreateMQClientInstance实例,并赋给mQClientFactory变量
*/
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
/*
* 3 将当前生产者注册到MQClientInstance实例的producerTable属性中
*/
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
//如果注册失败,那么设置服务属性为CREATE_JUST,并抛出异常
if (!registerOK)
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
//添加一个默认topic “TBW102”,将会在isAutoCreateTopicEnable属性开启时在broker上自动创建,RocketMQ会基于该Topic的配置创建新的Topic
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
/*
* 4 启动CreateMQClientInstance客户端通信实例
* netty服务、各种定时任务、拉取消息服务、rebalanceService服务
*/
if (startFactory)
mQClientFactory.start();
log.info("the producer [] start OK. sendMessageWithVIPChannel=", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
//服务状态改为RUNNING
this.serviceState = ServiceState.RUNNING;
break;
/**
* 服务状态是其他的,那么抛出异常,即start方法仅能调用一次
*/
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
/*
* 5 发送心跳信息给所有broker
*/
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
/*
* 6 启动一个定时任务,移除超时的请求,并执行异常回调,任务间隔1s
*/
RequestFutureHolder.getInstance().startScheduledTask(this);
2.1 getOrCreateMQClientInstance获取或者创建MQClientInstance
该方法会先生成clientId,格式为 clientIP@instanceName@unitName。然后从本地缓存factoryTable中,查找该clientId的MQClientInstance实例。如果缓存中没有找到,则创建实例并存入缓存中。
MQClientInstance封装了RocketMQ底层网络处理API,Producer、Consumer都会使用到这个类,是Producer、Consumer与NameServer、Broker 打交道的网络通道。因此,同一个clientId对应同一个MQClientInstance实例就可以了,即同一个应用中的多个producer和consumer使用同一个MQClientInstance实例即可。
/**
* MQClientManager的方法
*
* @param clientConfig 生产者客户端配置类
* @param rpcHook rpc钩子
* @return MQClientInstance
*/
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook)
/*
* 构建clientId,格式为 clientIP@instanceName@unitName
*/
String clientId = clientConfig.buildMQClientId();
//从本地缓存factoryTable中,查找该clientId的MQClientInstance实例
MQClientInstance instance = this.factoryTable.get(clientId);
//如果不存在则创建并存入factoryTable
if (null == instance)
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null)
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[]", clientId);
else
log.info("Created new MQClientInstance for clientId:[]", clientId);
return instance;
2.1.1 创建MQClientInstance
MQClientInstance封装了RocketMQ底层网络处理API,Producer、Consumer都会使用到这个类,是Producer、Consumer与NameServer、Broker 打交道的网络通道。
创建MQClientInstance的时候,会初始化netty客户端,各种服务实例等等。
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook)
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
//创建netty客户端配置类实例
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
//客户端请求处理器
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
//创建客户端远程通信API实现类的实例,内部持有一个remotingClient
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
//更新namesrvAddr,即根据“;”将namesrvAddr字符串拆分为namesrvAddrList集合
if (this.clientConfig.getNamesrvAddr() != null)
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: ", this.clientConfig.getNamesrvAddr());
//客户端id
this.clientId = clientId;
//MQ的admin控制台操作的实现
this.mQAdminImpl = new MQAdminImpl(this);
//push模式下,拉取消息的服务
this.pullMessageService = new PullMessageService(this);
//消息消费的负载均衡服务
this.rebalanceService = new RebalanceService(this);
//客户端内部的生产者,该生产者用于发送消息回退请求
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
//消费者状态管理器
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
log.info("Created a new client Instance, InstanceIndex:, ClientID:, ClientConfig:, ClientVersion:, SerializerType:",
this.instanceIndex,
this.clientId,
this.clientConfig,
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
2.2 registerProducer注册生产者
registerProducer方法尝试将当前生产者组和生产者实例的映射关系加入到MQClientInstance内部的producerTable属性中。如果此前已存在相同生产者组的数据,那么不会再次添加并返回false。
/**
* MQClientInstance的方法
*
* @param group 生产者组
* @param producer 生产者实例
* @return 是否加入成功
*/
public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer)
//任何一个为null直接返回false
if (null == group || null == producer)
return false;
//如果生产者组不存在,则添加到producerTable集合中,并返回null,否则返回已存在的producer
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
//如果已存入相同的生产者组的生产者,则返回false
if (prev != null)
log.warn("the producer group[] exist already.", group);
return false;
return true;
3 start启动MQClientInstance
该方法是producer启动的核心方法。启动CreateMQClientInstance客户端通信实例,将会初始化netty服务、各种定时任务、拉取消息服务、rebalanceService服务、内部的生产者服务等等。
3.1 mQClientAPIImpl#start启动netty客户端
该方法创建一个netty客户端,以及两个定时任务和netty事件监听器,注意并没有真正执行netty客户端的connect连接操作。
/**
* MQClientAPIImpl的方法
*/
public void start()
//调用NettyRemotingClient#start
this.remotingClient.start();
mQClientAPIImpl#start内部就是调用的NettyRemotingClient#start方法。NettyRemotingClient属于remoting包下面的类,该类被其他模块例如broker共同使用。
/**
* NettyRemotingClient的方法
* 创建netty客户端,并没有真正启动
*/
@Override
public void start()
//创建默认事件处理器组,默认4个线程,线程名以NettyClientWorkerThread_为前缀。
//主要用于执行在真正执行业务逻辑之前需要进行的SSL验证、编解码、空闲检查、网络连接管理等操作
//其工作时间位于IO线程组之后,process线程组之前
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory()
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r)
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
);
/*
* 初始化netty客户端
*/
//eventLoopGroupWorker线程组,默认一个线程
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NiosocketChannel.class)
//对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关
.option(ChannelOption.TCP_NODELAY, true)
//对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态
.option(ChannelOption.SO_KEEPALIVE, false)
//用来设置连接超时时长,单位是毫秒,默认3000
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.handler(new ChannelInitializer<SocketChannel>()
@Override
public void RocketMQ源码—Producer发送消息源码—发送消息的总体流程一万字