RocketMQ源码—Broker启动加载消息文件以及恢复数据源码一万字
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码—Broker启动加载消息文件以及恢复数据源码一万字相关的知识,希望对你有一定的参考价值。
详细介绍了Broker启动加载消息文件以及恢复数据源码。
此前我们学习了Broker的启动源码:RocketMQ源码(3)—Broker启动流程源码解析【一万字】,Broker的启动过程中,在DefaultMessageStore实例化之后,将会调用load方法将磁盘中的commitLog、ConsumeQueue、IndexFile文件的数据加载到内存中,还会进行数据恢复操作。
下面看看Broker启动加载消息文件以及恢复数据源码。
文章目录
- 1 isTempFileExist是否存在临时文件
- 2 commitLog#load加载消息日志文件
- 3 loadConsumeQueue加载消费队列文件
- 4 创建StoreCheckpoint检查点对象
- 5 加载index索引文件
- 6 恢复CommitLog和ConsumeQueue数据
load方法主要步骤为:
-
调用isTempFileExist方法判断上次broker是否是正常退出,如果是正常退出不会保留abort文件,异常退出则会。
-
加载CommitLog日志文件。CommitLog文件是真正存储消息内容的地方。
-
加载ConsumeQueue文件。ConsumeQueue文件可以看作是CommitLog的消息偏移量索引文件。
-
加载 index 索引文件。Index文件可以看作是CommitLog的消息时间范围索引文件。
-
恢复ConsumeQueue文件和CommitLog文件,将正确的的数据恢复至内存中,删除错误数据和文件。
-
加载RocketMQ延迟消息的服务,包括延时等级、配置文件等等。
/**
* DefaultMessageStore的方法
* 加载Commit Log、Consume Queue、index file等文件,将数据加载到内存中,并完成数据的恢复
*
* @throws IOException
*/
public boolean load()
boolean result = true;
try
/*
* 1 判断上次broker是否是正常退出,如果是正常退出不会保留abort文件,异常退出则会
*
* Broker在启动时会创建storePathRootDir/abort文件,并且注册钩子函数:在JVM退出时删除abort文件。
* 如果下一次启动时存在abort文件,说明Broker是异常退出的,文件数据可能不一直,需要进行数据修复。
*/
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown ", lastExitOK ? "normally" : "abnormally");
/*
* 2 加载Commit Log日志文件,目录路径取自broker.conf文件中的storePathCommitLog属性
* Commit Log文件是真正存储消息内容的地方,单个文件默认大小1G。
*/
// load Commit Log
result = result && this.commitLog.load();
/*
* 2 加载Consume Queue文件,目录路径为storePathRootDir/consumequeue,文件组织方式为topic/queueId/fileName
* Consume Queue文件可以看作是Commit Log的索引文件,其存储了它所属Topic的消息在Commit Log中的偏移量
* 消费者拉取消息的时候,可以从Consume Queue中快速的根据偏移量定位消息在Commit Log中的位置。
*/
// load Consume Queue
result = result && this.loadConsumeQueue();
if (result)
/*
* 3 加载checkpoint 检查点文件,文件位置是storePathRootDir/checkpoint
* StoreCheckpoint记录着commitLog、ConsumeQueue、Index文件的最后更新时间点,
* 当上一次broker是异常结束时,会根据StoreCheckpoint的数据进行恢复,这决定着文件从哪里开始恢复,甚至是删除文件
*/
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
/*
* 4 加载 index 索引文件,目录路径为storePathRootDir/index
* index 索引文件用于通过时间区间来快速查询消息,底层为HashMap结构,实现为hash索引。后面会专门出文介绍
* 如果不是正常退出,并且最大更新时间戳比checkpoint文件中的时间戳大,则删除该 index 文件
*/
this.indexService.load(lastExitOK);
/*
* 4 恢复ConsumeQueue文件和CommitLog文件,将正确的的数据恢复至内存中,删除错误数据和文件。
*/
this.recover(lastExitOK);
log.info("load over, and the max phy offset = ", this.getMaxPhyOffset());
/*
* 5 加载RocketMQ延迟消息的服务,包括延时等级、配置文件等等。
*/
if (null != scheduleMessageService)
result = this.scheduleMessageService.load();
catch (Exception e)
log.error("load exception", e);
result = false;
if (!result)
//如果上面的操作抛出异常,则文件服务停止
this.allocateMappedFileService.shutdown();
return result;
1 isTempFileExist是否存在临时文件
首先会判断是否存在临时文件,也就是abort文件,其路径为storePathRootDir/abort,Broker在启动时会创建ROCKET_HOME/store/abort文件,并且注册钩子函数:在JVM退出时删除abort文件。
如果下一次启动时不存在abort文件,表示钩子函数被正确执行,broker是正常退出的,不需要修复文件数据;如果存在abort文件,说明broker是异常退出的,因为钩子函数并没有执行成功,此时文件数据可能不一致,需要进行数据修复。
private boolean isTempFileExist()
//获取临时文件路径,路径为:storePathRootDir/abort
String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
//构建file文件对象
File file = new File(fileName);
//判断文件是否存在
return file.exists();
下面就是在正常启动broker创建的abort文件:
2 commitLog#load加载消息日志文件
通过内部的CommitLog对象的load方法加载Commit Log日志文件,目录路径取自broker.conf文件中配置的storePathCommitLog属性,默认为$HOME/store/commitlog/。
CommitLog的load方法实际上是委托内部的mappedFileQueue的load方法进行加载。
/**
* CommitLog的方法
*/
public boolean load()
//调用mappedFileQueue的load方法
boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed"));
return result;
2.1 load加载文件
MappedFileQueue#load方法会就是将commitLog目录路径下的commotlog文件进行全部的加载为MappedFile对象。
/**
* MappedFileQueue的方法
*/
public boolean load()
//获取commitlog文件的存放目录,目录路径取自
//broker.conf文件中配置的storePathCommitLog属性,默认为$HOME/store/commitlog/
File dir = new File(this.storePath);
//获取内部的文件集合
File[] ls = dir.listFiles();
if (ls != null)
//如果存在commitlog文件,那么进行加载
return doLoad(Arrays.asList(ls));
return true;
/**
* MappedFileQueue的方法
*/
public boolean doLoad(List<File> files)
// 对commitlog文件按照文件名生序排序
files.sort(Comparator.comparing(File::getName));
for (File file : files)
//校验文件实际大小是否等于预定的文件大小,如果不想等,则直接返回false,不再加载其他文件
if (file.length() != this.mappedFileSize)
log.warn(file + "\\t" + file.length()
+ " length not matched message store config value, please check it manually");
return false;
try
/*
* 核心代码
* 每一个commitlog文件都创建一个对应的MappedFile对象
*
*/
MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
//将wrotePosition 、flushedPosition、committedPosition 默认设置为文件大小
//当前文件所映射到的消息写入page cache的位置
mappedFile.setWrotePosition(this.mappedFileSize);
//刷盘的最新位置
mappedFile.setFlushedPosition(this.mappedFileSize);
//已提交的最新位置
mappedFile.setCommittedPosition(this.mappedFileSize);
//添加到MappedFileQueue内部的mappedFiles集合中
this.mappedFiles.add(mappedFile);
log.info("load " + file.getPath() + " OK");
catch (IOException e)
log.error("load file " + file + " error", e);
return false;
return true;
在物理上,commotlog目录下面是一个个的commitlog文件,但是在Java中进行了三层映射,CommitLog-MappedFileQueue-MappedFile。CommitLog中包含MappedFileQueue,以及commitlog相关的其他服务,例如刷盘服务;MappedFileQueue中包含MappedFile集合,以及单个commotlog文件大小等属性,而MappedFile才是真正的一个commotlog文件在Java中的映射,包含文件名、大小、mmap对象mappedByteBuffer等属性。
实际上MappedFileQueue和MappedFile都是通用类,commitlog、comsumequeue、indexfile文件都会使用到。
2.1.1 创建MappedFile并映射文件
MappedFile作为一个RocketMQ的物理文件在Java中的映射类。commitLog consumerQueue、indexFile3种文件磁盘的读写都是通过MappedFile操作的。
它的构造器中会对当前文件进行mmap内存映射操作。这里我们不会对mmap进行过多讨论,会在介绍RocketMQ高性能的部分会专门介绍。
public MappedFile(final String fileName, final int fileSize) throws IOException
//调用init初始化
init(fileName, fileSize);
private void init(final String fileName, final int fileSize) throws IOException
//文件名。长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0
this.fileName = fileName;
//文件大小。默认1G=1073741824
this.fileSize = fileSize;
//构建file对象
this.file = new File(fileName);
//构建文件起始索引,就是取自文件名
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
//确保文件目录存在
ensureDirOK(this.file.getParent());
try
//对当前commitlog文件构建文件通道fileChannel
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
//把commitlog文件完全的映射到虚拟内存,也就是内存映射,即mmap,提升读写性能
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
//记录数据
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
catch (FileNotFoundException e)
log.error("Failed to create file " + this.fileName, e);
throw e;
catch (IOException e)
log.error("Failed to map file " + this.fileName, e);
throw e;
finally
//释放fileChannel,注意释放fileChannel不会对之前的mappedByteBuffer映射产生影响
if (!ok && this.fileChannel != null)
this.fileChannel.close();
2.2 commitlog文件简介
Commit Log文件是RocketMQ真正存储消息内容的地方,即消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。
官方描述如下:单个文件大小默认1G,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息顺序写入日志文件,效率很高,当文件满了,写入下一个文件。
我们使用RocketMQ下面的example包下面的快速案例生产1000个消息:
随后可以在配置的commitlog路径下面即可看到两个commitlog文件:
这1000数据的大小明显是不超过1G大小的,为什么会有两个commitlog文件呢?这就是RocketMQ的一个优化,即commitlog文件预创建,如果启用了MappedFile(MappedFile类可以看作是commitlog文件在Java中的抽象)预分配服务,那么在创建MappedFile时会同时创建两个MappedFile,一个同步创建并返回用于本次实际使用,一个后台异步创建用于下次取用。这样的好处是避免等到当前文件真正用完了才创建下一个文件,目的同样是提升性能。
3 loadConsumeQueue加载消费队列文件
该方法用于加载消费队列文件,ConsumeQueue文件可以看作是CommitLog的索引文件,其存储了它所属Topic的消息在Commit Log中的偏移量。消费者拉取消息的时候,可以从Consume Queue中快速的根据偏移量定位消息在Commit Log中的位置。
一个队列id目录对应着一个ConsumeQueue对象,其内部保存着一个mappedFileQueue对象,其表示当前队列id目录下面的ConsumeQueue文件集合,同样一个ConsumeQueue文件被映射为一个MappedFile对象。
随后ConsumeQueue及其topic和queueId的对应关系被存入DefaultMessageStore的consumeQueueTable属性集合中。
/**
* DefaultMessageStore的方法
*/
private boolean loadConsumeQueue()
//获取ConsumeQueue文件所在目录,目录路径为storePathRootDir/consumequeue
File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
//获取目录下文件列表,实际上下面页是topic目录列表
File[] fileTopicList = dirLogic.listFiles();
if (fileTopicList != null)
//遍历topic目录
for (File fileTopic : fileTopicList)
//获取topic名字
String topic = fileTopic.getName();
//获取topic目录下面的队列id目录
File[] fileQueueIdList = fileTopic.listFiles();
if (fileQueueIdList != null)
for (File fileQueueId : fileQueueIdList)
int queueId;
try
//获取队列id
queueId = Integer.parseInt(fileQueueId.getName());
catch (NumberFormatException e)
continue;
//创建ConsumeQueue对象,一个队列id目录对应着一个ConsumeQueue对象
//其内部保存着
ConsumeQueue logic = new ConsumeQueue(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
//大小默认30w数据
this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
this);
//将当然ConsumeQueue对象及其对应关系存入consumeQueueTable中
this.putConsumeQueue(topic, queueId, logic);
//加载ConsumeQueue文件
if (!logic.load())
return false;
3.1 load加载ConsumeQueue文件
ConsumeQueue对象建立之后,会对自己管理的队列id目录下面的ConsumeQueue文件进行加载。内部就是调用mappedFileQueue的load方法,该方法我们前面讲过了,会对每个ConsumeQueue文件床创建一个MappedFile对象并且进行内存映射mmap操作。
public boolean load()
//调用mappedFileQueue的load方法,会对每个ConsumeQueue文件床创建一个MappedFile对象并且进行内存映射mmap操作。
boolean result = this.mappedFileQueue.load();
log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
if (isExtReadEnable())
//扩展加载,扩展消费队列用于存储不重要的东西,如消息存储时间、过滤位图等。
result &= this.consumeQueueExt.load();
return result;
3.2 ConsumeQueue文件简介
官方描述如下:消息消费队列(可以理解为Topic中的队列),引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。
Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值,以及ConsumeOffset(每个消费者组的消费位置)。
consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/topic/queueId/fileName。
同样consumequeue文件中的条目采取定长设计,每个条目共20字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。
ConsumeQueue名字长度为20位,左边补零,剩余为起始偏移量;比如00000000000000000000代表第一个文件,起始偏移量为0,文件大小为600w,当第一个文件满之后创建的第二个文件的名字为00000000000006000000,起始偏移量为6000000,以此类推,消息存储的时候会顺序写入文件,当文件写满了,写入下一个文件。
我们使用RocketMQ下面的example包下面的快速案例生产1000个消息之后,可以看到consumequeue目录下面产生了预期的Consume Queue文件结构,便于理解:
4 创建StoreCheckpoint检查点对象
在commitlog和consumequeue文件都加载成功之后,加载checkpoint 检查点文件,创建storeCheckpoint对象,文件位置是storePathRootDir/checkpoint。
StoreCheckpoint记录着commitLog、consumeQueue、index文件的最后更新时间点,当上一次broker是异常结束时,会根据StoreCheckpoint的数据进行恢复,这决定着文件从哪里开始恢复,甚至是删除文件。
StoreCheckpoint记录了三个关键属性:
-
physicMsgTimestamp:最新commitlog文件的刷盘时间戳,单位毫秒。
-
logicsMsgTimestamp:最新consumeQueue文件的刷盘时间戳,单位毫秒。
-
indexMsgTimestamp:创建最新indexfile文件的时间戳,单位毫秒。
public StoreCheckpoint(final String scpPath) throws IOException
File file = new File(scpPath);
//判断存在当前文件
MappedFile.ensureDirOK(file.getParent());
boolean fileExists = file.exists();
//对checkpoint文件同样执行mmap操作
this.randomAccessFile = new RandomAccessFile(file, "rw");
this.fileChannel = this.randomAccessFile.getChannel();
//mmap大小为OS_PAGE_SIZE,即OS一页,4k
this.mappedByteBuffer = fileChannel.map(MapMode.READ_WRITE, 0, MappedFile.OS_PAGE_SIZE);
if (fileExists)
log.info("store checkpoint file exists, " + scpPath);
//获取commitlog文件的时间戳,即最新commitlog文件的刷盘时间戳
this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0);
//获取consumeQueue文件的时间戳,即最新consumeQueue文件的刷盘时间戳
this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);
//获取index文件的时间戳,即创建最新indexfile文件的时间戳
this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);
log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "
+ UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));
log.info("store checkpoint file logicsMsgTimestamp " + this.logicsMsgTimestamp + ", "
+ UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp));
log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", "
+ UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));
else
log.info("store checkpoint file not exists, " + scpPath);
5 加载index索引文件
加载 index 索引文件,目录路径为storePathRootDir/index。index 索引文件用于通过时间区间来快速查询消息,底层为HashMap结构,实现为hash索引。
最终一个index文件对应着一个IndexFile实例,并且会加到indexFileList集合中。还会判断如果上次broker不是正常退出,并且并且当前index文件中最后一个消息的落盘时间戳大于StoreCheckpoint中的最后一个index索引文件创建时间,则该索引文件被删除。
/**
* IndexService的方法
* @param lastExitOK 上次是否正常推出
*/
public boolean load(final boolean lastExitOK)
//获取上级目录路径,storePathRootDir/index
File dir <以上是关于RocketMQ源码—Broker启动加载消息文件以及恢复数据源码一万字的主要内容,如果未能解决你的问题,请参考以下文章