消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收

Posted 芋道源码

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收相关的知识,希望对你有一定的参考价值。



  • 1、概述

  • 2、Producer 发送消息

    • DefaultMQProducerImpl#tryToFindTopicPublishInfo()

    • MQFaultStrategy

    • DefaultMQProducerImpl#sendKernelImpl()

    • MQFaultStrategy

    • LatencyFaultTolerance

    • LatencyFaultToleranceImpl

    • FaultItem

    • DefaultMQProducer#send(Message)

    • DefaultMQProducerImpl#sendDefaultImpl()

  • 3、Broker 接收消息

    • AbstractSendMessageProcessor#msgCheck

    • SendMessageProcessor#sendMessage

    • DefaultMessageStore#putMessage

  • 4、某种结尾


1、概述

  1. Producer 发送消息。主要是同步发送消息源码,涉及到 异步/Oneway发送消息,事务消息会跳过。

  2. Broker 接收消息。(存储消息在《RocketMQ 源码分析 —— Message 存储》解析)

2、Producer 发送消息

消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收

DefaultMQProducer#send(Message)

  // .... 省略代码


  • 说明:发送同步消息,DefaultMQProducer#send(Message) 对 DefaultMQProducerImpl#send(Message)进行封装。

DefaultMQProducerImpl#sendDefaultImpl()

 // .... 省略代码


  • 说明 :发送消息。步骤:获取消息路由信息,选择要发送到的消息队列,执行消息发送核心方法,并对发送结果进行封装返回。

  • 第 1 至 7 行:对sendsendDefaultImpl(...)进行封装。

  • 第 20 行 :invokeID仅仅用于打印日志,无实际的业务用途。

  • 第 25 行 :获取 Topic路由信息, 详细解析见:DefaultMQProducerImpl#tryToFindTopicPublishInfo()

  • 第 30 & 34 行 :计算调用发送消息到成功为止的最大次数,并进行循环。同步或异步发送消息会调用多次,默认配置为3次。

  • 第 36 行 :选择消息要发送到的队列,详细解析见:MQFaultStrategy

  • 第 43 行 :调用发送消息核心方法,详细解析见:DefaultMQProducerImpl#sendKernelImpl()

  • 第 46 行 :更新Broker可用性信息。在选择发送到的消息队列时,会参考Broker发送消息的延迟,详细解析见:MQFaultStrategy

  • 第 62 至 68 行:当抛出RemotingException时,如果进行消息发送失败重试,则可能导致消息发送重复。例如,发送消息超时(RemotingTimeoutException),实际Broker接收到该消息并处理成功。因此,Consumer在消费时,需要保证幂等性。

DefaultMQProducerImpl#tryToFindTopicPublishInfo()

 // .... 省略代码


  • 说明 :获得 Topic发布信息。优先从缓存topicPublishInfoTable,其次从Namesrv中获得。

  • 第 3 行 :从缓存topicPublishInfoTable中获得 Topic发布信息。

  • 第 5 至 9 行 :从 Namesrv 中获得 Topic发布信息。

  • 第 13 至 17 行 :当从 Namesrv 无法获取时,使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。目的是当 Broker 开启自动创建 Topic开关时,Broker 接收到消息后自动创建Topic,详细解析见《RocketMQ 源码分析 —— Topic》。

MQFaultStrategy

MQFaultStrategy

 // .... 省略代码


  • 说明 :Producer消息发送容错策略。默认情况下容错策略关闭,即sendLatencyFaultEnable=false

  • 第 30 至 62 行 :容错策略选择消息队列逻辑。优先获取可用队列,其次选择一个broker获取队列,最差返回任意broker的一个队列。

  • 第 64 行 :未开启容错策略选择消息队列逻辑。

  • 第 74 至 79 行 :更新延迟容错信息。当 Producer 发送消息时间过长,则逻辑认为N秒内不可用。按照latencyMaxnotAvailableDuration的配置,对应如下:

    Producer发送消息消耗时长 Broker不可用时长
    >= 15000 ms 600 * 1000 ms
    >= 3000 ms 180 * 1000 ms
    >= 2000 ms 120 * 1000 ms
    >= 1000 ms 60 * 1000 ms
    >= 550 ms 30 * 1000 ms
    >= 100 ms 0 ms
    >= 50 ms 0 ms

LatencyFaultTolerance

  // .... 省略代码


  • 说明 :延迟故障容错接口

LatencyFaultToleranceImpl

  // .... 省略代码


  • 说明 :延迟故障容错实现。维护每个对象的信息。

FaultItem

 // .... 省略代码


  • 说明 :对象故障信息。维护对象的名字、延迟、开始可用的时间。

DefaultMQProducerImpl#sendKernelImpl()

 // .... 省略代码


  • 说明 :发送消息核心方法。该方法真正发起网络请求,发送消息给 Broker

  • 第 21 行 :生产消息编号,详细解析见《RocketMQ 源码分析 —— Message 基础》。

  • 第 64 至 121 行 :构建发送消息请求SendMessageRequestHeader

  • 第 107 至 117 行 :执行 MQClientInstance#sendMessage(...) 发起网络请求。

3、Broker 接收消息

SendMessageProcessor#sendMessage

 // .... 省略代码


  • #processRequest() 说明 :处理消息请求。

  • #sendMessage() 说明 :发送消息,并返回发送消息结果。

  • 第 51 至 55 行 :消息配置(Topic配置)校验,详细解析见:AbstractSendMessageProcessor#msgCheck()。

  • 第 60 至 64 行 :消息队列编号小于0时,Broker 可以设置随机选择一个消息队列。

  • 第 72 至 103 行 :对RETRY类型的消息处理。如果超过最大消费次数,则topic修改成"%DLQ%" + 分组名, 即加 死信队 (Dead Letter Queue),详细解析见:《RocketMQ 源码分析 —— Topic》。

  • 第 105 至 118 行 :创建MessageExtBrokerInner

  • 第 132 :存储消息,详细解析见:DefaultMessageStore#putMessage()。

  • 第 133 至 183 行 :处理消息发送结果,设置响应结果和提示。

  • 第 186 至 214 行 :发送成功,响应。这里doResponse(ctx, request, response)进行响应,最后return null,原因是:响应给 Producer 可能发生异常,#doResponse(ctx, request, response)捕捉了该异常并输出日志。这样做的话,我们进行排查 Broker 接收消息成功后响应是否存在异常会方便很多。

AbstractSendMessageProcessor#msgCheck

 // .... 省略代码


  • 说明:校验消息是否正确,主要是Topic配置方面,例如:Broker 是否有写入权限,topic配置是否存在,队列编号是否正确。

  • 第 11 至 18 行 :检查Topic是否可以被发送。目前是 {@link MixAll.DEFAULT_TOPIC} 不被允许发送。

  • 第 20 至 51 行 :当找不到Topic配置,则进行创建。当然,创建会存在不成功的情况,例如说:defaultTopic 的Topic配置不存在,又或者是 存在但是不允许继承,详细解析见《RocketMQ 源码分析 —— Topic》。

DefaultMessageStore#putMessage

  // .... 省略代码
  • 说明:存储消息封装,最终存储需要 CommitLog 实现。

  • 第 7 至 27 行 :校验 Broker 是否可以写入。

  • 第 29 至 39 行 :消息格式与大小校验。

  • 第 47 行 :调用 CommitLong 进行存储,详细逻辑见:《RocketMQ 源码分析 —— Message 存储》

4、某种结尾

感谢阅读、收藏、点赞本文的工程师同学。

阅读源码是件令自己很愉悦的事情,编写源码解析是让自己脑细胞死伤无数的过程,痛并快乐着。

如果有内容写的存在错误,或是不清晰的地方,见笑了,

以上是关于消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收的主要内容,如果未能解决你的问题,请参考以下文章

消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收

分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)

消息中间件搬迁

从源码探究消息队列的设计与实现丨极客时间

深入剖析 RocketMQ 源码 - 负载均衡机制

分布式技术专题分布式消息队列-RocketMQ延迟消息实现原理和源码分析