RocketMQ源码—Broker启动加载消息文件以及恢复数据源码一万字

Posted 刘Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码—Broker启动加载消息文件以及恢复数据源码一万字相关的知识,希望对你有一定的参考价值。

详细介绍了Broker启动加载消息文件以及恢复数据源码。

此前我们学习了Broker的启动源码:RocketMQ源码(3)—Broker启动流程源码解析【一万字】,Broker的启动过程中,在DefaultMessageStore实例化之后,将会调用load方法将磁盘中的commitLog、ConsumeQueue、IndexFile文件的数据加载到内存中,还会进行数据恢复操作。

下面看看Broker启动加载消息文件以及恢复数据源码。

文章目录

load方法主要步骤为:

  1. 调用isTempFileExist方法判断上次broker是否是正常退出,如果是正常退出不会保留abort文件,异常退出则会。

  2. 加载CommitLog日志文件。CommitLog文件是真正存储消息内容的地方。

  3. 加载ConsumeQueue文件。ConsumeQueue文件可以看作是CommitLog的消息偏移量索引文件。

  4. 加载 index 索引文件。Index文件可以看作是CommitLog的消息时间范围索引文件。

  5. 恢复ConsumeQueue文件和CommitLog文件,将正确的的数据恢复至内存中,删除错误数据和文件。

  6. 加载RocketMQ延迟消息的服务,包括延时等级、配置文件等等。

/**
 * DefaultMessageStore的方法
 * 加载Commit Log、Consume Queue、index file等文件,将数据加载到内存中,并完成数据的恢复
 *
 * @throws IOException
 */
public boolean load() 
    boolean result = true;

    try 
        /*
         * 1 判断上次broker是否是正常退出,如果是正常退出不会保留abort文件,异常退出则会
         *
         * Broker在启动时会创建storePathRootDir/abort文件,并且注册钩子函数:在JVM退出时删除abort文件。
         * 如果下一次启动时存在abort文件,说明Broker是异常退出的,文件数据可能不一直,需要进行数据修复。
         */
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown ", lastExitOK ? "normally" : "abnormally");

        /*
         * 2 加载Commit Log日志文件,目录路径取自broker.conf文件中的storePathCommitLog属性
         * Commit Log文件是真正存储消息内容的地方,单个文件默认大小1G。
         */
        // load Commit Log
        result = result && this.commitLog.load();
        /*
         * 2 加载Consume Queue文件,目录路径为storePathRootDir/consumequeue,文件组织方式为topic/queueId/fileName
         * Consume Queue文件可以看作是Commit Log的索引文件,其存储了它所属Topic的消息在Commit Log中的偏移量
         * 消费者拉取消息的时候,可以从Consume Queue中快速的根据偏移量定位消息在Commit Log中的位置。
         */
        // load Consume Queue
        result = result && this.loadConsumeQueue();

        if (result) 
            /*
             * 3 加载checkpoint 检查点文件,文件位置是storePathRootDir/checkpoint
             * StoreCheckpoint记录着commitLog、ConsumeQueue、Index文件的最后更新时间点,
             * 当上一次broker是异常结束时,会根据StoreCheckpoint的数据进行恢复,这决定着文件从哪里开始恢复,甚至是删除文件
             */
            this.storeCheckpoint =
                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            /*
             * 4 加载 index 索引文件,目录路径为storePathRootDir/index
             * index 索引文件用于通过时间区间来快速查询消息,底层为HashMap结构,实现为hash索引。后面会专门出文介绍
             * 如果不是正常退出,并且最大更新时间戳比checkpoint文件中的时间戳大,则删除该 index 文件
             */
            this.indexService.load(lastExitOK);
            /*
             * 4 恢复ConsumeQueue文件和CommitLog文件,将正确的的数据恢复至内存中,删除错误数据和文件。
             */
            this.recover(lastExitOK);

            log.info("load over, and the max phy offset = ", this.getMaxPhyOffset());

            /*
             * 5 加载RocketMQ延迟消息的服务,包括延时等级、配置文件等等。
             */
            if (null != scheduleMessageService) 
                result = this.scheduleMessageService.load();
            
        

     catch (Exception e) 
        log.error("load exception", e);
        result = false;
    

    if (!result) 
        //如果上面的操作抛出异常,则文件服务停止
        this.allocateMappedFileService.shutdown();
    

    return result;

1 isTempFileExist是否存在临时文件

首先会判断是否存在临时文件,也就是abort文件,其路径为storePathRootDir/abort,Broker在启动时会创建ROCKET_HOME/store/abort文件,并且注册钩子函数:在JVM退出时删除abort文件。

如果下一次启动时不存在abort文件,表示钩子函数被正确执行,broker是正常退出的,不需要修复文件数据;如果存在abort文件,说明broker是异常退出的,因为钩子函数并没有执行成功,此时文件数据可能不一致,需要进行数据修复。

private boolean isTempFileExist() 
    //获取临时文件路径,路径为:storePathRootDir/abort
    String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    //构建file文件对象
    File file = new File(fileName);
    //判断文件是否存在
    return file.exists();

下面就是在正常启动broker创建的abort文件:

2 commitLog#load加载消息日志文件

通过内部的CommitLog对象的load方法加载Commit Log日志文件,目录路径取自broker.conf文件中配置的storePathCommitLog属性,默认为$HOME/store/commitlog/。

CommitLog的load方法实际上是委托内部的mappedFileQueue的load方法进行加载。

/**
 * CommitLog的方法
 */
public boolean load() 
    //调用mappedFileQueue的load方法
    boolean result = this.mappedFileQueue.load();
    log.info("load commit log " + (result ? "OK" : "Failed"));
    return result;

2.1 load加载文件

MappedFileQueue#load方法会就是将commitLog目录路径下的commotlog文件进行全部的加载为MappedFile对象。

/**
 * MappedFileQueue的方法
 */
public boolean load() 
    //获取commitlog文件的存放目录,目录路径取自
    //broker.conf文件中配置的storePathCommitLog属性,默认为$HOME/store/commitlog/
    File dir = new File(this.storePath);
    //获取内部的文件集合
    File[] ls = dir.listFiles();
    if (ls != null) 
        //如果存在commitlog文件,那么进行加载
        return doLoad(Arrays.asList(ls));
    
    return true;

/**
 * MappedFileQueue的方法
 */
public boolean doLoad(List<File> files) 
    // 对commitlog文件按照文件名生序排序
    files.sort(Comparator.comparing(File::getName));

    for (File file : files) 
        //校验文件实际大小是否等于预定的文件大小,如果不想等,则直接返回false,不再加载其他文件
        if (file.length() != this.mappedFileSize) 
            log.warn(file + "\\t" + file.length()
                    + " length not matched message store config value, please check it manually");
            return false;
        

        try 
            /*
             * 核心代码
             * 每一个commitlog文件都创建一个对应的MappedFile对象
             *
             */
            MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
            //将wrotePosition 、flushedPosition、committedPosition 默认设置为文件大小
            //当前文件所映射到的消息写入page cache的位置
            mappedFile.setWrotePosition(this.mappedFileSize);
            //刷盘的最新位置
            mappedFile.setFlushedPosition(this.mappedFileSize);
            //已提交的最新位置
            mappedFile.setCommittedPosition(this.mappedFileSize);
            //添加到MappedFileQueue内部的mappedFiles集合中
            this.mappedFiles.add(mappedFile);
            log.info("load " + file.getPath() + " OK");
         catch (IOException e) 
            log.error("load file " + file + " error", e);
            return false;
        
    
    return true;

在物理上,commotlog目录下面是一个个的commitlog文件,但是在Java中进行了三层映射,CommitLog-MappedFileQueue-MappedFile。CommitLog中包含MappedFileQueue,以及commitlog相关的其他服务,例如刷盘服务;MappedFileQueue中包含MappedFile集合,以及单个commotlog文件大小等属性,而MappedFile才是真正的一个commotlog文件在Java中的映射,包含文件名、大小、mmap对象mappedByteBuffer等属性。

实际上MappedFileQueue和MappedFile都是通用类,commitlog、comsumequeue、indexfile文件都会使用到。

2.1.1 创建MappedFile并映射文件

MappedFile作为一个RocketMQ的物理文件在Java中的映射类。commitLog consumerQueue、indexFile3种文件磁盘的读写都是通过MappedFile操作的。

它的构造器中会对当前文件进行mmap内存映射操作。这里我们不会对mmap进行过多讨论,会在介绍RocketMQ高性能的部分会专门介绍。

public MappedFile(final String fileName, final int fileSize) throws IOException 
    //调用init初始化
    init(fileName, fileSize);

private void init(final String fileName, final int fileSize) throws IOException 
    //文件名。长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0
    this.fileName = fileName;
    //文件大小。默认1G=1073741824
    this.fileSize = fileSize;
    //构建file对象
    this.file = new File(fileName);
    //构建文件起始索引,就是取自文件名
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;
    //确保文件目录存在
    ensureDirOK(this.file.getParent());

    try 
        //对当前commitlog文件构建文件通道fileChannel
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        //把commitlog文件完全的映射到虚拟内存,也就是内存映射,即mmap,提升读写性能
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        //记录数据
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
     catch (FileNotFoundException e) 
        log.error("Failed to create file " + this.fileName, e);
        throw e;
     catch (IOException e) 
        log.error("Failed to map file " + this.fileName, e);
        throw e;
     finally 
        //释放fileChannel,注意释放fileChannel不会对之前的mappedByteBuffer映射产生影响
        if (!ok && this.fileChannel != null) 
            this.fileChannel.close();
        
    

2.2 commitlog文件简介

Commit Log文件是RocketMQ真正存储消息内容的地方,即消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。

官方描述如下:单个文件大小默认1G,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息顺序写入日志文件,效率很高,当文件满了,写入下一个文件。

我们使用RocketMQ下面的example包下面的快速案例生产1000个消息:

随后可以在配置的commitlog路径下面即可看到两个commitlog文件:

这1000数据的大小明显是不超过1G大小的,为什么会有两个commitlog文件呢?这就是RocketMQ的一个优化,即commitlog文件预创建,如果启用了MappedFile(MappedFile类可以看作是commitlog文件在Java中的抽象)预分配服务,那么在创建MappedFile时会同时创建两个MappedFile,一个同步创建并返回用于本次实际使用,一个后台异步创建用于下次取用。这样的好处是避免等到当前文件真正用完了才创建下一个文件,目的同样是提升性能。

3 loadConsumeQueue加载消费队列文件

该方法用于加载消费队列文件,ConsumeQueue文件可以看作是CommitLog的索引文件,其存储了它所属Topic的消息在Commit Log中的偏移量。消费者拉取消息的时候,可以从Consume Queue中快速的根据偏移量定位消息在Commit Log中的位置。

一个队列id目录对应着一个ConsumeQueue对象,其内部保存着一个mappedFileQueue对象,其表示当前队列id目录下面的ConsumeQueue文件集合,同样一个ConsumeQueue文件被映射为一个MappedFile对象。

随后ConsumeQueue及其topic和queueId的对应关系被存入DefaultMessageStore的consumeQueueTable属性集合中。

/**
 * DefaultMessageStore的方法
 */
private boolean loadConsumeQueue() 
    //获取ConsumeQueue文件所在目录,目录路径为storePathRootDir/consumequeue
    File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
    //获取目录下文件列表,实际上下面页是topic目录列表
    File[] fileTopicList = dirLogic.listFiles();
    if (fileTopicList != null) 
        //遍历topic目录
        for (File fileTopic : fileTopicList) 
            //获取topic名字
            String topic = fileTopic.getName();
            //获取topic目录下面的队列id目录
            File[] fileQueueIdList = fileTopic.listFiles();
            if (fileQueueIdList != null) 
                for (File fileQueueId : fileQueueIdList) 
                    int queueId;
                    try 
                        //获取队列id
                        queueId = Integer.parseInt(fileQueueId.getName());
                     catch (NumberFormatException e) 
                        continue;
                    
                    //创建ConsumeQueue对象,一个队列id目录对应着一个ConsumeQueue对象
                    //其内部保存着
                    ConsumeQueue logic = new ConsumeQueue(
                            topic,
                            queueId,
                            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                            //大小默认30w数据
                            this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                            this);
                    //将当然ConsumeQueue对象及其对应关系存入consumeQueueTable中
                    this.putConsumeQueue(topic, queueId, logic);
                    //加载ConsumeQueue文件
                    if (!logic.load()) 
                        return false;
                    
                
            
        
    

3.1 load加载ConsumeQueue文件

ConsumeQueue对象建立之后,会对自己管理的队列id目录下面的ConsumeQueue文件进行加载。内部就是调用mappedFileQueue的load方法,该方法我们前面讲过了,会对每个ConsumeQueue文件床创建一个MappedFile对象并且进行内存映射mmap操作。

public boolean load() 
    //调用mappedFileQueue的load方法,会对每个ConsumeQueue文件床创建一个MappedFile对象并且进行内存映射mmap操作。
    boolean result = this.mappedFileQueue.load();
    log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
    if (isExtReadEnable()) 
        //扩展加载,扩展消费队列用于存储不重要的东西,如消息存储时间、过滤位图等。
        result &= this.consumeQueueExt.load();
    
    return result;

3.2 ConsumeQueue文件简介

官方描述如下:消息消费队列(可以理解为Topic中的队列),引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。

Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值,以及ConsumeOffset(每个消费者组的消费位置)。

consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/topic/queueId/fileName。

同样consumequeue文件中的条目采取定长设计,每个条目共20字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。

ConsumeQueue名字长度为20位,左边补零,剩余为起始偏移量;比如00000000000000000000代表第一个文件,起始偏移量为0,文件大小为600w,当第一个文件满之后创建的第二个文件的名字为00000000000006000000,起始偏移量为6000000,以此类推,消息存储的时候会顺序写入文件,当文件写满了,写入下一个文件。

我们使用RocketMQ下面的example包下面的快速案例生产1000个消息之后,可以看到consumequeue目录下面产生了预期的Consume Queue文件结构,便于理解:

4 创建StoreCheckpoint检查点对象

在commitlog和consumequeue文件都加载成功之后,加载checkpoint 检查点文件,创建storeCheckpoint对象,文件位置是storePathRootDir/checkpoint。

StoreCheckpoint记录着commitLog、consumeQueue、index文件的最后更新时间点,当上一次broker是异常结束时,会根据StoreCheckpoint的数据进行恢复,这决定着文件从哪里开始恢复,甚至是删除文件。

StoreCheckpoint记录了三个关键属性:

  1. physicMsgTimestamp:最新commitlog文件的刷盘时间戳,单位毫秒。

  2. logicsMsgTimestamp:最新consumeQueue文件的刷盘时间戳,单位毫秒。

  3. indexMsgTimestamp:创建最新indexfile文件的时间戳,单位毫秒。

public StoreCheckpoint(final String scpPath) throws IOException 
    File file = new File(scpPath);
    //判断存在当前文件
    MappedFile.ensureDirOK(file.getParent());
    boolean fileExists = file.exists();
    //对checkpoint文件同样执行mmap操作
    this.randomAccessFile = new RandomAccessFile(file, "rw");
    this.fileChannel = this.randomAccessFile.getChannel();
    //mmap大小为OS_PAGE_SIZE,即OS一页,4k
    this.mappedByteBuffer = fileChannel.map(MapMode.READ_WRITE, 0, MappedFile.OS_PAGE_SIZE);

    if (fileExists) 
        log.info("store checkpoint file exists, " + scpPath);
        //获取commitlog文件的时间戳,即最新commitlog文件的刷盘时间戳
        this.physicMsgTimestamp = this.mappedByteBuffer.getLong(0);
        //获取consumeQueue文件的时间戳,即最新consumeQueue文件的刷盘时间戳
        this.logicsMsgTimestamp = this.mappedByteBuffer.getLong(8);
        //获取index文件的时间戳,即创建最新indexfile文件的时间戳
        this.indexMsgTimestamp = this.mappedByteBuffer.getLong(16);

        log.info("store checkpoint file physicMsgTimestamp " + this.physicMsgTimestamp + ", "
            + UtilAll.timeMillisToHumanString(this.physicMsgTimestamp));
        log.info("store checkpoint file logicsMsgTimestamp " + this.logicsMsgTimestamp + ", "
            + UtilAll.timeMillisToHumanString(this.logicsMsgTimestamp));
        log.info("store checkpoint file indexMsgTimestamp " + this.indexMsgTimestamp + ", "
            + UtilAll.timeMillisToHumanString(this.indexMsgTimestamp));
     else 
        log.info("store checkpoint file not exists, " + scpPath);
    

5 加载index索引文件

加载 index 索引文件,目录路径为storePathRootDir/index。index 索引文件用于通过时间区间来快速查询消息,底层为HashMap结构,实现为hash索引。

最终一个index文件对应着一个IndexFile实例,并且会加到indexFileList集合中。还会判断如果上次broker不是正常退出,并且并且当前index文件中最后一个消息的落盘时间戳大于StoreCheckpoint中的最后一个index索引文件创建时间,则该索引文件被删除。

/**
 * IndexService的方法
 * @param lastExitOK 上次是否正常推出
 */
public boolean load(final boolean lastExitOK) 
    //获取上级目录路径,storePathRootDir/index
    File dir <

以上是关于RocketMQ源码—Broker启动加载消息文件以及恢复数据源码一万字的主要内容,如果未能解决你的问题,请参考以下文章

5RocketMQ 源码解析之 命名服务启动

5RocketMQ 源码解析之 命名服务启动

6RocketMQ 源码解析之 Broker 启动(上)

6RocketMQ 源码解析之 Broker 启动(上)

从RocketMQ的Broker源码层面验证一下这两个点

8RocketMQ 源码解析之消息发送