RocketMQ源码系列 消息store存储设计核心原理解析
Posted Dream_it_possible!
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码系列 消息store存储设计核心原理解析相关的知识,希望对你有一定的参考价值。
目录
rocketmq版本: 4.8.0
一、初识消息存储
rocketmq是一个可以存储亿级消息的队列,能够将海量的消息持久化到硬盘上,如此优秀地存储能力一定程度上决定了rocketmq的接收消息和处理消息的并发能力, store模块是rocketmq存储消息和管理消息的模块,也是rocketmq的核心模块之一, 稍后会紧跟着源码的部分一起探究rocketmq是怎么设计存储, 怎么store消息的!
1、 Message最终存储在磁盘上哪个位置?
rocketmq中的消息是通过文件来存储的,store相关的信息存储的路径在user.home\\\\store目录下, windows环境和linux环境是一样的, 都在user.home 目录下,
为什么会在此路径? 在代码里有配置,等下看源码就知道了。
2、store目录解析
1) commitLog 目录
消息存储目录, 文件名当前文件所在的全局偏移量,全局偏移量不一定从00000000000000000000开始。
大小是一样的! 其实默认是1GB, 满了会自动会生成下一个文件。
2) config 目录
存放并备份了topic相关信息消费者过滤器、消费位移等信息。
config目录包含了消费位点、延迟消费位点、topic相关信息的配置,打开比较熟悉的topic.json看一下:
consumerFilter: 主题消息过滤信息。
consumerOffset: 集群消息模式消息消费进度。
delayOffset: 延时消息队列拉取进度。
subscriptionGroup: 消息消费组的配置消息。
topic: topic 配置信息。
打开一个topic.json看下;
里面包含了很多topic相关的信息,比如batchTest, 包含了topic自身的一些属性, 是否是顺序的tipoc、读写队列数量(开源版默认队列数量有16个)、 权限(perm = 6 表示可读可写,可读为2,可写为4,相加就是6, 在PermName类有配置)。
3) consumequeue 目录
消息消费队列目录
consumequeue目录包含了我们自定义的topic名称,进入到topic里
0,1,2,3分别代表的是队列ID, 这也解释了topic的结构里是队列
真正存储消息的文件是commitlog里的文件,为了加快检索速度, 无后缀的20位长度命名的文件00000000000000000000是CommitLog关于消息消费的索引文件,如果观察仔细,文件的命名也是有规律的,每个文件的名字大小偏差为30万*20, 也就是6000000, 该值为全局偏移量。文件里的消息条目记录消息消费位点、消息大小和tag的hashcode等, 该文件里的条目存储格式如下:
单个00000000000000000000文件包含30万个条目,每个条目的长度为20个字节,因此单个文件的长度为30万* 20 约等于5860KB。
4) index目录
index下目录下的文件是专门为消息订阅设计的索引文件。
通过索引加快检索速度。
5) abort文件
abort文件如果存在说明Broker非正常关闭,因为在broker正常退出的情况下,该abort文件会被删除掉, abort文件在启动时创建,正常退出之前删除。
6) checkPoint文件
保存consumeQueue、index、commitlog最后一次刷盘时间。
3、store 工程目录解析
store
├─ AllocateMappedFileService.java
├─ AppendMessageCallback.java
├─ AppendMessageResult.java
├─ AppendMessageStatus.java
├─ CommitLog.java
├─ CommitLogDispatcher.java
├─ ConsumeQueue.java
├─ ConsumeQueueExt.java
├─ DefaultMessageFilter.java
├─ DefaultMessageStore.java // 消息存储的核心类,也是消息存储的入口
├─ DispatchRequest.java
├─ GetMessageResult.java
├─ GetMessageStatus.java
├─ MappedFile.java
├─ MappedFileQueue.java
├─ MessageArrivingListener.java
├─ MessageExtBrokerInner.java
├─ MessageFilter.java
├─ MessageStore.java
├─ PutMessageLock.java
├─ PutMessageReentrantLock.java
├─ PutMessageResult.java
├─ PutMessageSpinLock.java
├─ PutMessageStatus.java
├─ QueryMessageResult.java
├─ ReferenceResource.java
├─ RunningFlags.java
├─ SelectMappedBufferResult.java
├─ StoreCheckpoint.java
├─ StoreStatsService.java
├─ StoreUtil.java
├─ TransientStorePool.java
├─ config // 消息配置
│ ├─ BrokerRole.java
│ ├─ FlushDiskType.java
│ ├─ MessageStoreConfig.java // 消息核心配置,含消息存储路径、commitLog 路径、commitFile大小(默认1G)、每个队列大小(默认300000*20, 约等于5860KB)
│ └─ StorePathConfigHelper.java
// 阿里 dleder中间件
├─ dledger
│ └─ DLedgerCommitLog.java // 消息提交管理
├─ ha
│ ├─ HAConnection.java
│ ├─ HAService.java
│ └─ WaitNotifyObject.java
├─ index // 索引管理
│ ├─ IndexFile.java // 索引文件管理
│ ├─ IndexHeader.java // 索引头实体类
│ ├─ IndexService.java // 索引服务
│ └─ QueryOffsetResult.java
├─ schedule // 定时任务,管理延迟消费,实际上是延迟存储
│ ├─ DelayOffsetSerializeWrapper.java
│ └─ ScheduleMessageService.java
├─ stats // broker 状态管理
│ ├─ BrokerStats.java
│ └─ BrokerStatsManager.java
└─ util
└─ LibC.java
config : config目录是消息存储的配置目录, 核心配置类MessageStoreConfig,主要配置消息存储路径storePathRootDir、commitLog 路径 storePathCommitLog、commitLogFile大小(默认大小为1G)、topic队列里的映射文件每个队列里子文件的大小(默认300000*20, 约等于5860KB)
dledger: 阿里的开源中间件, 用来管理rocketmq的集群,可以平滑地实现rocketmq的主从节点切换。
ha: HAConnection管理与broker集群的连接。HAService 管理与与roker集群的服务。
schedule: ScheduleMessageService 提供了定时任务自动刷新offsetTable, 也就是消费位点。
上述是store的文件布局,我们接下来去看一下存储文件与内存的文件是怎么去做映射的。
二、内存映射
Rocketmq是通过内存映射的方式来提高I/O访问速度,通过映射来能够快速地定位到在哪个存储文件里,如果你足够细心,会从上文中找到一些答案, 其中commitLog、consumeQueue还有indexFile, rocketmq把他们每个文件设置为固定长度, 当文件装满后,会重新生成一个文件, 文件名就为该文件第一条消息的全局偏移量, 有了这些数据后,如果有了全局偏移量offset,就能计算出offset到哪个文件了。
学习内存映射,一定要掌握3个类,分别是MappedFile、MappedFileQueue、TransientStorePool。
1) MappedFile
学习MappedFile之前,先看一个操作系统的页存储映射图, 通过映射将操作系统的页与内存的区块进行关联, 只要知道页号就能找到对应的内存块。
rocketmq的store借鉴了此设计,通过内存映射来查找内存块,MappedFile是rocketmq的内存映射文件类,通过MappedByteBuffer 来存储页与内存的映射。
MappedFile是rocketmq的内存映射的具体实现,下面看一下它有哪些属性 。
// 每页数据的大小为4KB, 类似于操作系统页的概念,数据存储到页里。
public static final int OS_PAGE_SIZE = 1024 * 4;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// 映射的总的虚拟内存
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// 映射文件数量
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// 写指针、提交指针
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
protected final AtomicInteger committedPosition = new AtomicInteger(0);
private final AtomicInteger flushedPosition = new AtomicInteger(0);
// 每个mappedFile的大小
protected int fileSize;
// 读写通道,使用RandomAccessFile 来进行读写。
protected FileChannel fileChannel;
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
// 消息先放到缓存区里,当缓冲区里有数据时, 会将缓存区里的数据写入到FileChannel里,最终写入到磁盘文件里。
protected ByteBuffer writeBuffer = null;
// 短暂的缓冲区
protected TransientStorePool transientStorePool = null;
private String fileName;
// 从
private long fileFromOffset;
private File file;
// 内存文件的映射区域存放到MappedByteBuffer里。
private MappedByteBuffer mappedByteBuffer;
private volatile long storeTimestamp = 0;
private boolean firstCreateInQueue = false;
ByteBudder是一个数据与磁盘之间的缓冲池,数据先会进入到缓冲池里,当缓冲池里有数据时,缓冲区里的数据就会写入到FileChannel里, ByteBuffer在读写文件时起到缓冲的作用, 由TransientStorePool提供。
this.writeBuffer = transientStorePool.borrowBuffer();
MappedByteBuffer 相当是一个内存缓存池,数据先暂存到MappedByteBuffer里,然后通过commit线程定时任务将MappedByteBuffer里的数据映射到指定的内存块里。rocketmq提供该机制的原因是提供一种内存锁定机制。
fileFromOffSet是 当前文件开始的全局偏移量,依据文件名得到。
this.fileFromOffset = Long.parseLong(this.file.getName());
使用AtomicInteger 记录文件读写的指针位置,每个操作都能够使position的值发生改变,
2) MappedFileQueue
MappedFileQueue是用来管理MappedFile对象,存放MappedFile的容器是CopyOnWriteArrayList,CopyOnWriteArrayList是线程安全的,读没有加锁,写用了ReentrantLock, 写都是通过复制出来一个新数组来实现的。
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
CopyOnWriteArrayList 的缺点是写非常多的时候,会出现资源消耗很大的问题,因为每次写都要复制一遍数组生成一个新数组在堆里,堆的内存也会占用较大。
rocketmq用CopyOnWriteArrayList来存放MappedFile应该主要是为了线程安全考虑。
3) TransientStorePool
TransientStorePool是堆外的内存缓冲池, 可以分配ByteBuffer,通过一个双向队列来管理ByteBuffer, 如果MessageStoreConfig的isTransientStorePoolEnable为true,那么就可以使用堆外内存缓存。
private final Deque<ByteBuffer> availableBuffers;
在init()方法里初始化,该方法比较重,因为要分配内存,这是一个非常耗时的工作。
public void init() {
for (int i = 0; i < poolSize; i++) {
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
final long address = ((DirectBuffer) byteBuffer).address();
Pointer pointer = new Pointer(address);
LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
availableBuffers.offer(byteBuffer);
}
}
在DefaultMessageStore里初始化该队列。
三、MessageStoreConfig解析
MessageStoreConfig类是store模块的核心配置类,主要包含消息存储的根路径、每个CommitLog文件大小1GB、ConsumeQueue文件大小 5860KB等,详细信息如下:
//The root directory in which the log data is kept
@ImportantField
// store文件的根路径
private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
//The directory in which the commitlog is kept
@ImportantField
// commitLog的根路径
private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store"
+ File.separator + "commitlog";
// CommitLog file size,default is 1G
// commitLog的文件大小
private int mappedFileSizeCommitLog = 1024 * 1024 * 1024;
// ConsumeQueue file size,default is 30W
// 队列下映射的文件每个大小为5860K
private int mappedFileSizeConsumeQueue = 300000 * ConsumeQueue.CQ_STORE_UNIT_SIZE;
// enable consume queue ext
private boolean enableConsumeQueueExt = false;
// ConsumeQueue extend file size, 48M
private int mappedFileSizeConsumeQueueExt = 48 * 1024 * 1024;
// Bit count of filter bit map.
// this will be set by pipe of calculate filter bit map.
private int bitMapLengthConsumeQueueExt = 64;
// CommitLog flush interval
// flush data to disk
// 将数据持久化到磁盘上的时间间隔
@ImportantField
private int flushIntervalCommitLog = 500;
// Only used if TransientStorePool enabled
// flush data to FileChannel
@ImportantField
private int commitIntervalCommitLog = 200;
/**
* introduced since 4.0.x. Determine whether to use mutex reentrantLock when putting message.<br/>
* By default it is set to false indicating using spin lock when putting message.
*/
private boolean useReentrantLockWhenPutMessage = false;
// Whether schedule flush,default is real-time
@ImportantField
private boolean flushCommitLogTimed = false;
// ConsumeQueue flush interval
private int flushIntervalConsumeQueue = 1000;
// Resource reclaim interval
private int cleanResourceInterval = 10000;
// CommitLog removal interval
private int deleteCommitLogFilesInterval = 100;
// ConsumeQueue removal interval
private int deleteConsumeQueueFilesInterval = 100;
private int destroyMapedFileIntervalForcibly = 1000 * 120;
private int redeleteHangedFileInterval = 1000 * 120;
// When to delete,default is at 4 am
@ImportantField
private String deleteWhen = "04";
private int diskMaxUsedSpaceRatio = 75;
// The number of hours to keep a log file before deleting it (in hours)
@ImportantField
// 日志文件失效时间
private int fileReservedTime = 72;
// Flow control for ConsumeQueue
private int putMsgIndexHightWater = 600000;
// The maximum size of message,default is 4M
private int maxMessageSize = 1024 * 1024 * 4;
// Whether check the CRC32 of the records consumed.
// This ensures no on-the-wire or on-disk corruption to the messages occurred.
// This check adds some overhead,so it may be disabled in cases seeking extreme performance.
private boolean checkCRCOnRecover = true;
// How many pages are to be flushed when flush CommitLog
private int flushCommitLogLeastPages = 4;
// How many pages are to be committed when commit data to file
private int commitCommitLogLeastPages = 4;
// Flush page size when the disk in warming state
private int flushLeastPagesWhenWarmMapedFile = 1024 / 4 * 16;
// How many pages are to be flushed when flush ConsumeQueue
private int flushConsumeQueueLeastPages = 2;
private int flushCommitLogThoroughInterval = 1000 * 10;
private int commitCommitLogThoroughInterval = 200;
private int flushConsumeQueueThoroughInterval = 1000 * 60;
@ImportantField
private int maxTransferBytesOnMessageInMemory = 1024 * 256;
@ImportantField
private int maxTransferCountOnMessageInMemory = 32;
@ImportantField
private int maxTransferBytesOnMessageInDisk = 1024 * 64;
@ImportantField
private int maxTransferCountOnMessageInDisk = 8;
@ImportantField
private int accessMessageInMemoryMaxRatio = 40;
@ImportantField
private boolean messageIndexEnable = true;
private int maxHashSlotNum = 5000000;
private int maxIndexNum = 5000000 * 4;
private int maxMsgsNumBatch = 64;
@ImportantField
private boolean messageIndexSafe = false;
private int haListenPort = 10912;
private int haSendHeartbeatInterval = 1000 * 5;
private int haHousekeepingInterval = 1000 * 20;
private int haTransferBatchSize = 1024 * 32;
@ImportantField
private String haMasterAddress = null;
private int haSlaveFallbehindMax = 1024 * 1024 * 256;
@ImportantField
private BrokerRole brokerRole = BrokerRole.ASYNC_MASTER;
@ImportantField
private FlushDiskType flushDiskType = FlushDiskType.ASYNC_FLUSH;
private int syncFlushTimeout = 1000 * 5;
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
private long flushDelayOffsetInterval = 1000 * 10;
@ImportantField
private boolean cleanFileForciblyEnable = true;
private boolean warmMapedFileEnable = false;
private boolean offsetCheckInSlave = false;
private boolean debugLockEnable = false;
private boolean duplicationEnable = false;
private boolean diskFallRecorded = true;
private long osPageCacheBusyTimeOutMills = 1000;
private int defaultQueryMaxNum = 32;
@ImportantField
private boolean transientStorePoolEnable = false;
private int transientStorePoolSize = 5;
private boolean fastFailIfNoBufferInStorePool = false;
private boolean enableDLegerCommitLog = false;
private String dLegerGroup;
private String dLegerPeers;
private String dLegerSelfId;
private String preferredLeaderId;
private boolean isEnableBatchPush = false;
storePathRootDir: store文件的根路径,默认设置在user.home\\store目录下。
storePathCommitLog: 消息存储的根路径,默认在user.home\\store\\commitlog目录下。
mappedFileSizeCommitLog: 消息文件大小,默认1GB。
mappedFileSizeConsumeQueue: 映射消息文件大小,每个文件包含30万个条目,每个条目大小20字节,约5860KB 。
transientStorePoolEnable: 堆外内存缓存池开关。
putMsgIndexHightWater: ConsumeQueue的水位线,默认为超过60万为高水位线。
maxMessageSize: 消息的最大长度,默认4M。
maxHashSlotNum: 哈希槽数量,默认大小50万个。
messageDelayLevel: 消息延迟级别,默认18个。"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
fileReservedTime: 失效映射文件的默认清除时间为72个小时后。
以上是关于RocketMQ源码系列 消息store存储设计核心原理解析的主要内容,如果未能解决你的问题,请参考以下文章
RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字
RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字
#私藏项目实操分享#Alibaba中间件技术系列「RocketMQ技术专题」RocketMQ消息发送的全部流程和落盘原理分析