RocketMQ 自己的整理和理解
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 自己的整理和理解相关的知识,希望对你有一定的参考价值。
不能有中文路径!
不能有中文路径!
不能有中文路径!
关系
两个接口
interface MQProducer //生产者接口
{
实现该接口的只有一个 默认的 DefaultMQProducer
DefaultMQProducer 实现 MQProducer 接口的时候 还继承了 ClientConfig类 (客户端配置类)
可以配置如 sendMsgTimeout 超时时间 producerGroup 消息最大多少 超过多少压缩等等
关键方法 :
send(Message) 发送一个消息到MQ
这个方法其实是调用 DefaultMQProducer构造方法 创建的 defaultMQProducerImpl 类对象的 send(..)方法
defaultMQProducerImpl 类 才是真正发送消息的核心类
defaultMQProducerImpl.send 方法 --》 sendDefaultImpl方法
sendDefaultImpl --》 tryToFindTopicPublishInfo 来检测映射 队列是否存在 是否正常
{
final Segment<K,V>[] segments; 这个 键值
不存在 不正常 :
创建一个 TopicPublishInfo 到 segments 映射文件 同时 将 Topic (队列) 信息 更新到NameServer中
}
sendDefaultImpl --》 通过设置是失败重复次数 和 超时时间 来从新发送消息
详细 for (; times < 失败重复次数 && (结束时间 - 开始时间) < 配置的超时时间; times++)
sendDefaultImpl --》sendKernelImpl 装载 配置 信息
--》sendKernelImpl --》this.mQClientFactory.getMQClientAPIImpl().sendMessage()
MQClientInstance mQClientFactory 对象 是在 DefaultMQProducer start启动方式时候初始化的
MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer,rpcHook);
--》 --》sendMessage
{
MQClientInstance --》 MQClientAPIImpl mQClientAPIImpl
MQClientAPIImpl.sendMessage() --> sendMessageSync
switch (communicationMode) 同步 异步 单向 处理 默认是 同步
}
后续返回 SendResult sendResult 改类型描述当时 发送MQ 的最终状态
Message 消息的 Topic 不能为空
producer.shutdown(); 关闭 shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
}
发送消息负载的问题
{
看源码 是通过循环从 namesrv 获取的到 Topic 路由消息 (也就是有几个broker 每个 broker 有几个队列)
然后 记录当前发送过的 +1
备注 : 队列数量 小于 消费者数量 多余的消费者将不起做用
}
关于顺序消息发送 的问题
{
环境: 1 下单 2 付款 3 收货 3个状态 , 普通模式下 发送到队列中的 是轮询队列 将3个消息分别发送到多个队列中。
很可能会照成出现 先消费 2 在消费 1 流程错乱的情况 当然可以业务层处理 但是业务层处理比较麻烦
顺序消费的发送的原理 :
我们自己指定 消息将要添加的队列
SendResult sendResult = producer.send(msg,
new MessageQueueSelector()
{
@Override
public MessageQueue select(List<MessageQueue> mqs,
Message msg, Object arg)
{
Integer id = (Integer) arg;
int index = id % mqs.size(); // 通过取于来 讲 同一个订单号 访入同一个队列中
// 前提是 队列数量没有变动
return mqs.get(index);
}
}, “10001”); // orderID “10001”是传递给回调方法的 自定义数据
List<MessageQueue> mqs 就是从namesrv 获取的所有队列
}
备注
// 订单 Id
String orderId = "20034568923546";
message.setKeys(orderId);
// Keys
每个消息在业务局面的唯一标识码 通过 topic,key 来查询返条消息内容,以及消息被谁消费
查询的时候 非常重要
消费者
interface MQConsumer
{
// 回溯消费
{
mqadmin resetOffsetByTime 命令
改方式 是通过消费的日志来恢复的 但是只能通过 消费的组来恢复 恢复消息后 也只能用改组来从新消费
-s : 时间戳的问题 可以是 毫秒 或者是从什么时候开始
}
//拉取模式
interface MQPullConsumer:
{
}
// 接收模式
长轮询模式 一次获取一批 消息
记录
批量和单条 内部实现 还是获取了所有的 可以获取到的队列消息 放入集合中 判断集合大小是否 大于设置的单次消费数量
小于
直接将其 消息集合 放入执行回调方法中
大于
使用的是For 循环 来单条处理
interface MQPushConsumer:
{
class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer
DefaultMQPushConsumer 包含很多可以配置的信息 详情见文档
其中最主要的 有几个
messageModel 消息模型 支持以下两种 1、集群消费 2、广播消费
messageListener 消息监听器
consumeThreadMin 消费线程池数量 默认10
consumeThreadMax 消费线程池数量 默认20
重要的是 消费线程池 ! 这就说明 我发布一个 消费应用 消费逻辑就可以 N 个 处理! 不用自己搞了有没有!!
安默认的来算 20个消费逻辑 可以配置 而且还 可以横向 增加 消费应用 只要保持是一个组就可以了
难怪会在文档中 特意话一个 性能图啊!!
应用通常吐 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法
MessageListenerOrderly 这个是有序的
MessageListenerConcurrently 这个是无序的
关键方法 DefaultMQPushConsumer registerMessageListener(new implements MessageListenerConcurrently)
{
public void registerMessageListener(MessageListenerConcurrently messageListener) {
this.messageListener = messageListener; // 给自己复制一个 消费逻辑类对象 方法后续查询 替换修改等
关键方法
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
// 将消费逻辑类告诉 调用者类
}
}
关键方法 start
DefaultMQPushConsumer.start() --> DefaultMQPushConsumerImpl.start()
{
this.serviceState 来记录设置当前程序运行状态 来做多态
checkConfig() 检查配置 初始化赋值
copySubscription() 拷贝订阅者信息 赋值 消费逻辑类
// 有就获取 没有就创建一个
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer,this.rpcHook);
接着初始化一系列信息
// 加载消费进度
this.offsetStore.load();
// 该方法有两个实现
一个是本地 this.readLocalOffset() 获取数据
{
//获取文件字符串
String content = MixAll.file2String(this.storePath);
OffsetSerializeWrapper offsetSerializeWrapper =OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
可以看出 淘宝使用的是JSON
}
if (this.getMessageListenerInner() instanceof MessageListenerOrderly)
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently)
判断 消费逻辑类 实现那个接口 创建对应的 ConsumeMessageOrderlyService 对象
ConsumeMessageConcurrentlyService 该实现为空
本地
ConsumeMessageOrderlyService.start()
{
创建并执行一个周期性的动作成为了第一个在给定的初始延迟之后,随后用给定的时期,这是执行后将开始initialDelay然后initialDelay +,然后initialDelay + 2 *时期,等等。如果任何执行任务遇到异常,后续执行的镇压。否则,只会终止的任务通过取消或终止执行器。如果执行这个任务花费的时间比其期,然后后续执行可能会迟到,但不会同时执行。
//就是一个定时器
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.RebalanceLockInterval, TimeUnit.MILLISECONDS);
scheduleAtFixedRate 应该是一个线程池管理
不用去关心 scheduleAtFixedRate 方法 看 ConsumeMessageOrderlyService.this.lockMQPeriodically()
{
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll()
是
RebalanceImpl.lockAll() // 将读取到的消息上锁
}
}
// 最关键的服务启动了
// 正在的启动了
mQClientFactory.start();
{
synchronized (this){
//Start request-response channel 启动请求-响应通道
this.mQClientAPIImpl.start();
//Start various schedule tasks 开始各种安排任务 启动定时任务 其中就包含 获取到MQ消息消费的 回调方法
this.startScheduledTask();
//Start pull service 开始拉取服务
this.pullMessageService.start();
//Start rebalance service 启动负载均衡 // 该服务非常重要
this.rebalanceService.start();
//Start push service 开始推动服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
}
}
}
}
指定 group
订阅 topic
注册消息监听处理器,当消息到来时消费消息
消费端 Start
复制订阅关系
初始化 rebalance 变量
构建 offsetStore 消费进度存储对象
启动消费消息服务
向 mqClientFactory 注册本消费者
启动 client 端远程通信
* 加载消费进度 Loand()
* 启动定时任务
定时获取 nameserver 地址
定时从 nameserver 获取 topic 路由信息
定时清理下线的 borker
定时向所有 broker 发送心跳信息, (包括订阅关系)
* 定时持久化 Consumer 消费进度(广播存储到本地,集群存储到 Broker)
PS: 这里也是是个关键 持久化消费进度 是用来记录当前 组的消费情况 可以做到 回溯消费 和宕机等情况下 启动后接着上次执行消费
统计信息打点
动态调整消费线程池
启动拉消息服务 PullMessageService
启动消费端负载均衡服务 RebalanceService
从 namesrv 更新 topic 路由信息
向所有 broker 发送心跳信息, (包括订阅关系)
唤醒 Rebalance 服务线程
}
// 有些懒得看了 直接看别人 得了
消费端负载均衡
{
这个也是个重点
消费端会通过 RebalanceService 线程,10 秒钟做一次基于 topic 下的所有队列负载
消费端 遍历自己所有的 Topic 获取 Topic 下所有的 队列 (一个Topic 包含对个队列 默认是 4 个 有别于其他MQ )
从 broker 获取当前 组(group)的所有消费端( 有心跳的)
获取队列集合Set<MessageQueue> mqSet
现在队列分配策略实例 AllocateMessageQueueStrategy 执行分配算法
{
1:平均分配算法 :
其实是类似于分页的算法
将所有 queue 排好序类似于记录
将所有消费端 consumer 排好序,相当于页数
然后获取当前 consumer 所在页面应该分配到的 queue
2:按照配置来分配队列 :
消费服务启动的时候 就指定好了要消费的 是哪个队列
3:按照机房来配置队列 :
Consumer 启动的时候会指定在哪些机房的消息 (应该是指定 broker)
获取指定机房的 queue
然后在执行如 1)平均算法
}
根据分配队列的结果更新 ProccessQueueTable<MessageQueue, ProcessQueue>
{
比对 mqSet 将多余的队列删除, 当 broker 当机或者添加,会导致分配到 mqSet 变化,
添加新增队列, 比对 mqSet,给新增的 messagequeue 构建长轮询对象 PullRequest 对象,会从 broker 获取消费的进度 构建这个队列的 ProcessQueue
将 PullRequest 对象派发到长轮询拉消息服务(单线程异步拉取)
注:ProcessQueue 正在被消费的队列,
(1) 长轮询拉取到消息都会先存储到 ProcessQueue 的 TreeMap<Long, MessageExt> 集合中,消费调后会删除掉,用来控制 consumer 消息堆积, TreeMap<Long, MessageExt> key 是消息在此 ConsumeQueue 队列中索引
(2) 对于顺序消息消费 处理
locked 属性:当 consumer 端向 broker 申请锁队列成功后设置 true, 只有被锁定 的 processqueue 才能被执行消费 rollback: 将消费在 msgTreeMapTemp 中的消息,放回 msgTreeMap 重新消费
commit: 将临时表 msgTreeMapTemp 数据清空,代表消费完成,放回最大偏移
值
(3) 这里是个 TreeMap,对 key 即消息的 offset 进行排序,这个样可以使得消息进 行顺序消费
}
}
长轮询
{
Rocketmq 的消息是由 consumer 端主动到 broker拉取的, consumer 向 broker 发送拉消息
请求, PullMessageService 服务通过一个线程将阻塞队列 LinkedBlockingQueue<PullRequest>
中的 PullRequest 到 broker 拉取消息
DefaultMQPushConsumerImpl 的 pullMessage(pullRequest)方法执行向 broker 拉消息动作
1. 获取 ProcessQueue 判读是否 drop 的, drop 为 true 返回
2. 给 ProcessQueue 设置拉消息时间戳
3. 流量控制,正在消费队列中消息(未被消费的)超过阀值,稍后在执行拉消息
4. 流量控制,正在消费队列中消息的跨度超过阀值(默认 2000) ,稍后在消费
5. 根据 topic 获取订阅关系
6. 构建拉消息回调对象 PullBack, 从 broker 拉取消息(异步拉取)返回结果是回调
7. 从内存中获取 commitOffsetValue //TODO 这个值跟 pullRequest.getNextOffset 区别
8. 构建 sysFlag pull 接口用到的 flag
9. 调底层通信层向 broker 发送拉消息请求
如果 master 压力过大,会建议去 slave 拉取消息
如果是到 broker 拉取消息清楚实时提交标记位,因为 slave 不允许实时提交消费进
度,可以定时提交
//TODO 关于 master 拉消息实时提交指的是什么?
10. 拉到消息后回调 PullCallback
处理 broker 返回结果 pullResult
更新从哪个 broker(master 还是 slave)拉取消息
反序列化消息
消息过滤
消息中放入队列最大最小 offset, 方便应用来感知消息堆积度
将消息加入正在处理队列 ProcessQueue
将消息提交到消费消息服务 ConsumeMessageService
流控处理, 如果 pullInterval 参数大于 0 (拉消息间隔,如果为了降低拉取速度,
可以设置大于 0 的值) , 延迟再执行拉消息, 如果 pullInterval 为 0 立刻在执行拉
消息动作
看图 人家花的不错 很明了
}
push 消息
{
PS:
长轮询向broker拉取消息是批量拉取的, 默认设置批量的值为pullBatchSize = 32, 可配置
消费端 consumer 构建一个消费消息任务 ConsumeRequest 消费一批消息的个数是 可配置的 consumeMessageBatchMaxSize = 1, 默认批量个数为一个
也就是说 每次传递给回调方法的 参数 消息集合 的解释
ConsumeRequest 任务 run 方法执行
判断 proccessQueue 是否被 droped 的, 废弃直接返回,不在消费消息
构建并行消费上下文
给消息设置消费失败时候的 retry topic,当消息发送失败的时候发送到 topic
为%RETRY%groupname 的队列中
调 MessageListenerConcurrently 监听器的 consumeMessage 方法消费消息,返回消
费结果
如果 ProcessQueue 的 droped 为 true,不处理结果,不更新 offset, 但其实这里消
费端是消费了消息的,这种情况感觉有被重复消费的风险
处理消费结果 :
消费成功, 对于批次消费消息, 返回消费成功并不代表所有消息都消费成功,
但是消费消息的时候一旦遇到消费消息失败直接放回,根据 ackIndex 来标记
成功消费到哪里了
消费失败, ackIndex 设置为-1
广播模式发送失败的消息丢弃, 广播模式对于失败重试代价过高,对整个集
群性能会有较大影响,失败重试功能交由应用处理
集群模式, 将消费失败的消息一条条的发送到 broker 的重试队列中去,如果
此时还有发送到重试队列发送失败的消息, 那就在 cosumer 的本地线程定时 5
秒钟以后重试重新消费消息, 在走一次上面的消费流程。
删除正在消费的队列 processQueue 中本次消费的消息,放回消费进度
更新消费进度, 这里的更新只是一个内存 offsetTable 的更新,后面有定时任务定
时更新到 broker 上去
PS:
关于消费成功 和 失败的 问题
在集群模式下 回调方法设置为消费失败 会将当前消费的失败消息 发送到 broker 的容错度列中 等待N次+ 从新消费 。
push 消费-顺序消费消息
顺序消费服务 ConsumeMessageConcurrentlyService 构建的时候
构建一个线程池来接收消费请求 ConsumeRequest
构建一个单线程的本地线程, 用来稍后定时重新消费 ConsumeRequest, 用来执行
定时周期性(一秒)钟锁队列任务
周期性锁队列 lockMQPeriodically
获取正在消费队列列表 ProcessQueueTable 所有 MesssageQueue, 构建根据 broker
归类成 MessageQueue 集合 Map<brokername, Set<MessageQueue>>
遍历 Map<brokername, Set<MessageQueue>>的 brokername, 获取 broker 的 master
机器地址, 将brokerName的Set<MessageQueue>发送到broker请求锁定这些队列。 在broker
端锁定队列,其实就是在 broker 的 queue 中标记一下消费端,表示这个 queue 被某个 client
锁定。 Broker 会返回成功锁定队列的集合, 根据成功锁定的 MessageQueue,设置对应的正
在处理队列 ProccessQueue 的 locked 属性为 true 没有锁定设置为 false
通过长轮询拉取到消息后会提交到消息服务 ConsumeMessageOrderlyService,
ConsumeMessageOrderlyService 的 submitConsumeRequest 方法构建 ConsumeRequest 任务提
交到线程池。ConsumeRequest 是由 ProcessQueue 和 Messagequeue 组成。
ConsumeRequest 任务的 run 方法
判断 proccessQueue 是否被 droped 的, 废弃直接返回,不在消费消息
每个 messagequeue 都会生成一个队列锁来保证在当前 consumer 内,同一个队列串行
消费,
判断 processQueue 的 lock 属性是否为 true, lock 属性是否过期, 如果为 false 或者过期,
放到本地线程稍后锁定在消费。 如果 lock 为 true 且没有过期,开始消费消息
计算任务执行的时间如果大于一分钟且线程数小于队列数情况下,将 processqueue,
messagequeue 重新构建 ConsumeRequest 加到线程池 10ms 后在消费,这样防止个别队列被
饿死
获取客户端的消费批次个数,默认一批次为一条
从 proccessqueue 获取批次消息, processqueue.takeMessags(batchSize) , 从 msgTreeMap
中移除消息放到临时 map 中 msgTreeMapTemp, 这个临时 map 用来回滚消息和 commit 消
息来实现事物消费
调回调接口消费消息,返回状态对象 ConsumeOrderlyStatus
根据消费状态,处理结果
1) 非事物方式,自动提交
消息消息状态为 success: 调用 processQueue.commit 方法
获取 msgTreeMapTemp 的最后一个 key,表示提交的 offset
清空 msgTreeMapTemp 的消息,已经成功消费
2) 事物提交,由用户来控制提交回滚(精卫专用)
更新消费进度, 这里的更新只是一个内存 offsetTable 的更新, 后面有定时任务定时更
新到 broker 上去
}
关闭
{
shutdown
DefaultMQPushConsumerImpl 关闭消费端
关闭消费线程
将分配到的 Set<MessageQueue>的消费进度保存到 broker
利 用
DefaultMQPushConsumerImpl
获 取
ProcessQueueTable<MessageQueue,
ProcessQueue>的 keyset 的 messagequeue 去获取
RemoteBrokerOffsetStore.offsetTable<MessageQueue, AutomicLong>Map 中的消费进
度,
offsetTable 中 的 messagequeue 的 值, 在 update 的时候如果 没有对应 的
Messagequeue 会构建, 但是也会 rebalance 的时候将没有分配到的 messagequeue
删除
rebalance 会将 offsettable 中没有分配到 messagequeue 删除, 但是在从 offsettable
删除之前会将 offset 保存到 broker
Unregiser 客户端
pullMessageService 关闭
scheduledExecutorService 关闭,关闭一些客户端的起的定时任务
mqClientApi 关闭
rebalanceService 关闭
}
额 怎么拷贝一下 就没有格式了啊!!!!!!!
努力或许不会有收获,但是不努力却一定不会有收获!
以上是关于RocketMQ 自己的整理和理解的主要内容,如果未能解决你的问题,请参考以下文章