聊聊rocketmq的RemotingTooMuchRequestException
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了聊聊rocketmq的RemotingTooMuchRequestException相关的知识,希望对你有一定的参考价值。
参考技术A 本文主要研究一下rocketmq的RemotingTooMuchRequestExceptionrocketmq-remoting-4.6.0-sources.jar!/org/apache/rocketmq/remoting/exception/RemotingTooMuchRequestException.java
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
深入源码聊聊RocketMQ的刷盘机制
大家好,我是Leo。
今天聊一下RocketMQ的三种刷盘机制。
- 同步刷盘
- 异步刷盘(RocketMQ默认)
- 异步刷盘+缓冲区
出自微信公众号【欢少的成长之路】
本章概括
同步刷盘
整个同步刷盘策略由 FlushCommitLogService
与 GroupCommitService
实现。
FlushCommitLogService
是 GroupCommitService
刷盘服务的父类,它是一个抽象类,本身没有实现,只是一个标记类,刷盘策略由子类负责完成。
同步刷盘时,只有消息被真正持久化到磁盘才会响应 ACK,可靠性非常高,但是性能会受到较大影响,适用于金融业务。(可参考图一)
由下列参数决定刷盘策略
FlushDiskType:SYNC_FLUSH(同步刷盘)
图一同步刷盘流程图
Broker 接收到消息后,会将消息交给 CommitLog 负责存储。CommitLog 先定位到最新的 MappedFile,然后将消息按照固定格式追加到其中。(可参考图二)
图二CommitLog构造函数加载流程
CommitLog
是一个构造函数
GroupCommitRequest
代表一个刷盘的请求
图三GroupCommitRequest函数
putRequest
通过加锁对每一个写请求都加到 requestWrite
集合
图四putRequest函数
swapRequests
每次提交完刷盘之后,需要对读写请求数据交换。
刷盘请求为啥还要分读写两个列表呢?这是用来做读写分离用的,Producer 发送消息的请求量是非常大的,GroupCommitService 的刷盘操作是同步的,刷盘期间仍然会有大量的刷盘请求被提交进来,拆分成两个读写列表,请求提交到写列表,刷盘时处理读列表,刷盘结束交换列表,循环往复,两者可以同时进行。
图五swapRequests函数
当有同步请求被提交进来,线程就会被唤醒,然后执行doCommit
方法,刷盘的核心是调用了 MappedFileQueue 的flush
方法。flush 方法需要传flushLeastPages
参数,它代表刷盘的最小页数,对于同步刷盘来说,不允许消息丢失,只要写入数据就要刷盘,所以页数为 0。
doCommit
图六doCommit函数
flush
刷盘函数。flush 方法会根据刷盘的偏移量定位到对应的 MappedFile,然后调用flush
开始刷盘,最后更新刷盘偏移量。
图七flush函数
isAbleToFlush
计算是否需要刷盘。只有新写入的数据超过指定页才刷盘,避免频繁无意义的刷盘。
刷盘最核心的方法自然是 MappedFile 的flush
方法了,它会先根据flushLeastPages
计算是否需要刷盘。例如最小刷盘页为 4 时,最小刷盘数据就是 16KB,如果写入的数据不足 16KB 就会跳过刷盘
图八isAbleToFlush函数
run
GroupCommitService子实现类的入口函数
图九run函数
总结
由CommitLog构造函数结构图可得知
- 如果是同步刷盘策略,会初始化
GroupCommitService
子实现类。最终继承的是Runnable接口,会先启动run线程。 - run线程主要做的一件事情就是不断监测当前服务是否停止,只要不停止就会一直执行,也就是每10毫秒会执行
doCommit
函数。 - doCommit函数主要处理的是,只要读请求不为空就一直处理,首先会判断文件刷新到的offset是否大于等于刷盘offset。并且考虑到一条消息可能存在两个文件中,因此最多可能存在两次刷盘。
- 成立之后会调用
flush
函数,因为同步刷盘的规则是来一条就刷一条,所以是flush(0)
- 刷新完毕后,唤醒用户线程,(通知其他线程,继续工作了)
- 收尾工作(刷新消息物理落盘时间,交互读写请求)
- 如果文件刷盘的偏移量 < 请求的下一个偏移量,则说明还没有刷新完,还需要继续刷新
异步刷盘
RocketMQ默认是采用异步刷盘的策略,因为异步刷盘既兼顾了性能,也兼顾了可靠性。
异步刷盘时,消息写入 PageCache 就会响应 ACK,然后由后台线程异步将 PageCache 里的内容持久化到磁盘,降低了读写延迟,提高了性能和吞吐量。服务宕机消息不丢失,机器断电少量消息丢失。
由下列参数决定刷盘策略
FlushDiskType:ASYNC_FLUSH(异步刷盘)
整个异步刷盘策略由 FlushCommitLogService
与 FlushRealTimeService
实现。
FlushCommitLogService
是CommitLog 刷盘服务的父类,它是一个抽象类,本身没有实现,只是一个标记类,刷盘策略由子类负责完成。
对于异步刷盘,没有提交刷盘请求一说。它不像同步刷盘,只要有消息写入 CommitLog 就要执行刷盘操作,因为异步刷盘是定时执行的。
异步刷盘时,仅仅需要调用 wakeup
方法唤醒线程即可。所以,我们重点看它的run
方法。
run
图十异步刷盘子类run函数
flush
刷盘函数。flush 方法会根据刷盘的偏移量定位到对应的 MappedFile,然后调用flush
开始刷盘,最后更新刷盘偏移量。
图十一flush函数
isAbleToFlush
计算是否需要刷盘。只有新写入的数据超过指定页才刷盘,避免频繁无意义的刷盘。
刷盘最核心的方法自然是 MappedFile 的flush
方法了,它会先根据flushLeastPages
计算是否需要刷盘。例如最小刷盘页为 4 时,最小刷盘数据就是 16KB,如果写入的数据不足 16KB 就会跳过刷盘
图十二isAbleToFlush函数
waitForRunning
总结
- 如果是异步刷盘策略,会初始化
FlushRealTimeService
子实现类。最终继承的是Runnable接口,会先启动run线程。 - run线程会一直执行,直到服务被迫停止,或者人为干预停止
- 首先获取RocketMQ当前配置是否定时刷新日志,如果是,sleep睡眠自定义的时间。如果不是,执行
waitForRunning
函数等待自定义的时间。 - 根据自定义的刷盘页数进行
flush
刷盘。更新后修改消息落盘时间。 - 第五点主要关键,为了保证所有的数据一致性,如果服务停止,那么把剩余的没有刷新到磁盘的消息刷盘,重复次数为10次
- 获取是否定时刷新日志的设定
- 获取刷新到磁盘的时间间隔
- 获取一次刷新到磁盘的最少页数
- 获取刷新CommitLog的频率
异步刷盘+缓冲区
异步刷盘+缓冲区,消息先写入直接内存缓冲区,然后由后台线程异步将缓冲区里的内容持久化到磁盘,性能最好。但是最不可靠,服务宕机和机器断电都会丢失消息。
整个异步刷盘策略由 FlushCommitLogService
与 CommitRealTimeService
实现。
FlushCommitLogService
是CommitLog 刷盘服务的父类,它是一个抽象类,本身没有实现,只是一个标记类,刷盘策略由子类负责完成。
这是最激进的一种刷盘策略,性能最好,但是也最不可靠。同样没有刷盘请求一说,只需要唤醒线程即可开始工作。
和 FlushRealTimeService 流程差不多,区别仅仅是将flush
换成commit
了,先将直接内存缓冲区的数据写入 FileChannel,然后唤醒 FlushRealTimeService 对 FIleChannel 做持久化。
run
commit
关键点在 MappedFileQueue 的commit
方法,它会根据 Commit 偏移量定位到 MappedFile,然后调用它的commit0
方法。它仅仅是将直接内存缓冲区的数据写入 FileChannel,此时数据并没有真正持久化到磁盘。
commit0
总结
- 从Broker发送给CommitLog存储时,就已经创建了
CommitRealTimeService
实现类,只是用不用的问题,如果用到了就是执行开始处理run函数里的逻辑 - 它跟异步刷盘的不同点是,异步刷盘是定时
flush
。 这里没有进行flush,而且通过先按照文件提交的offset查找数据。然后提交 - 这里的提交分两类,一类是没有使用临时存储池。使用的是mappedByteBuffer也就是内存映射的方式。直接写到映射区域中的,那么这个时候就不需要写入的fileChannel了。直接返回写入的位置作为已经提交的位置。
- 另一类是临时存储池,使用的堆外内存,那么这个时候需要先把信息提交到fileChannel中
结尾
RocketMQ 针对 CommitLog 文件支持三种持久化策略。
- 同步刷盘时,每次消息写入都会提交刷盘请求给 GroupCommitService,调用 MappedByteBuffer 的
force
方法将内核缓冲区的数据强制刷新到磁盘,成功才响应 ACK。 - 异步刷盘时,消息写入 PageCache 立即响应 ACK,由 FlushRealTimeService 线程每隔 500ms 对 CommitLog 文件进行一次刷盘操作,流程和上述一样。
- 异步刷盘且开启缓冲区时,RocketMQ 申请一块直接内存用作数据缓冲区,消息先写入缓冲区,然后由 CommitRealTimeService 线程定时将缓冲区数据写入 FileChannel,再唤醒 FlushRealTimeService 将 FileChannel 缓冲区数据强制刷新到磁盘。
开启缓冲区有什么用?类似在内存层面做了读写分离,写数据走直接内存,读数据走 PageCache,最大程度的消除了 PageCache 锁竞争,避免 PageCache 被交换到 Swap 分区,导致服务响应耗时出现毛刺。
有些不懂的地方或者不对的地方,麻烦各位指出,一定修改优化!
非常欢迎大家加我个人微信有关后端方面的问题我们在群内一起讨论! 我们下期再见!
欢迎『点赞』、『在看』、『转发』三连支持一下,下次见~
以上是关于聊聊rocketmq的RemotingTooMuchRequestException的主要内容,如果未能解决你的问题,请参考以下文章
聊聊rocketmq的RemotingTooMuchRequestException
聊聊rocketmq的retryTimesWhenSendFailed