RocketMQ 自己的整理和理解

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 自己的整理和理解相关的知识,希望对你有一定的参考价值。

 

 

不能有中文路径!

 

不能有中文路径!

  

不能有中文路径!    

 

 

关系

 

 两个接口 

 

interface MQProducer  //生产者接口 

   

    {

   实现该接口的只有一个 默认的 DefaultMQProducer 

   

   DefaultMQProducer 实现 MQProducer 接口的时候 还继承了 ClientConfig类 (客户端配置类)

       可以配置如  sendMsgTimeout 超时时间  producerGroup  消息最大多少 超过多少压缩等等

   

   

   

   关键方法 :

        send(Message) 发送一个消息到MQ

 

   这个方法其实是调用 DefaultMQProducer构造方法 创建的  defaultMQProducerImpl 类对象的 send(..)方法 

  

  defaultMQProducerImpl 类 才是真正发送消息的核心类 

  

  defaultMQProducerImpl.send 方法 --》 sendDefaultImpl方法

 

      sendDefaultImpl --》  tryToFindTopicPublishInfo 来检测映射 队列是否存在 是否正常

  {

   final Segment<K,V>[] segments;  这个 键值

 

   不存在 不正常 :

     创建一个 TopicPublishInfo 到 segments 映射文件 同时 将 Topic (队列) 信息 更新到NameServer

  }  

 

  sendDefaultImpl --》   通过设置是失败重复次数  和  超时时间 来从新发送消息  

   

             详细  for (; times < 失败重复次数 && (结束时间 开始时间) < 配置的超时时间; times++) 

 

  sendDefaultImpl --sendKernelImpl  装载 配置 信息 

 

                  --sendKernelImpl --this.mQClientFactory.getMQClientAPIImpl().sendMessage()

   

       MQClientInstance mQClientFactory  对象  是在  DefaultMQProducer start启动方式时候初始化的

       

   MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer,rpcHook);

   

                   

      --  --sendMessage

{

MQClientInstance  --》  MQClientAPIImpl mQClientAPIImpl 

   

 MQClientAPIImpl.sendMessage()  --> sendMessageSync

  switch (communicationMode)  同步 异步  单向 处理  默认是 同步

        }

        

 

后续返回  SendResult sendResult  改类型描述当时 发送MQ 的最终状态

 

  

  Message 消息的 Topic 不能为空  

    

   

   producer.shutdown(); 关闭 shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己

   

}

 

 

 

 发送消息负载的问题  

 {

 

    看源码  是通过循环从 namesrv 获取的到 Topic 路由消息 (也就是有几个broker 每个 broker 有几个队列)

然后  记录当前发送过的 +1

 

 

    备注 : 队列数量 小于 消费者数量  多余的消费者将不起做用

 }

 

 

 

 关于顺序消息发送 的问题

 {

     环境: 下单 付款 收货 3个状态 ,  普通模式下 发送到队列中的 是轮询队列 将3个消息分别发送到多个队列中。

 

 很可能会照成出现 先消费 在消费 流程错乱的情况  当然可以业务层处理 但是业务层处理比较麻烦

 

 

 顺序消费的发送的原理 :

  

    我们自己指定 消息将要添加的队列 

 

        SendResult sendResult = producer.send(msg,

new MessageQueueSelector()

{

@Override

public MessageQueue select(List<MessageQueue> mqs,

Message msg, Object arg)

{

Integer id = (Integer) arg;

int index = id % mqs.size(); // 通过取于来 讲 同一个订单号 访入同一个队列中  

// 前提是 队列数量没有变动

return mqs.get(index);

 

}

}, 10001); // orderID 10001”是传递给回调方法的 自定义数据 

 

 

List<MessageQueue> mqs 就是从namesrv 获取的所有队列

 }

  

 

 

备注   

        // 订单 Id 

String orderId = "20034568923546"; 

message.setKeys(orderId); 

// Keys

        每个消息在业务局面的唯一标识码   通过 topickey 来查询返条消息内容,以及消息被谁消费

查询的时候 非常重要

 

 

 

消费者

interface MQConsumer   

{

 

    // 回溯消费

{

    mqadmin resetOffsetByTime 命令  

改方式 是通过消费的日志来恢复的  但是只能通过 消费的组来恢复  恢复消息后 也只能用改组来从新消费 

 

-s : 时间戳的问题  可以是 毫秒 或者是从什么时候开始

 

}

 

 

 

//拉取模式

interface MQPullConsumer:

{

 

 

 

}

 

 

 

 

 

 

// 接收模式

 

长轮询模式  一次获取一批 消息 

 

 

记录  

     批量和单条 内部实现   还是获取了所有的 可以获取到的队列消息 放入集合中 判断集合大小是否 大于设置的单次消费数量

 

 小于

    直接将其 消息集合 放入执行回调方法中

 大于

                使用的是For 循环 来单条处理

 

 

    

 

 

 

 

interface MQPushConsumer:

{

class DefaultMQPushConsumer  extends ClientConfig implements MQPushConsumer

 

DefaultMQPushConsumer 包含很多可以配置的信息   详情见文档 

其中最主要的 有几个

      messageModel  消息模型   支持以下两种  1、集群消费 2、广播消费

messageListener  消息监听器

consumeThreadMin 消费线程池数量 默认10

consumeThreadMax 消费线程池数量 默认20

 

重要的是  消费线程池 !  这就说明  我发布一个 消费应用 消费逻辑就可以 个 处理! 不用自己搞了有没有!!

    安默认的来算  20个消费逻辑  可以配置  而且还 可以横向 增加 消费应用  只要保持是一个组就可以了

 

    难怪会在文档中 特意话一个 性能图啊!!

 

 

 

 

应用通常吐 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法

 

MessageListenerOrderly 这个是有序的

MessageListenerConcurrently 这个是无序的

 

关键方法 DefaultMQPushConsumer registerMessageListener(new implements MessageListenerConcurrently)

{

public void registerMessageListener(MessageListenerConcurrently messageListener) {

this.messageListener = messageListener; // 给自己复制一个 消费逻辑类对象  方法后续查询 替换修改等

 

    关键方法

       this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);

// 将消费逻辑类告诉 调用者类 

}

 

关键方法 start  

 

DefaultMQPushConsumer.start()  -->  DefaultMQPushConsumerImpl.start()

{

 this.serviceState 来记录设置当前程序运行状态  来做多态 

 

 checkConfig() 检查配置 初始化赋值 

 

 copySubscription() 拷贝订阅者信息 赋值 消费逻辑类 

 

 

 // 有就获取 没有就创建一个

 this.mQClientFactory =  MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,this.rpcHook);

 

 

 

 接着初始化一系列信息

 

 // 加载消费进度

 this.offsetStore.load();

 // 该方法有两个实现   

 

 一个是本地 this.readLocalOffset() 获取数据 

 {

 //获取文件字符串

     String content = MixAll.file2String(this.storePath);

 OffsetSerializeWrapper offsetSerializeWrapper =OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);

 

可以看出 淘宝使用的是JSON

 

 }

 

 

 

 

 if (this.getMessageListenerInner() instanceof MessageListenerOrderly)

 else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently)

 判断 消费逻辑类 实现那个接口 创建对应的 ConsumeMessageOrderlyService 对象

    

    ConsumeMessageConcurrentlyService  该实现为空

 

 

本地 

ConsumeMessageOrderlyService.start()

{

 

 

 

 创建并执行一个周期性的动作成为了第一个在给定的初始延迟之后,随后用给定的时期,这是执行后将开始initialDelay然后initialDelay +,然后initialDelay + 2 *时期,等等。如果任何执行任务遇到异常,后续执行的镇压。否则,只会终止的任务通过取消或终止执行器。如果执行这个任务花费的时间比其期,然后后续执行可能会迟到,但不会同时执行。

 //就是一个定时器

 

 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

ConsumeMessageOrderlyService.this.lockMQPeriodically();

}

}, 1000 * 1, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS);

 

scheduleAtFixedRate 应该是一个线程池管理  

 

不用去关心 scheduleAtFixedRate 方法  看  ConsumeMessageOrderlyService.this.lockMQPeriodically()

{

 

this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll()

  RebalanceImpl.lockAll() //  将读取到的消息上锁 

}

 

 

}

 

 

// 最关键的服务启动了

// 正在的启动了

mQClientFactory.start(); 

{

 synchronized (this){

 

//Start request-response channel  启动请求-响应通道

this.mQClientAPIImpl.start();

//Start various schedule tasks    开始各种安排任务  启动定时任务   其中就包含 获取到MQ消息消费的 回调方法

this.startScheduledTask();

//Start pull service              开始拉取服务

this.pullMessageService.start();

//Start rebalance service         启动负载均衡  //  该服务非常重要

this.rebalanceService.start();

//Start push service              开始推动服务

this.defaultMQProducer.getDefaultMQProducerImpl().start(false); 

 } 

}

 

 

}

 

指定 group 

订阅 topic 

注册消息监听处理器,当消息到来时消费消息 

消费端 Start 

 

复制订阅关系 

 

初始化 rebalance 变量 

 

构建 offsetStore 消费进度存储对象 

 

启动消费消息服务 

 

向 mqClientFactory 注册本消费者 

 

启动 client 端远程通信 

 

加载消费进度 Loand() 

 

启动定时任务  

 

定时获取 nameserver 地址  

 

定时从 nameserver 获取 topic 路由信息  

 

定时清理下线的 borker  

 

定时向所有 broker 发送心跳信息, (包括订阅关系)  

 

定时持久化 Consumer 消费进度(广播存储到本地,集群存储到 Broker)  

PS:  这里也是是个关键   持久化消费进度 是用来记录当前 组的消费情况 可以做到 回溯消费  和宕机等情况下 启动后接着上次执行消费 

 

统计信息打点  

 

动态调整消费线程池 

 

启动拉消息服务 PullMessageService 

 

启动消费端负载均衡服务 RebalanceService 

 

从 namesrv 更新 topic 路由信息 

 

向所有 broker 发送心跳信息, (包括订阅关系) 

 

唤醒 Rebalance 服务线程 

 

 

}

 

 

// 有些懒得看了 直接看别人 得了

 

 消费端负载均衡 

{

   这个也是个重点 

   

   消费端会通过 RebalanceService 线程,10 秒钟做一次基于 topic 下的所有队列负载 

   

   消费端 遍历自己所有的 Topic 获取 Topic 下所有的 队列  (一个Topic 包含对个队列  默认是 个 有别于其他MQ 

   

   从 broker 获取当前 组(group)的所有消费端( 有心跳的) 

   获取队列集合Set<MessageQueue> mqSet

   现在队列分配策略实例  AllocateMessageQueueStrategy 执行分配算法

   {

      1:平均分配算法 :

        其实是类似于分页的算法 

将所有 queue 排好序类似于记录 

将所有消费端 consumer 排好序,相当于页数 

然后获取当前 consumer 所在页面应该分配到的 queue 

          2:按照配置来分配队列 :

       消费服务启动的时候 就指定好了要消费的 是哪个队列

  3:按照机房来配置队列 :

Consumer 启动的时候会指定在哪些机房的消息  (应该是指定 broker

获取指定机房的 queue 

然后在执行如 1)平均算法 

   }

 

    根据分配队列的结果更新 ProccessQueueTable<MessageQueue, ProcessQueue> 

    {

    比对 mqSet 将多余的队列删除, 当 broker 当机或者添加,会导致分配到 mqSet 变化, 

 

添加新增队列, 比对 mqSet,给新增的 messagequeue 构建长轮询对象 PullRequest 对象,会从 broker 获取消费的进度 构建这个队列的 ProcessQueue 

将 PullRequest 对象派发到长轮询拉消息服务(单线程异步拉取) 

 

 

注:ProcessQueue 正在被消费的队列,  

 

(1) 长轮询拉取到消息都会先存储到 ProcessQueue 的 TreeMap<Long, MessageExt> 集合中,消费调后会删除掉,用来控制 consumer 消息堆积,  TreeMap<Long, MessageExt> key 是消息在此 ConsumeQueue 队列中索引 

 

(2) 对于顺序消息消费 处理  

 

locked 属性:当 consumer 端向 broker 申请锁队列成功后设置 true,  只有被锁定 的 processqueue 才能被执行消费  rollback: 将消费在 msgTreeMapTemp 中的消息,放回 msgTreeMap 重新消费 

 

commit: 将临时表 msgTreeMapTemp 数据清空,代表消费完成,放回最大偏移

值 

 

(3) 这里是个 TreeMap,对 key 即消息的 offset 进行排序,这个样可以使得消息进 行顺序消费

 

 

 

}

 

 

 

}

 

 

 

 

长轮询 

{

Rocketmq 的消息是由 consumer 端主动到 broker拉取的, consumer 向 broker 发送拉消息

请求, PullMessageService 服务通过一个线程将阻塞队列 LinkedBlockingQueue<PullRequest>

中的 PullRequest 到 broker 拉取消息 

 

DefaultMQPushConsumerImpl 的 pullMessage(pullRequest)方法执行向 broker 拉消息动作 

1. 获取 ProcessQueue 判读是否 drop 的, drop 为 true 返回 

2. 给 ProcessQueue 设置拉消息时间戳 

3. 流量控制,正在消费队列中消息(未被消费的)超过阀值,稍后在执行拉消息 

4. 流量控制,正在消费队列中消息的跨度超过阀值(默认 2000) ,稍后在消费 

5. 根据 topic 获取订阅关系 

6. 构建拉消息回调对象 PullBack, 从 broker 拉取消息(异步拉取)返回结果是回调 

7. 从内存中获取 commitOffsetValue  //TODO 这个值跟 pullRequest.getNextOffset 区别 

8. 构建 sysFlag   pull 接口用到的 flag 

9. 调底层通信层向 broker 发送拉消息请求 

如果 master 压力过大,会建议去 slave 拉取消息 

如果是到 broker 拉取消息清楚实时提交标记位,因为 slave 不允许实时提交消费进

度,可以定时提交   

//TODO 关于 master 拉消息实时提交指的是什么? 

10. 拉到消息后回调 PullCallback 

处理 broker 返回结果 pullResult 

  

更新从哪个 brokermaster 还是 slave)拉取消息 

  

反序列化消息 

  

消息过滤 

  

消息中放入队列最大最小 offset, 方便应用来感知消息堆积度 

将消息加入正在处理队列 ProcessQueue 

将消息提交到消费消息服务 ConsumeMessageService 

流控处理, 如果 pullInterval 参数大于 (拉消息间隔,如果为了降低拉取速度,

可以设置大于 的值) , 延迟再执行拉消息,  如果 pullInterval 为 立刻在执行拉

消息动作 

 

 

        看图 人家花的不错 很明了

}

   

push 消息

{

 

  PS:

        长轮询向broker拉取消息是批量拉取的,  默认设置批量的值为pullBatchSize = 32, 可配置 

  

  消费端 consumer 构建一个消费消息任务 ConsumeRequest 消费一批消息的个数是 可配置的 consumeMessageBatchMaxSize = 1, 默认批量个数为一个 

      也就是说 每次传递给回调方法的 参数 消息集合 的解释

  

  ConsumeRequest 任务 run 方法执行 

 

 

判断 proccessQueue 是否被 droped 的, 废弃直接返回,不在消费消息  

 

构建并行消费上下文  

 

给消息设置消费失败时候的 retry topic,当消息发送失败的时候发送到 topic

%RETRY%groupname 的队列中 

 

 

调 MessageListenerConcurrently 监听器的 consumeMessage 方法消费消息,返回消

费结果 

  

如果 ProcessQueue 的 droped 为 true,不处理结果,不更新 offset, 但其实这里消

费端是消费了消息的,这种情况感觉有被重复消费的风险 

  

处理消费结果  :

 

消费成功,  对于批次消费消息, 返回消费成功并不代表所有消息都消费成功,

但是消费消息的时候一旦遇到消费消息失败直接放回,根据 ackIndex 来标记

成功消费到哪里了  

 

 

消费失败, ackIndex 设置为-1 

广播模式发送失败的消息丢弃, 广播模式对于失败重试代价过高,对整个集

群性能会有较大影响,失败重试功能交由应用处理 

集群模式, 将消费失败的消息一条条的发送到 broker 的重试队列中去,如果

此时还有发送到重试队列发送失败的消息, 那就在 cosumer 的本地线程定时 5

秒钟以后重试重新消费消息, 在走一次上面的消费流程。 

 

 

删除正在消费的队列 processQueue 中本次消费的消息,放回消费进度 

 

 

更新消费进度, 这里的更新只是一个内存 offsetTable 的更新,后面有定时任务定

时更新到 broker 上去  

  

  

 

  

  PS

        关于消费成功 和 失败的  问题  

 

    在集群模式下   回调方法设置为消费失败  会将当前消费的失败消息 发送到 broker 的容错度列中  等待N从新消费 。

  

  

  push 消费-顺序消费消息 

 

顺序消费服务 ConsumeMessageConcurrentlyService 构建的时候  

 

构建一个线程池来接收消费请求 ConsumeRequest  

 

构建一个单线程的本地线程用来稍后定时重新消费 ConsumeRequest, 用来执行

定时周期性(一秒)钟锁队列任务  

 

周期性锁队列 lockMQPeriodically  

 

获取正在消费队列列表 ProcessQueueTable 所有 MesssageQueue,  构建根据 broker

归类成 MessageQueue 集合 Map<brokername, Set<MessageQueue>>  

 

遍历 Map<brokername, Set<MessageQueue>>的 brokername, 获取 broker 的 master

机器地址, 将brokerNameSet<MessageQueue>发送到broker请求锁定这些队列。  在broker

端锁定队列,其实就是在 broker 的 queue 中标记一下消费端,表示这个 queue 被某个 client

锁定。 Broker 会返回成功锁定队列的集合, 根据成功锁定的 MessageQueue,设置对应的正

在处理队列 ProccessQueue 的 locked 属性为 true 没有锁定设置为 false  

 

通过长轮询拉取到消息后会提交到消息服务 ConsumeMessageOrderlyService, 

ConsumeMessageOrderlyService 的 submitConsumeRequest 方法构建 ConsumeRequest 任务提

交到线程池。ConsumeRequest 是由 ProcessQueue 和 Messagequeue 组成。 

 

ConsumeRequest 任务的 run 方法 

 

判断 proccessQueue 是否被 droped 的, 废弃直接返回,不在消费消息 

 

每个 messagequeue 都会生成一个队列锁来保证在当前 consumer 内,同一个队列串行

消费,  

 

判断 processQueue 的 lock 属性是否为 true, lock 属性是否过期, 如果为 false 或者过期,

放到本地线程稍后锁定在消费。 如果 lock 为 true 且没有过期,开始消费消息 

 

计算任务执行的时间如果大于一分钟且线程数小于队列数情况下,将 processqueue, 

messagequeue 重新构建 ConsumeRequest 加到线程池 10ms 后在消费,这样防止个别队列被

饿死 

 

获取客户端的消费批次个数,默认一批次为一条 

 

从 proccessqueue 获取批次消息,  processqueue.takeMessags(batchSize) , 从 msgTreeMap

中移除消息放到临时 map 中 msgTreeMapTemp, 这个临时 map 用来回滚消息和 commit 

息来实现事物消费 

 

调回调接口消费消息,返回状态对象 ConsumeOrderlyStatus 

 

根据消费状态,处理结果 

1) 非事物方式,自动提交 

消息消息状态为 success: 调用 processQueue.commit 方法  

 

获取 msgTreeMapTemp 的最后一个 key,表示提交的 offset  

 

清空 msgTreeMapTemp 的消息,已经成功消费 

2) 事物提交,由用户来控制提交回滚(精卫专用) 

 更新消费进度,  这里的更新只是一个内存 offsetTable 的更新, 后面有定时任务定时更

新到 broker 上去 

  

  

}

   

   

关闭

{

 

 shutdown 

 

DefaultMQPushConsumerImpl  关闭消费端 

 

关闭消费线程 

 

将分配到的 Set<MessageQueue>的消费进度保存到 broker 

利 用

DefaultMQPushConsumerImpl

获 取

ProcessQueueTable<MessageQueue, 

ProcessQueue>的 keyset 的 messagequeue 去获取 

RemoteBrokerOffsetStore.offsetTable<MessageQueue, AutomicLong>Map 中的消费进

度, 

offsetTable 中 的 messagequeue 的 值, 在 update 的时候如果 没有对应 的

Messagequeue 会构建, 但是也会 rebalance 的时候将没有分配到的 messagequeue

删除 

rebalance 会将 offsettable 中没有分配到 messagequeue 删除,  但是在从 offsettable

删除之前会将 offset 保存到 broker 

 

Unregiser 客户端 

 

pullMessageService 关闭 

 

scheduledExecutorService 关闭,关闭一些客户端的起的定时任务 

 

mqClientApi 关闭 

 

rebalanceService 关闭 

 

 

}

 

 

 

  额 怎么拷贝一下 就没有格式了啊!!!!!!!  

   

努力或许不会有收获,但是不努力却一定不会有收获!

 

   

   

      

 

以上是关于RocketMQ 自己的整理和理解的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ - 如何用死信队列解决消费者异常

常用python日期日志获取内容循环的代码片段

小程序各种功能代码片段整理---持续更新

RocketMQ概念整理

IOS开发-OC学习-常用功能代码片段整理

深入理解RocketMQ---实战(控制台搭建)