8RocketMQ 源码解析之消息发送
Posted carl-zhao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了8RocketMQ 源码解析之消息发送相关的知识,希望对你有一定的参考价值。
当我们启动了元数据管理 NameServer 以及 消息管理 Broker。下面我们就可以进行消息发送了。RocketMQ 支持三种消息发送方式:
- 同步消息发送(sync):当 Producer 发送消息到 Broker 时会同步等待消息处理结果
- 异步消息发送(async):当 Producer 发送消息到 Broker 时会指定一个消息发送成功的回调函数,调用消息发送后立即返回不会阻塞。消息发送成功或者失败会在一个新的线程中进行处理。
- 单向消息发送(oneway):当 Producer 发送消息到 Broker 时直接返回,只管把消息发送出去,并不关心 Broker 的响应结果。
1、发送消息 Demo
首先我们来看一下 RocketMQ 的发送消息 Demo 的代码,如下所示:
public class Producer
public static void main(String[] args) throws MQClientException, InterruptedException
DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
producer.setNamesrvAddr ("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1000; i++)
try
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
catch (Exception e)
e.printStackTrace();
Thread.sleep(1000);
producer.shutdown();
其实上面的代码逻辑很清晰:
- 首先创建 DefaultMQProducer 对象,并且设置存储 RocketMQ 元数据的 NameServer 地址,然后开启 DefaultMQProducer 服务
- 根据业务请求构建 RocketMQ 的 Message,发送消息并查看发送结果
- 关闭 DefaultMQProducer 类,并且释放资源
2、DefaultMQProducer 启动
生产者的启动,我们应该看 DefaultMQProducerImpl#start
这个方法。
DefaultMQProducerImpl#start
public void start(final boolean startFactory) throws MQClientException
switch (this.serviceState)
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP))
this.defaultMQProducer.changeInstanceNameToPID();
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK)
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory)
mQClientFactory.start();
log.info("the producer [] start OK. sendMessageWithVIPChannel=", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.startScheduledTask();
生产者的启动逻辑为:
- 检查 producerGroup 是否符合要求,并且修改生产者的 instanceName 为 PID
- 创建 MQClientInstance 实例,MQClientManager 这个单例对象来管理 MQClientInstance 实例,它的内部有一个 Map 用于维护 clientId 与 MQClientInstance 实例的映射关系。clientId 的生成规则为:
客户端IP + instanceName + unitName(可选)
。上一步是把生产者的 instanceName 修改为了 PID。这样就可以在同一台机器上面启动多台 RocketMQ 实例了。 - 向 MQClientInstance 中注册生产者,方便后续网络请求
- 启动 MQClientInstance 实例,这里主要是创建 NettyRemotingClient 这个客户端网络连接与 NameServer 与 Broker 进行通信。
3、Topic 路由机制
在我们讲消息发送之前,我们首先看一下 RocketMQ 的 Topic 路由机制。当 RocketMQ 生产者第一次发送消息的时候,它首先会去 NameServer 查询业务方已经创建好的 Topic 并且把它保存到内存当中。然后通过定时任务每隔 30 秒去 NameServer 拉取最新的 Topic 信息,这样就实现了 Producer 对 Topic 配置信息的动态感知。
RocketMQ 支持自动创建 Topic:就是业务方如果没有使用命令行创建 Topic,生产者可以直接发送到没有创建的 Topic 里面,RocketMQ 的 Broker 会自动创建这个 Broker。自动创建 Topic 的时序图如下:
下面我们来理一下 RocketMQ 是如何支持自动创建 Topic 的:
- 首先在 Broker 配置 autoCreateTopicEnable = true (默认) 支持自动创建 Topic,然后会创建
TBW102
这个自动创建 Topic 的 Topic,并且上报到 NameServer。 - 然后 Producer 发送消息到未创建的 Topic 当中,这个时候就会查询
TBW102
这个默认 Topic 进行发送。 - 在 Broker 如果不存在该 topic 的路由信息,则尝试查找默认topic的路由信息,如果能找到,则根据请求中的队列数,创建对应的 topic 路由信息。注意:此时还存在于 Broker 的内存中
- Broker 每隔 30 秒会上报自己的 Topic 路由信息到 NameServer,然后 Producer 会更新 NameServer 的 Topic 路由信息到本地缓存当中。
- Producer 更新 Topic 路由信息到本地缓存中后,后续再往之前未创建的这个 Topic 发送消息的时候,就会直接从本地缓存中获取这个 Topic 路由信息了。
注意:RocketMQ中的路由消息是持久化在Broker中的,NameServer中的路由
信息来自Broker的心跳包并存储在内存中。
4、消息发送
从上面的 demo 中可以发现,当我们需要消息发送的时候必须构建出来 RocketMQ 里面定义的消息对象,下面我们首先来看一下这个消息里面有哪些内容:
Message.java
public class Message implements Serializable
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
private String transactionId;
...
topic
:topic 就是消息主题,也就是消息要发往哪个地方。- flag:消息Flag,RocketMQ暂不做处理
- body:消息体,也就是我们需要传递的业务数据
- transactionId:事务消息的事务ID
- properties:消息扩展参数,RocketMQ 默认使用了:TAGS – 消息TAG,用于消息过滤;KEYS – Message索引键,多个用空格隔开,RocketMQ可以根据这些key快速检索到消息;WAIT – 消息发送时是否等消息存储完成后再返回;delayTimeLevel – 消息延迟级别,用于定时消息或消息重试。
现在需要发送的消息已经构建好了,下面我们来看一下生产的消息发送过程。
4.1 消息校验
在消息发送之前首先会验证一下消息信息。校验逻辑如下:
- 消息主题校验:消息主题名称不能为空;消息主题格式名称、长度是否合法;不允许往系统主题中发送消息
- 消息体校验:消息体不能为空;消息体长度是否合法;
4.2 查询 Topic 路由
在消息真正发送之前,我们需要获取 Topic 路由信息,这样我们才知道我们往哪个具体的 Broker 发送消息。
DefaultMQProducerImpl#tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic)
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok())
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok())
return topicPublishInfo;
else
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
可以参考一下第三小节的 Topic 的路由机制,RocketMQ 的查询 Topic 的路由优先级如下:
- 本地缓存的 Topic 名称的路由信息
- 根据传入的 Topic 名称去 NameServer 查询 Topic 路由信息
- 根据默认的 Topic 名称去 NameServer 查询 Topic 路由信息
- 如果没有找到 Topic 的路由信息,则抛出没有找到 Topic 异常
4.3 选择消息队列
根据路由信息选择消息队列,返回的消息队列按照broker序号进行排序。消息发送端采用重试机制,由 retryTimesWhenSendFailed
指定同步方式重试次数,异步重试机制在收到消息发送结果执行回调之前进行重试,由retryTimesWhenSendAsyncFailed
指定异常重试次数。接下来就是循环执行,选择消息队列、发送消息,发送成功则返回,收到异常则重试。
4.4 发送消息到 Broker
发送消息到 Broker 的逻辑在 DefaultMQProducerImpl#sendKernelImpl
这个方法当中:
DefaultMQProducerImpl#sendKernelImpl
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
long beginStartTime = System.currentTimeMillis();
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr)
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
SendMessageContext context = null;
if (brokerAddr != null)
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch))
MessageClientIDSetter.setUniqID(msg);
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace())
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg))
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg))
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
if (hasCheckForbiddenHook())
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
if (this.hasSendMessageHook())
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true"))
context.setMsgType(MessageType.Trans_Msg_Half);
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null)
context.setMsgType(MessageType.Delay_Msg);
this.executeSendMessageHookBefore(context);
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null)
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null)
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
SendResult sendResult = null;
switch (communicationMode)
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
if (msgBodyCompressed)
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
if (topicWithNamespace)
if (!messageCloned)
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync)
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout")Azure SB 队列将消息发送到禁用的死信队列
RocketMQ源码(23)—DefaultMQPushConsumer消费者重试消息和死信消息源码
RabbitMq高级特性之死信队列 通俗易懂 超详细 内含案例