RocketMQ源码系列 消息store存储设计核心原理解析

Posted Dream_it_possible!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码系列 消息store存储设计核心原理解析相关的知识,希望对你有一定的参考价值。

目录

一、初识消息存储

1、 Message最终存储在磁盘上哪个位置?

2、store目录解析

1)  commitLog 目录

2)  config 目录

3) consumequeue 目录

4)  index目录

5) abort文件

6) checkPoint文件

3、store 工程目录解析

二、内存映射

1)  MappedFile

2)  MappedFileQueue

3)  TransientStorePool

三、MessageStoreConfig解析


rocketmq版本: 4.8.0

一、初识消息存储

        rocketmq是一个可以存储亿级消息的队列,能够将海量的消息持久化到硬盘上,如此优秀地存储能力一定程度上决定了rocketmq的接收消息和处理消息的并发能力, store模块是rocketmq存储消息和管理消息的模块,也是rocketmq的核心模块之一, 稍后会紧跟着源码的部分一起探究rocketmq是怎么设计存储, 怎么store消息的!

1、 Message最终存储在磁盘上哪个位置?

        rocketmq中的消息是通过文件来存储的,store相关的信息存储的路径在user.home\\\\store目录下, windows环境和linux环境是一样的, 都在user.home 目录下,

为什么会在此路径? 在代码里有配置,等下看源码就知道了。

2、store目录解析

1)  commitLog 目录

       消息存储目录, 文件名当前文件所在的全局偏移量,全局偏移量不一定从00000000000000000000开始。

大小是一样的!  其实默认是1GB, 满了会自动会生成下一个文件。

2)  config 目录

      存放并备份了topic相关信息消费者过滤器、消费位移等信息。

config目录包含了消费位点、延迟消费位点、topic相关信息的配置,打开比较熟悉的topic.json看一下:

consumerFilter: 主题消息过滤信息。

consumerOffset: 集群消息模式消息消费进度。

delayOffset:  延时消息队列拉取进度。

subscriptionGroup: 消息消费组的配置消息。

topic:  topic 配置信息。

打开一个topic.json看下; 

里面包含了很多topic相关的信息,比如batchTest, 包含了topic自身的一些属性,  是否是顺序的tipoc、读写队列数量(开源版默认队列数量有16个)、 权限(perm = 6 表示可读可写,可读为2,可写为4,相加就是6, 在PermName类有配置)。

3) consumequeue 目录

     消息消费队列目录

consumequeue目录包含了我们自定义的topic名称,进入到topic里

0,1,2,3分别代表的是队列ID,  这也解释了topic的结构里是队列

真正存储消息的文件是commitlog里的文件,为了加快检索速度, 无后缀的20位长度命名的文件00000000000000000000是CommitLog关于消息消费的索引文件,如果观察仔细,文件的命名也是有规律的,每个文件的名字大小偏差为30万*20, 也就是6000000, 该值为全局偏移量。文件里的消息条目记录消息消费位点、消息大小和tag的hashcode等, 该文件里的条目存储格式如下:

单个00000000000000000000文件包含30万个条目,每个条目的长度为20个字节,因此单个文件的长度为30万* 20 约等于5860KB。

4)  index目录

      index下目录下的文件是专门为消息订阅设计的索引文件。

       通过索引加快检索速度。

5) abort文件

      abort文件如果存在说明Broker非正常关闭,因为在broker正常退出的情况下,该abort文件会被删除掉, abort文件在启动时创建,正常退出之前删除。

6) checkPoint文件

       保存consumeQueue、index、commitlog最后一次刷盘时间。

3、store 工程目录解析

store
├─ AllocateMappedFileService.java
├─ AppendMessageCallback.java
├─ AppendMessageResult.java
├─ AppendMessageStatus.java
├─ CommitLog.java
├─ CommitLogDispatcher.java
├─ ConsumeQueue.java
├─ ConsumeQueueExt.java
├─ DefaultMessageFilter.java
├─ DefaultMessageStore.java   // 消息存储的核心类,也是消息存储的入口
├─ DispatchRequest.java
├─ GetMessageResult.java
├─ GetMessageStatus.java
├─ MappedFile.java
├─ MappedFileQueue.java
├─ MessageArrivingListener.java
├─ MessageExtBrokerInner.java
├─ MessageFilter.java
├─ MessageStore.java
├─ PutMessageLock.java
├─ PutMessageReentrantLock.java
├─ PutMessageResult.java
├─ PutMessageSpinLock.java
├─ PutMessageStatus.java
├─ QueryMessageResult.java
├─ ReferenceResource.java
├─ RunningFlags.java
├─ SelectMappedBufferResult.java
├─ StoreCheckpoint.java
├─ StoreStatsService.java
├─ StoreUtil.java
├─ TransientStorePool.java
├─ config   // 消息配置
│    ├─ BrokerRole.java
│    ├─ FlushDiskType.java
│    ├─ MessageStoreConfig.java // 消息核心配置,含消息存储路径、commitLog 路径、commitFile大小(默认1G)、每个队列大小(默认300000*20, 约等于5860KB)
│    └─ StorePathConfigHelper.java
 // 阿里 dleder中间件 
├─ dledger
│    └─ DLedgerCommitLog.java  // 消息提交管理
├─ ha
│    ├─ HAConnection.java
│    ├─ HAService.java
│    └─ WaitNotifyObject.java
├─ index  //  索引管理
│    ├─ IndexFile.java    // 索引文件管理
│    ├─ IndexHeader.java   // 索引头实体类
│    ├─ IndexService.java   // 索引服务
│    └─ QueryOffsetResult.java
├─ schedule   //   定时任务,管理延迟消费,实际上是延迟存储
│    ├─ DelayOffsetSerializeWrapper.java
│    └─ ScheduleMessageService.java
├─ stats // broker 状态管理
│    ├─ BrokerStats.java
│    └─ BrokerStatsManager.java
└─ util
       └─ LibC.java

config :    config目录是消息存储的配置目录, 核心配置类MessageStoreConfig,主要配置消息存储路径storePathRootDir、commitLog 路径 storePathCommitLog、commitLogFile大小(默认大小为1G)、topic队列里的映射文件每个队列里子文件的大小(默认300000*20, 约等于5860KB)

dledger:   阿里的开源中间件, 用来管理rocketmq的集群,可以平滑地实现rocketmq的主从节点切换。

ha:  HAConnection管理与broker集群的连接。HAService 管理与与roker集群的服务。

schedule:  ScheduleMessageService 提供了定时任务自动刷新offsetTable, 也就是消费位点。

     上述是store的文件布局,我们接下来去看一下存储文件与内存的文件是怎么去做映射的。 

二、内存映射

          Rocketmq是通过内存映射的方式来提高I/O访问速度,通过映射来能够快速地定位到在哪个存储文件里,如果你足够细心,会从上文中找到一些答案, 其中commitLog、consumeQueue还有indexFile, rocketmq把他们每个文件设置为固定长度, 当文件装满后,会重新生成一个文件, 文件名就为该文件第一条消息的全局偏移量, 有了这些数据后,如果有了全局偏移量offset,就能计算出offset到哪个文件了。

         学习内存映射,一定要掌握3个类,分别是MappedFile、MappedFileQueue、TransientStorePool。

1)  MappedFile

        学习MappedFile之前,先看一个操作系统的页存储映射图, 通过映射将操作系统的页与内存的区块进行关联, 只要知道页号就能找到对应的内存块。

         rocketmq的store借鉴了此设计,通过内存映射来查找内存块,MappedFile是rocketmq的内存映射文件类,通过MappedByteBuffer 来存储页与内存的映射。

MappedFile是rocketmq的内存映射的具体实现,下面看一下它有哪些属性 。

// 每页数据的大小为4KB, 类似于操作系统页的概念,数据存储到页里。
public static final int OS_PAGE_SIZE = 1024 * 4;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// 映射的总的虚拟内存
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// 映射文件数量
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
//  写指针、提交指针
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
protected final AtomicInteger committedPosition = new AtomicInteger(0);
private final AtomicInteger flushedPosition = new AtomicInteger(0);
// 每个mappedFile的大小
protected int fileSize;
//  读写通道,使用RandomAccessFile 来进行读写。
protected FileChannel fileChannel;
/**
 * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
 */
// 消息先放到缓存区里,当缓冲区里有数据时, 会将缓存区里的数据写入到FileChannel里,最终写入到磁盘文件里。
protected ByteBuffer writeBuffer = null;
// 短暂的缓冲区
protected TransientStorePool transientStorePool = null;
private String fileName;
// 从
private long fileFromOffset;
private File file;
// 内存文件的映射区域存放到MappedByteBuffer里。
private MappedByteBuffer mappedByteBuffer;
private volatile long storeTimestamp = 0;
private boolean firstCreateInQueue = false;

        ByteBudder是一个数据与磁盘之间的缓冲池,数据先会进入到缓冲池里,当缓冲池里有数据时,缓冲区里的数据就会写入到FileChannel里, ByteBuffer在读写文件时起到缓冲的作用, 由TransientStorePool提供。

this.writeBuffer = transientStorePool.borrowBuffer();

        MappedByteBuffer 相当是一个内存缓存池,数据先暂存到MappedByteBuffer里,然后通过commit线程定时任务将MappedByteBuffer里的数据映射到指定的内存块里。rocketmq提供该机制的原因是提供一种内存锁定机制。

       fileFromOffSet是 当前文件开始的全局偏移量,依据文件名得到。

  this.fileFromOffset = Long.parseLong(this.file.getName());

       使用AtomicInteger 记录文件读写的指针位置,每个操作都能够使position的值发生改变,

2)  MappedFileQueue

        MappedFileQueue是用来管理MappedFile对象,存放MappedFile的容器是CopyOnWriteArrayList,CopyOnWriteArrayList是线程安全的,读没有加锁,写用了ReentrantLock, 写都是通过复制出来一个新数组来实现的。

   public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

       CopyOnWriteArrayList 的缺点是写非常多的时候,会出现资源消耗很大的问题,因为每次写都要复制一遍数组生成一个新数组在堆里,堆的内存也会占用较大。

       rocketmq用CopyOnWriteArrayList来存放MappedFile应该主要是为了线程安全考虑。

3)  TransientStorePool

       TransientStorePool是堆外的内存缓冲池, 可以分配ByteBuffer,通过一个双向队列来管理ByteBuffer, 如果MessageStoreConfig的isTransientStorePoolEnable为true,那么就可以使用堆外内存缓存。

    private final Deque<ByteBuffer> availableBuffers;

       在init()方法里初始化,该方法比较重,因为要分配内存,这是一个非常耗时的工作。

   public void init() {
        for (int i = 0; i < poolSize; i++) {
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);

            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

            availableBuffers.offer(byteBuffer);
        }
    }

       在DefaultMessageStore里初始化该队列。

三、MessageStoreConfig解析

           MessageStoreConfig类是store模块的核心配置类,主要包含消息存储的根路径、每个CommitLog文件大小1GB、ConsumeQueue文件大小 5860KB等,详细信息如下:

//The root directory in which the log data is kept
@ImportantField
// store文件的根路径
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";

//The directory in which the commitlog is kept
@ImportantField
// commitLog的根路径
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
        + File.separator + "commitlog";

// CommitLog file size,default is 1G
// commitLog的文件大小
private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;
// ConsumeQueue file size,default is 30W
// 队列下映射的文件每个大小为5860K
private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
// enable consume queue ext
private boolean enableConsumeQueueExt = false;
// ConsumeQueue extend file size, 48M
private int mappedFileSizeConsumeQueueExt = 48 * 1024 * 1024;
// Bit count of filter bit map.
// this will be set by pipe of calculate filter bit map.
private int bitMapLengthConsumeQueueExt = 64;

// CommitLog flush interval
// flush data to disk
// 将数据持久化到磁盘上的时间间隔
@ImportantField
private int flushIntervalCommitLog = 500;

// Only used if TransientStorePool enabled
// flush data to FileChannel
@ImportantField
private int commitIntervalCommitLog = 200;

/**
 * introduced since 4.0.x. Determine whether to use mutex reentrantLock when putting message.<br/>
 * By default it is set to false indicating using spin lock when putting message.
 */
private boolean useReentrantLockWhenPutMessage = false;

// Whether schedule flush,default is real-time
@ImportantField
private boolean flushCommitLogTimed = false;
// ConsumeQueue flush interval
private int flushIntervalConsumeQueue = 1000;
// Resource reclaim interval
private int cleanResourceInterval = 10000;
// CommitLog removal interval
private int deleteCommitLogFilesInterval = 100;
// ConsumeQueue removal interval
private int deleteConsumeQueueFilesInterval = 100;
private int destroyMapedFileIntervalForcibly = 1000 * 120;
private int redeleteHangedFileInterval = 1000 * 120;
// When to delete,default is at 4 am
@ImportantField
private String deleteWhen = "04";
private int diskMaxUsedSpaceRatio = 75;
// The number of hours to keep a log file before deleting it (in hours)
@ImportantField
// 日志文件失效时间
private int fileReservedTime = 72;
// Flow control for ConsumeQueue
private int putMsgIndexHightWater = 600000;
// The maximum size of message,default is 4M
private int maxMessageSize = 1024 * 1024 * 4;
// Whether check the CRC32 of the records consumed.
// This ensures no on-the-wire or on-disk corruption to the messages occurred.
// This check adds some overhead,so it may be disabled in cases seeking extreme performance.
private boolean checkCRCOnRecover = true;
// How many pages are to be flushed when flush CommitLog
private int flushCommitLogLeastPages = 4;
// How many pages are to be committed when commit data to file
private int commitCommitLogLeastPages = 4;
// Flush page size when the disk in warming state
private int flushLeastPagesWhenWarmMapedFile = 1024 / 4 * 16;
// How many pages are to be flushed when flush ConsumeQueue
private int flushConsumeQueueLeastPages = 2;
private int flushCommitLogThoroughInterval = 1000 * 10;
private int commitCommitLogThoroughInterval = 200;
private int flushConsumeQueueThoroughInterval = 1000 * 60;
@ImportantField
private int maxTransferBytesOnMessageInMemory = 1024 * 256;
@ImportantField
private int maxTransferCountOnMessageInMemory = 32;
@ImportantField
private int maxTransferBytesOnMessageInDisk = 1024 * 64;
@ImportantField
private int maxTransferCountOnMessageInDisk = 8;
@ImportantField
private int accessMessageInMemoryMaxRatio = 40;
@ImportantField
private boolean messageIndexEnable = true;
private int maxHashSlotNum = 5000000;
private int maxIndexNum = 5000000 * 4;
private int maxMsgsNumBatch = 64;
@ImportantField
private boolean messageIndexSafe = false;
private int haListenPort = 10912;
private int haSendHeartbeatInterval = 1000 * 5;
private int haHousekeepingInterval = 1000 * 20;
private int haTransferBatchSize = 1024 * 32;
@ImportantField
private String haMasterAddress = null;
private int haSlaveFallbehindMax = 1024 * 1024 * 256;
@ImportantField
private BrokerRole brokerRole = BrokerRole.ASYNC_MASTER;
@ImportantField
private FlushDiskType flushDiskType = FlushDiskType.ASYNC_FLUSH;
private int syncFlushTimeout = 1000 * 5;
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
private long flushDelayOffsetInterval = 1000 * 10;
@ImportantField
private boolean cleanFileForciblyEnable = true;
private boolean warmMapedFileEnable = false;
private boolean offsetCheckInSlave = false;
private boolean debugLockEnable = false;
private boolean duplicationEnable = false;
private boolean diskFallRecorded = true;
private long osPageCacheBusyTimeOutMills = 1000;
private int defaultQueryMaxNum = 32;

@ImportantField
private boolean transientStorePoolEnable = false;
private int transientStorePoolSize = 5;
private boolean fastFailIfNoBufferInStorePool = false;

private boolean enableDLegerCommitLog = false;
private String dLegerGroup;
private String dLegerPeers;
private String dLegerSelfId;

private String preferredLeaderId;

private boolean isEnableBatchPush = false;

storePathRootDir:  store文件的根路径,默认设置在user.home\\store目录下。

storePathCommitLog:  消息存储的根路径,默认在user.home\\store\\commitlog目录下。

mappedFileSizeCommitLog:  消息文件大小,默认1GB。

mappedFileSizeConsumeQueue:  映射消息文件大小,每个文件包含30万个条目,每个条目大小20字节,约5860KB 。

transientStorePoolEnable:  堆外内存缓存池开关。

putMsgIndexHightWater:  ConsumeQueue的水位线,默认为超过60万为高水位线。

maxMessageSize:  消息的最大长度,默认4M。

maxHashSlotNum:  哈希槽数量,默认大小50万个。

messageDelayLevel:  消息延迟级别,默认18个。"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"

fileReservedTime:  失效映射文件的默认清除时间为72个小时后。

以上是关于RocketMQ源码系列 消息store存储设计核心原理解析的主要内容,如果未能解决你的问题,请参考以下文章

源码分析RocketMQ系列索引

RocketMQ源码解析-Store篇

RocketMQ源码解析-Store篇

RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字

RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字

#私藏项目实操分享#Alibaba中间件技术系列「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析