RocketMQ源码—Producer发送消息源码—发送消息的总体流程一万字
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码—Producer发送消息源码—发送消息的总体流程一万字相关的知识,希望对你有一定的参考价值。
基于RocketMQ 4.9.3,详细的介绍了Producer发送消息的总体流程的源码,包括生产者重试机制、生产者故障转移机制、VIP通道等知识都会一一介绍。
文章目录
- 1 send源码入口
- 2 sendDefaultImpl发送消息实现
- 3 总结
下面是一个最简单的producer的使用案例:
public class Producer
public static void main(String[] args) throws MQClientException, InterruptedException
/*
* Instantiate with a producer group name.
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
/*
* Launch the instance.
*/
producer.start();
try
/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message("Topic1" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
catch (Exception e)
e.printStackTrace();
Thread.sleep(1000);
//producer.shutdown();
可以看到producer通过调用send方法发送消息,实际上RocketMQ的producer发送消息的模式可以分为三种:
- 单向发送:把消息发向Broker服务器,而不用管消息是否成功发送到Broker服务器,只管发送,不管结果。
- 同步发送:把消息发送给Broker服务器,如果消息成功发送给Broker服务器,能得到Broker服务器的响应结果。
- 异步发送:把消息发送给Broker服务器,如果消息成功发送给Broker服务器,能得到Broker服务器的响应结果。因为是异步发送,发送完消息以后,不用等待,等到Broker服务器的响应调用回调。
DefaultMQProducer提供了更多的send的重载方法,来实现上面三种发送模式:
模式 | 方法 | 描述 |
---|---|---|
同步 | SendResult send(Collection msgs) | 同步批量发送消息 |
SendResult send(Collection msgs, long timeout) | 同步批量发送消息 | |
SendResult send(Collection msgs, MessageQueue messageQueue) | 向指定的消息队列同步批量发送消息 | |
SendResult send(Collection msgs, MessageQueue messageQueue, long timeout) | 向指定的消息队列同步批量发送消息,并指定超时时间 | |
SendResult send(Message msg) | 同步单条发送消息 | |
SendResult send(Message msg, long timeout) | 同步发送单条消息,并指定超时时间 | |
SendResult send(Message msg, MessageQueue mq) | 向指定的消息队列同步发送单条消息 | |
SendResult send(Message msg, MessageQueue mq, long timeout) | 向指定的消息队列同步单条发送消息,并指定超时时间 | |
SendResult send(Message msg, MessageQueueSelector selector, Object arg) | 向消息队列同步单条发送消息,并指定发送队列选择器 | |
SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) | 向消息队列同步单条发送消息,并指定发送队列选择器与超时时间 | |
异步 | void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback) | 向指定的消息队列异步单条发送消息 |
void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) | 向指定的消息队列异步单条发送消息,并指定超时时间 | |
void send(Message msg, SendCallback sendCallback) | 异步发送消息 | |
void send(Message msg, SendCallback sendCallback, long timeout) | 异步发送消息,并指定回调方法和超时时间 | |
void send(Message msg, MessageQueue mq, SendCallback sendCallback) | 向指定的消息队列异步单条发送消息,并指定回调方法 | |
void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout) | 向指定的消息队列异步单条发送消息,并指定回调方法和超时时间 | |
单向 | void sendOneway(Message msg) | 单向发送消息,不等待broker响应 |
void sendOneway(Message msg, MessageQueue mq) | 单向发送消息到指定队列,不等待broker响应 | |
void sendOneway(Message msg, MessageQueueSelector selector, Object arg) | 单向发送消息到队列选择器的选中的队列,不等待broker响应 |
上次我们分析了producer的启动流程源码,这次我们分析producer发送消息的源码。
1 send源码入口
DefaultMQProducer#send方法作为源码分析的入口方法,该方法被使用者直接调用。其内部调用defaultMQProducerImpl#send方法发送消息。
1.1 同步消息
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
//根据namespace设置topic
msg.setTopic(withNamespace(msg.getTopic()));
//调用defaultMQProducerImpl#send发送消息
return this.defaultMQProducerImpl.send(msg);
该方法内部调用defaultMQProducerImpl#send发送消息。
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
//调用另一个send方法,设置超时时间参数,默认3000ms
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
该方法内部又调用另一个send方法,设置超时时间参数,默认3000ms。
/**
* DefaultMQProducerImpl的方法
*
* @param msg 消息
* @param timeout 超时时间,毫秒值
*/
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
//调用另一个sendDefaultImpl方法,设置消息发送模式为SYNC,即同步;设置回调函数为null
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
该方法内部又调用另一个sendDefaultImpl方法,设置消息发送模式为SYNC,即同步;设置回调函数为null。
1.2 单向消息
单向消息使用sendOneway发送。
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException
//根据namespace设置topic
msg.setTopic(withNamespace(msg.getTopic()));
//调用defaultMQProducerImpl#sendOneway发送消息
this.defaultMQProducerImpl.sendOneway(msg);
该方法内部调用defaultMQProducerImpl#sendOneway。
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException
try
//调用sendDefaultImpl方法,设置消息发送模式为ONEWAY,即单向;设置回调函数为null;设置超时时间参数,默认3000ms
this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
catch (MQBrokerException e)
throw new MQClientException("unknown exception", e);
最终调用sendDefaultImpl方法,设置消息发送模式为ONEWAY,即单向;设置回调函数为null;设置超时时间参数,默认3000ms。
1.3 异步消息
异步消息使用带有callback函数的send方法发送。
public void send(Message msg, SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException
//根据namespace设置topic
msg.setTopic(withNamespace(msg.getTopic()));
//调用defaultMQProducerImpl#send发送消息,带有sendCallback参数
this.defaultMQProducerImpl.send(msg, sendCallback);
该方法内部调用defaultMQProducerImpl#send方法发送消息,带有sendCallback参数。
public void send(Message msg, SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException
//该方法内部又调用另一个send方法,设置超时时间参数,默认3000ms。
send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
该方法内部又调用另一个send方法,设置超时时间参数,默认3000ms。
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException
//调用起始时间
final long beginStartTime = System.currentTimeMillis();
//获取异步发送执行器线程池
ExecutorService executor = this.getAsyncSenderExecutor();
try
/*
* 使用线程池异步的执行sendDefaultImpl方法,即异步发送消息
*/
executor.submit(new Runnable()
@Override
public void run()
/*
* 发送之前计算超时时间,如果超时则不发送,直接执行回调函数onException方法
*/
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime)
try
//调用sendDefaultImpl方法执行发送操作
sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
catch (Exception e)
//抛出异常,执行回调函数onException方法
sendCallback.onException(e);
else
//超时,执行回调函数onException方法
sendCallback.onException(
new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
);
catch (RejectedExecutionException e)
throw new MQClientException("executor rejected ", e);
该方法内部会获取获取异步发送执行器线程池,使用线程池异步的执行sendDefaultImpl方法,即异步发送消息。
发送之前计算超时时间,如果超时则不发送,直接执行回调函数onException方法。
2 sendDefaultImpl发送消息实现
该方法位于DefaultMQProducerImpl中,无论是同步消息、异步消息还是单向消息,最终都是调用该方法实现发送消息的逻辑的,因此该方法是真正的发送消息的方法入口。
该方法的大概步骤为:
- 调用makeSureStateOK方法,确定此producer的服务状态正常,如果服务状态不是RUNNING,那么抛出异常。
- 调用checkMessage方法,校验消息的合法性。
- 调用tryToFindTopicPublishInfo方法,尝试查找消息的一个topic路由,用以发送消息。
- 计算循环发送消息的总次数timesTotal,默认情况下,同步模式为3,即默认允许重试2次,可更改重试次数;其他模式为1,即不允许重试,不可更改。实际上异步发送消息也会重试,最多两次,只不过不是通过这里的逻辑重试的。
- 调用selectOneMessageQueue方法,选择一个消息队列MessageQueue,该犯法支持失败故障转移。
- 调用sendKernelImpl方法发送消息,异步、同步、单向发送消息的模式都是通过该方法实现的。
- 调用updateFaultItem方法,更新本地错误表缓存数据,用于延迟时间的故障转移的功能。
- 根据发送模式执行不同的处理,如果是异步或者单向模式则直接返回,如果是同步模式,如果开启了retryAnotherBrokerWhenNotStoreOK开关,那么如果返回值不是返回SEND_OK状态,则仍然会执行重试发送。
- 此过程中,如果抛出了RemotingException、MQClientException、以及部分MQBrokerException异常时,那么会进行重试,如果抛出了InterruptedException,或者因为超时则不再重试。
/**
* DefaultMQProducerImpl的方法
*
* @param msg 方法
* @param communicationMode 通信模式
* @param sendCallback 回调方法
* @param timeout 超时时间
*/
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
/*
* 1 确定此producer的服务状态正常,如果服务状态不是RUNNING,那么抛出异常
*/
this.makeSureStateOK();
/*
* 2 校验消息的合法性
*/
Validators.checkMessage(msg, this.defaultMQProducer);
//生成本次调用id
final long invokeID = random.nextLong();
//开始时间戳
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
//结束时间戳
long endTimestamp = beginTimestampFirst;
/*
* 3 尝试查找消息的一个topic路由,用以发送消息
*/
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
//找到有效的topic信息
if (topicPublishInfo != null && topicPublishInfo.ok())
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
/*
* 4 计算发送消息的总次数
* 同步模式为3,即默认允许重试2次,可更改重试次数;其他模式为1,即不允许重试,不可更改
*/
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
//记录每一次重试时候发送消息目标Broker名字的数组
String[] brokersSent = new String[timesTotal];
/*
* 在循环中,发送消息,包含消息重试的逻辑,总次数默认不超过3
*/
for (; times < timesTotal; times++)
//上次使用过的broker,可以为空,表示第一次选择
String lastBrokerName = null == mq ? null : mq.getBrokerName();
/*
* 5 选择一个消息队列MessageQueue
*/
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null)
mq = mqSelected;
//设置brokerName
brokersSent[times] = mq.getBrokerName();
try
//调用的开始时间
beginTimestampPrev = System.currentTimeMillis();
//如果还有可调用次数,那么
if (times > 0)
//在重新发送期间用名称空间重置topic
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
//现在调用的开始时间 减去 开始时间,判断时候在调用发起之前就超时了
long costTime = beginTimestampPrev - beginTimestampFirst;
//如果已经超时了,那么直接结束循环,不再发送
//即超时的时候,即使还剩下重试次数,也不会再继续重试
if (timeout < costTime)
callTimeout = true;
break;
/*
* 6 异步、同步、单向发送消息
*/
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
//方法调用结束时间戳
endTimestamp = System.currentTimeMillis();
/*
* 7 更新本地错误表缓存数据,用于延迟时间的故障转移的功能
*/
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
/*
* 8 根据发送模式执行不同的处理
*/
switch (communicationMode)
//异步和单向模式直接返回null
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
//同步模式,如果开启了retryAnotherBrokerWhenNotStoreOK开关,那么如果不是返回SEND_OK状态,则仍然会执行重试发送
if (sendResult.getSendStatus() != SendStatus.SEND_OK)
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK())
continue;
//如果发送成功,则返回
return sendResult;
default:
break;
catch (RemotingException e)
//RemotingException异常,会执行重试
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
catch (MQClientException e)
//MQClientException异常,会执行重试
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
catch (MQBrokerException e)
//MQBrokerException异常
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
//如果返回的状态码属于一下几种,则支持重试:
//ResponseCode.TOPIC_NOT_EXIST,
//ResponseCode.SERVICE_NOT_AVAILABLE,
//ResponseCode.SYSTEM_ERROR,
//ResponseCode.NO_PERMISSION,
//ResponseCode.NO_BUYER_ID,
//ResponseCode.NOT_IN_CURRENT_UNIT
if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode()))
continue;
else
//其他状态码不支持重试,如果有结果则返回,否则直接抛出异常
if (sendResult != null)
return sendResult;
throw e;
catch (InterruptedException e)
//InterruptedException异常,不会执行重试
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn(RocketMQ源码—Producer发送单向同步异步消息源码一万字