聊聊rocketmq的RemotingTooMuchRequestException

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了聊聊rocketmq的RemotingTooMuchRequestException相关的知识,希望对你有一定的参考价值。

参考技术A 本文主要研究一下rocketmq的RemotingTooMuchRequestException

rocketmq-remoting-4.6.0-sources.jar!/org/apache/rocketmq/remoting/exception/RemotingTooMuchRequestException.java

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

深入源码聊聊RocketMQ的刷盘机制

大家好,我是Leo。

今天聊一下RocketMQ的三种刷盘机制。

  • 同步刷盘
  • 异步刷盘(RocketMQ默认)
  • 异步刷盘+缓冲区

出自微信公众号【欢少的成长之路】

本章概括

深入源码聊聊RocketMQ的刷盘机制_RocketMQ

同步刷盘

整个同步刷盘策略由 ​​FlushCommitLogService​​​ 与 ​​GroupCommitService​​ 实现。

​FlushCommitLogService​​​ 是 ​​GroupCommitService​​ 刷盘服务的父类,它是一个抽象类,本身没有实现,只是一个标记类,刷盘策略由子类负责完成。

同步刷盘时,只有消息被真正持久化到磁盘才会响应 ACK,可靠性非常高,但是性能会受到较大影响,适用于金融业务。(可参考图一)

由下列参数决定刷盘策略

FlushDiskType:SYNC_FLUSH(同步刷盘)

深入源码聊聊RocketMQ的刷盘机制_RocketMQ_02

图一同步刷盘流程图

Broker 接收到消息后,会将消息交给 CommitLog 负责存储。CommitLog 先定位到最新的 MappedFile,然后将消息按照固定格式追加到其中。(可参考图二)

深入源码聊聊RocketMQ的刷盘机制_过期删除机制_03

图二CommitLog构造函数加载流程

CommitLog

是一个构造函数

深入源码聊聊RocketMQ的刷盘机制_过期删除机制_04

GroupCommitRequest

代表一个刷盘的请求

深入源码聊聊RocketMQ的刷盘机制_过期删除机制_05

图三GroupCommitRequest函数

putRequest

通过加锁对每一个写请求都加到 ​​requestWrite​​ 集合

深入源码聊聊RocketMQ的刷盘机制_过期删除机制_06

图四putRequest函数

swapRequests

每次提交完刷盘之后,需要对读写请求数据交换。

刷盘请求为啥还要分读写两个列表呢?这是用来做读写分离用的,Producer 发送消息的请求量是非常大的,GroupCommitService 的刷盘操作是同步的,刷盘期间仍然会有大量的刷盘请求被提交进来,拆分成两个读写列表,请求提交到写列表,刷盘时处理读列表,刷盘结束交换列表,循环往复,两者可以同时进行。

深入源码聊聊RocketMQ的刷盘机制_过期删除机制_07

图五swapRequests函数

当有同步请求被提交进来,线程就会被唤醒,然后执行​​doCommit​​​方法,刷盘的核心是调用了 MappedFileQueue 的​​flush​​​方法。flush 方法需要传​​flushLeastPages​​参数,它代表刷盘的最小页数,对于同步刷盘来说,不允许消息丢失,只要写入数据就要刷盘,所以页数为 0。

doCommit

深入源码聊聊RocketMQ的刷盘机制_RocketMQ_08

图六doCommit函数

flush

刷盘函数。flush 方法会根据刷盘的偏移量定位到对应的 MappedFile,然后调用​​flush​​开始刷盘,最后更新刷盘偏移量。

深入源码聊聊RocketMQ的刷盘机制_Broker删除机制_09

图七flush函数

isAbleToFlush

计算是否需要刷盘。只有新写入的数据超过指定页才刷盘,避免频繁无意义的刷盘。

刷盘最核心的方法自然是 MappedFile 的​​flush​​​方法了,它会先根据​​flushLeastPages​​计算是否需要刷盘。例如最小刷盘页为 4 时,最小刷盘数据就是 16KB,如果写入的数据不足 16KB 就会跳过刷盘

深入源码聊聊RocketMQ的刷盘机制_过期删除机制_10

图八isAbleToFlush函数

run

GroupCommitService子实现类的入口函数

深入源码聊聊RocketMQ的刷盘机制_Broker删除机制_11

图九run函数

总结

由CommitLog构造函数结构图可得知

  1. 如果是同步刷盘策略,会初始化​​GroupCommitService​​ 子实现类。最终继承的是Runnable接口,会先启动run线程。
  2. run线程主要做的一件事情就是不断监测当前服务是否停止,只要不停止就会一直执行,也就是每10毫秒会执行​​doCommit​​ 函数。
  3. doCommit函数主要处理的是,只要读请求不为空就一直处理,首先会判断文件刷新到的offset是否大于等于刷盘offset。并且考虑到一条消息可能存在两个文件中,因此最多可能存在两次刷盘。
  4. 成立之后会调用​​flush​​​ 函数,因为同步刷盘的规则是来一条就刷一条,所以是​​flush(0)​
  5. 刷新完毕后,唤醒用户线程,(通知其他线程,继续工作了)
  6. 收尾工作(刷新消息物理落盘时间,交互读写请求)
  1. 如果文件刷盘的偏移量 < 请求的下一个偏移量,则说明还没有刷新完,还需要继续刷新

异步刷盘

RocketMQ默认是采用异步刷盘的策略,因为异步刷盘既兼顾了性能,也兼顾了可靠性。

异步刷盘时,消息写入 PageCache 就会响应 ACK,然后由后台线程异步将 PageCache 里的内容持久化到磁盘,降低了读写延迟,提高了性能和吞吐量。服务宕机消息不丢失,机器断电少量消息丢失。

由下列参数决定刷盘策略

FlushDiskType:ASYNC_FLUSH(异步刷盘)

整个异步刷盘策略由 ​​FlushCommitLogService​​​ 与 ​​FlushRealTimeService​​ 实现。

​FlushCommitLogService​​ 是CommitLog 刷盘服务的父类,它是一个抽象类,本身没有实现,只是一个标记类,刷盘策略由子类负责完成。

对于异步刷盘,没有提交刷盘请求一说。它不像同步刷盘,只要有消息写入 CommitLog 就要执行刷盘操作,因为异步刷盘是定时执行的。

异步刷盘时,仅仅需要调用 ​​wakeup​​​ 方法唤醒线程即可。所以,我们重点看它的​​run​​方法。

run

深入源码聊聊RocketMQ的刷盘机制_RocketMQ_12

图十异步刷盘子类run函数

flush

刷盘函数。flush 方法会根据刷盘的偏移量定位到对应的 MappedFile,然后调用​​flush​​开始刷盘,最后更新刷盘偏移量。

深入源码聊聊RocketMQ的刷盘机制_RocketMQ_13

图十一flush函数

isAbleToFlush

计算是否需要刷盘。只有新写入的数据超过指定页才刷盘,避免频繁无意义的刷盘。

刷盘最核心的方法自然是 MappedFile 的​​flush​​​方法了,它会先根据​​flushLeastPages​​计算是否需要刷盘。例如最小刷盘页为 4 时,最小刷盘数据就是 16KB,如果写入的数据不足 16KB 就会跳过刷盘

深入源码聊聊RocketMQ的刷盘机制_Broker删除机制_14

图十二isAbleToFlush函数

waitForRunning

深入源码聊聊RocketMQ的刷盘机制_RocketMQ_15

总结

  1. 如果是异步刷盘策略,会初始化​​FlushRealTimeService​​ 子实现类。最终继承的是Runnable接口,会先启动run线程。
  2. run线程会一直执行,直到服务被迫停止,或者人为干预停止
  3. 首先获取RocketMQ当前配置是否定时刷新日志,如果是,sleep睡眠自定义的时间。如果不是,执行​​waitForRunning​​ 函数等待自定义的时间。
  4. 根据自定义的刷盘页数进行​​flush​​ 刷盘。更新后修改消息落盘时间。
  5. 第五点主要关键,为了保证所有的数据一致性,如果服务停止,那么把剩余的没有刷新到磁盘的消息刷盘,重复次数为10次
  1. 获取是否定时刷新日志的设定
  2. 获取刷新到磁盘的时间间隔
  3. 获取一次刷新到磁盘的最少页数
  4. 获取刷新CommitLog的频率

异步刷盘+缓冲区

异步刷盘+缓冲区,消息先写入直接内存缓冲区,然后由后台线程异步将缓冲区里的内容持久化到磁盘,性能最好。但是最不可靠,服务宕机和机器断电都会丢失消息。

整个异步刷盘策略由 ​​FlushCommitLogService​​​ 与 ​​CommitRealTimeService​​ 实现。

​FlushCommitLogService​​ 是CommitLog 刷盘服务的父类,它是一个抽象类,本身没有实现,只是一个标记类,刷盘策略由子类负责完成。

这是最激进的一种刷盘策略,性能最好,但是也最不可靠。同样没有刷盘请求一说,只需要唤醒线程即可开始工作。

和 FlushRealTimeService 流程差不多,区别仅仅是将​​flush​​​换成​​commit​​了,先将直接内存缓冲区的数据写入 FileChannel,然后唤醒 FlushRealTimeService 对 FIleChannel 做持久化。

run

深入源码聊聊RocketMQ的刷盘机制_RocketMQ_16

commit

深入源码聊聊RocketMQ的刷盘机制_Broker删除机制_17

关键点在 MappedFileQueue 的​​commit​​​方法,它会根据 Commit 偏移量定位到 MappedFile,然后调用它的​​commit0​​方法。它仅仅是将直接内存缓冲区的数据写入 FileChannel,此时数据并没有真正持久化到磁盘。

commit0

深入源码聊聊RocketMQ的刷盘机制_过期删除机制_18

总结

  1. 从Broker发送给CommitLog存储时,就已经创建了​​CommitRealTimeService​​ 实现类,只是用不用的问题,如果用到了就是执行开始处理run函数里的逻辑
  2. 它跟异步刷盘的不同点是,异步刷盘是定时​​flush​​ 。 这里没有进行flush,而且通过先按照文件提交的offset查找数据。然后提交
  3. 这里的提交分两类,一类是没有使用临时存储池。使用的是mappedByteBuffer也就是内存映射的方式。直接写到映射区域中的,那么这个时候就不需要写入的fileChannel了。直接返回写入的位置作为已经提交的位置。
  4. 另一类是临时存储池,使用的堆外内存,那么这个时候需要先把信息提交到fileChannel中

结尾

RocketMQ 针对 CommitLog 文件支持三种持久化策略。

  • 同步刷盘时,每次消息写入都会提交刷盘请求给 GroupCommitService,调用 MappedByteBuffer 的​​force​​方法将内核缓冲区的数据强制刷新到磁盘,成功才响应 ACK。
  • 异步刷盘时,消息写入 PageCache 立即响应 ACK,由 FlushRealTimeService 线程每隔 500ms 对 CommitLog 文件进行一次刷盘操作,流程和上述一样。
  • 异步刷盘且开启缓冲区时,RocketMQ 申请一块直接内存用作数据缓冲区,消息先写入缓冲区,然后由 CommitRealTimeService 线程定时将缓冲区数据写入 FileChannel,再唤醒 FlushRealTimeService 将 FileChannel 缓冲区数据强制刷新到磁盘。

开启缓冲区有什么用?类似在内存层面做了读写分离,写数据走直接内存,读数据走 PageCache,最大程度的消除了 PageCache 锁竞争,避免 PageCache 被交换到 Swap 分区,导致服务响应耗时出现毛刺。

有些不懂的地方或者不对的地方,麻烦各位指出,一定修改优化!

非常欢迎大家加我个人微信有关后端方面的问题我们在群内一起讨论! 我们下期再见!

欢迎『点赞』、『在看』、『转发』三连支持一下,下次见~

以上是关于聊聊rocketmq的RemotingTooMuchRequestException的主要内容,如果未能解决你的问题,请参考以下文章

聊聊rocketmq的RemotingTooMuchRequestException

3万字聊聊什么是RocketMQ

3万字聊聊什么是RocketMQ

聊聊rocketmq的retryTimesWhenSendFailed

聊聊rocketmq的ListenerContainerConfiguration

聊聊rocketmq的sendBatchMessage