深度挖掘RocketMQ底层源码「底层问题分析系列」深度挖掘RocketMQ底层那些导致消息丢失的汇总盘点透析([REJECTREQUEST]

Posted 洛神灬殇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深度挖掘RocketMQ底层源码「底层问题分析系列」深度挖掘RocketMQ底层那些导致消息丢失的汇总盘点透析([REJECTREQUEST]相关的知识,希望对你有一定的参考价值。

常见问题汇总分析

最近因为项目的并发量以及数据的吞吐处理量越来越高,我们的RocketMQ的处理数据的能力,已经慢慢成为了我们的问题和瓶颈了,频繁会出现OOM的瓶颈问题,当然内存的问题我们可以扩充资源和调整配额就可以解决了,但是又出现了其他可怕的问题,消息会出现丢失的常见,其中有代码层面的失误和bug,也有资源引起的问题,所以我们本期先重点分析说明一下因为资源所引起的问题,主要由以下这几个大场景所导致,看看你有没有遇到过?

接下来,我们就将对以上这几个场景进行分析和介绍它们出现的根本原因以及如何解决!本篇内容主要针对于[REJECTREQUEST]system busy, start flow control for a while进行分析说明。

问题介绍

[REJECTREQUEST] system busy, start flow control for a while

首先,我们限定为源码的未知,这样子才可以方便我们去分析问题,以及探究根本的因素所在,源码方法太长,只好截图了,如下图所示。

可以看到话红框的地方就是报错和打印的地方,大家可以使用自己的IDE进行搜索,这块的代码在org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand。从上图可以看出,抛出上述错误的关键原因是:pair.getObject1().rejectRequest()返回了true所引起的错误。如果想要看清楚这个地方的代码,需要先认识一下NettyRequestProcessor、Pair、RequestCode。

Pair

Pair主要用来封装NettyRequestProcessor与ExecuteService的绑定关系。在RocketMQ的网络处理模型中,会为每一个NettyRequestProcessor与特定的线程池绑定,所有该NettyRequestProcessor的处理逻辑都在该线程池中运行。

上述的pair.getObject1()的方法主要来自于上图的这个Pair的包装类,可以看出来Rocket-Client端将不同的请求定义不同的请求命令CODE,服务端会将客户端请求进行分类,每个命令或每类请求命令定义一个处理器(NettyRequestProcessor),然后每一个NettyRequestProcessor绑定到一个单独的线程池,进行命令处理,不同类型的请求将使用不同的线程池进行处理,实现线程隔离。

  • object1:对应的是NettyRequestProcessor对象
  • object2:对应的是ExecuteService对象

所以可以得出结论getObject1()说明获取的正是NettyRequestProcessor的接口的实现类。而对应的NettyRequestProcessor的实现类也有很多,如下图所示。

NettyRequestProcessor

RocketMQ服务端请求处理器。例如SendMessageProcessor是消息发送处理器、PullMessageProcessor是消息拉取命令处理器、ClientRemotingProcessor等等。

RequestCode

请求CODE,用来区分请求的类型,例如SEND_MESSAGE:表示该请求为消息发送,PULL_MESSAGE:消息拉取请求。当然还有很多其他的编码,可以看看这个类org.apache.rocketmq.common.protocol.RequestCode。如下代码所示进行获取对应的处理器以及线程池。

final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());

这样子相信大家应该知道了整体的相关的结构和执行机制了,我们来看一下为什么会出现这个错误!

SendMessageProcessor#rejectRequest(broker端源码)

public boolean rejectRequest() 
    return this.brokerController.getMessageStore().isOSPageCacheBusy() || 
        this.brokerController.getMessageStore().isTransientStorePoolDeficient();

拒绝请求的条件有两个,只要其中任意一个满足,则返回true。

  • Os PageCache busy:判断操作系统PageCache是否繁忙,如果忙,则返回true。
  • transientStorePool是否不足
isOSPageCacheBusy()的操作系统缓存页繁忙

DefaultMessageStore#isOSPageCacheBusy()主要的目的就是针对于CommitLog的锁定时间进行判定,OSPageCache是否存在超时锁定的问题。

public boolean isOSPageCacheBusy() 
    long begin = this.getCommitLog().getBeginTimeInLock();
    long diff = this.systemClock.now() - begin;
    return diff < 10000000
                && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills(); 

begin、diff两个局部变量

  • begin(开始时间)

将消息写入Commitlog文件所持有锁的开始时间,就是将消息体追加到内存映射文件(DirectByteBuffer)或pageCache(FileChannel#map),在该过程中开始持有锁的时间戳。具体可以查看源码CommitLog#putMessage

  • diff(总长时间)

一次消息追加过程中持有锁的总时长,即往内存映射文件或pageCache追加一条消息所耗时间。如果一次消息追加过程的时间超过了Broker配置文件osPageCacheBusyTimeOutMills,则认为pageCache繁忙,osPageCacheBusyTimeOutMills默认值为1000,表示1s。

isTransientStorePoolDeficient

说到了isTransientStorePoolDeficient就要说一下transientStorePoolEnable机制。

transientStorePoolEnable机制

Java NIO的内存映射机制,提供了将文件系统中的文件映射到内存机制,实现对文件的操作转换对内存地址的操作,极大的提高了IO特性,但这部分内存并不是常驻内存,可以被置换到交换内存(虚拟内存),RocketMQ为了提高消息发送的性能,引入了内存锁定机制,即将最近需要操作的commitlog文件映射到内存,并提供内存锁定功能,确保这些文件始终存在内存中,该机制的控制参数就是transientStorePoolEnable。

DefaultMessageStore#isTransientStorePoolDeficient
public boolean isTransientStorePoolDeficient() 
    return remainTransientStoreBufferNumbs() == 0;


public int remainTransientStoreBufferNumbs() 
    return this.transientStorePool.remainBufferNumbs();

从上面可以看出来,最底层调用TransientStorePool#remainBufferNumbs方法。

public int remainBufferNumbs() 
        if (storeConfig.isTransientStorePoolEnable()) 
            return availableBuffers.size();
        
        return Integer.MAX_VALUE;

如果启用transientStorePoolEnable机制,返回当前可用的ByteBuffer个数,即整个isTransientStorePoolDeficient方法的用意是是否还存在可用的ByteBuffer,如果不存在,即表示pageCache繁忙。

transientStorePoolEnable的MappedFile实现

MappedFile的ByteBuffer writeBuffer、MappedByteBuffer mappedByteBuffer这两个属性的初始化,两个方法是写消息与查消息操作的直接数据结构。

MappedFile底层是使用ByteBuffer以及MappedByteBuffer进行实现的堆外直接内存的写入和读取操作
当RocketMQ开启了transientStorePoolEnable,则使用ByteBuffer.allocateDirect(fileSize),创建(java.nio的内存映射机制)。如果未开启,则为空。

MappedByteBuffer

MappedByteBuffer使用FileChannel#map方法创建,即真正意义上的PageCache。

transientStorePoolEnable的读写分离模式

顺序写模式

RocketMQ在消息写入时采用的顺序写模式,如果writerBuffer不为空,说明开启了transientStorePoolEnable机制,则消息首先写入writerBuffer中,如果其为空,则写入mappedByteBuffer中。

随机读模式

消息读取时,是从mappedByteBuffer中读(pageCache),并且采用的随机读的模式。开启transientStorePoolEnable机制,有点类似于读写分离的效果,先写入writerBuffer中,读却是从mappedByteBuffer中读取。

至此,我们基本分析了对应的出现的大概出现了([REJECTREQUEST]system busy, start flow control for a while)的原因了,因为要想解决OSPageCacheBusy锁的问题。我们尽可能减少OsPageCache锁以及读写冲突的问题,所以就是使用transientStorePoolEnable进行读写分离,从而减少对应的冲突问题。

开启transientStorePoolEnable

我们先来对比以下两种模式的基底原理有什么不同?

mmap+PageCache的模式

读写消息都走的是pageCache,这样子读写都在pagecache里面不可避免会有锁的问题,在并发的读写操作情况下,会出现缺页中断降低,内存加锁(锁超时)、并发冲突的问题。

transientStorePoolEnable的模式

开启该模式的效果,从而使用了DirectByteBuffer(堆外内存)+PageCache的叠加效果,可以实现读写消息分离。

写入消息时候写到的是DirectByteBuffer—堆外内存中,读消息走的是PageCache,对于,DirectByteBuffer是两步刷盘,一步是刷到PageCache,之后才是刷到commitLog中,带来的好处就是,避免了内存操作所造成的OsPageCache锁等待的问题,降低了时延,比如说缺页中断降低,内存加锁。

引发的问题

带来的风险就是OSPageCache会保证即使在宕机的时候,也会将数据写入到磁盘中,但是堆外内存,一旦Broker宕机了,例如OOM,还没来的及写入到磁盘(commitLog)中就会丢失。所以当存在高可用场景下,需要考虑该问题。如下图所示。

最终结论

问题:如何避免([REJECTREQUEST]system busy, start flow control for a while)的问题?

  • 开启对应的transientStorePoolEnable的配置开启堆外内存模式,实现读写分离,从而减少了直接对OSPageCache的读写分离冲突问题。

  • 但是也会带来一个负面问题

本身的OSPageCache会保证即使在宕机的时候,也会将数据写入到磁盘中,但是如果开启了transientStorePoolEnable的堆外内存,一旦Broker宕机了,例如OOM,还没来的及写入到磁盘(commitLog)中就会丢失。

以上是关于深度挖掘RocketMQ底层源码「底层问题分析系列」深度挖掘RocketMQ底层那些导致消息丢失的汇总盘点透析([REJECTREQUEST]的主要内容,如果未能解决你的问题,请参考以下文章

深度挖掘RocketMQ底层源码「底层系列」深度挖掘RocketMQ底层导致消息丢失透析(Broker Busy和ToManyRequest)

深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)

深度挖掘 RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-下)

深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)

深度挖掘 RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)

精华推荐 | 深入浅出 RocketMQ原理及实战「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)