RocketMQ Source Code: Synchronous and Asynchronous Disk Flushing


1 Overview
2 Related classes
3 How synchronous flushing works
4 Asynchronous flushing

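Like other storage systems such as Redis, RocketMQ offers two ways of flushing data to disk: synchronous and asynchronous. Synchronous flushing guarantees that data has actually been written to disk, giving true persistence, but it also ties the system's write speed to the disk's I/O speed. Asynchronous flushing returns as soon as the data has been written to a buffer. This improves the system's I/O throughput, but any data not yet written to disk can be lost if the system fails.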

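RocketMQ provides two modes, SYNC_FLUSH and ASYNC_FLUSH, i.e. synchronous and asynchronous flushing. With synchronous flushing, after a message is written the broker waits until the flush progress is greater than or equal to the current write progress before returning. With asynchronous flushing it returns right after writing the message and does not wait for the flush progress.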
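The rule that a synchronous write returns only once the flush progress has caught up with the write progress can be modeled as below. This is a minimal sketch, not RocketMQ code. FlushService, FlushRequest, submit and onFlushed are hypothetical names, and real disk I/O is replaced by an offset counter. In RocketMQ the corresponding roles are played by GroupCommitService and its GroupCommitRequest.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SyncFlushSketch {

    // "Tell me when everything up to nextOffset is on disk." (hypothetical type)
    static final class FlushRequest {
        final long nextOffset;
        final CompletableFuture<Boolean> future = new CompletableFuture<>();

        FlushRequest(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    // Hypothetical flush service; flushing is simulated by advancing flushedOffset.
    static final class FlushService {
        private final List<FlushRequest> pending = new ArrayList<>();
        private long flushedOffset = 0;

        synchronized FlushRequest submit(long nextOffset) {
            FlushRequest req = new FlushRequest(nextOffset);
            if (nextOffset <= flushedOffset) {
                req.future.complete(true);           // already durable
            } else {
                pending.add(req);
            }
            return req;
        }

        // Called after the data up to newFlushedOffset has been forced to disk.
        synchronized void onFlushed(long newFlushedOffset) {
            flushedOffset = Math.max(flushedOffset, newFlushedOffset);
            Iterator<FlushRequest> it = pending.iterator();
            while (it.hasNext()) {
                FlushRequest req = it.next();
                if (req.nextOffset <= flushedOffset) { // flush progress >= write progress
                    req.future.complete(true);
                    it.remove();
                }
            }
        }
    }

    // The writer blocks until the flush catches up, or gives up after the timeout.
    static boolean waitForFlush(FlushRequest req, long timeoutMillis) {
        try {
            return req.future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FlushService service = new FlushService();
        FlushRequest req = service.submit(4096);           // the message ends at offset 4096
        new Thread(() -> service.onFlushed(8192)).start(); // a background flush covers it
        System.out.println("flushOK=" + waitForFlush(req, 1000));
    }
}
```

A timeout is returned as `false` rather than thrown. The write itself has succeeded in that case; it just was not confirmed durable in time. RocketMQ reports this outcome with the PutMessageStatus.FLUSH_DISK_TIMEOUT status.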

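Before reading this article, you may want to read the article "RocketMQ源码-MappedFile介绍" (RocketMQ Source Code: MappedFile Introduction). It explains how the transient store pool works and how the commit and flush actions differ during a flush. This article does not repeat those details.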

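Synchronous and asynchronous flushing work on essentially the same principle as the synchronous and asynchronous replication described in the article "RocketMQ源码-主从同步复制和异步复制" (RocketMQ Source Code: Master-Slave Synchronous and Asynchronous Replication). Synchronous flushing blocks until the current flush progress is greater than or equal to the progress of this write, then returns. Asynchronous flushing returns immediately after the write, and a background thread flushes periodically.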

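Suppose the configured flush mode is synchronous, i.e. SYNC_FLUSH. By the caveats in section 8 of "RocketMQ源码-MappedFile介绍", MappedFile's transient store pool (TransientStorePool) is then never enabled. GroupCommitService is the service that performs the actual flush in synchronous mode.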
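The link between the flush mode and the transient store pool fits in a single predicate. The sketch below follows our reading of MessageStoreConfig#isTransientStorePoolEnable in RocketMQ 4.x. The StoreConfig class and its fields are simplified, hypothetical stand-ins, not the real API.

```java
class TransientPoolConfigSketch {

    enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }
    enum BrokerRole { ASYNC_MASTER, SYNC_MASTER, SLAVE }

    // Simplified, hypothetical stand-in for the store configuration.
    static final class StoreConfig {
        boolean transientStorePoolEnable;                 // the user-facing switch
        FlushDiskType flushDiskType = FlushDiskType.ASYNC_FLUSH;
        BrokerRole brokerRole = BrokerRole.ASYNC_MASTER;

        // The pool is effective only if the switch is on, flushing is asynchronous,
        // and the broker is not a slave.
        boolean isTransientStorePoolEnable() {
            return transientStorePoolEnable
                && flushDiskType == FlushDiskType.ASYNC_FLUSH
                && brokerRole != BrokerRole.SLAVE;
        }
    }

    public static void main(String[] args) {
        StoreConfig cfg = new StoreConfig();
        cfg.transientStorePoolEnable = true;
        System.out.println(cfg.isTransientStorePoolEnable()); // true
        cfg.flushDiskType = FlushDiskType.SYNC_FLUSH;
        System.out.println(cfg.isTransientStorePoolEnable()); // false
    }
}
```

The flush type is checked each time the predicate is read. So under SYNC_FLUSH, turning the pool switch on simply has no effect in this model.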

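FlushRealTimeService handles asynchronous flushing when the transient store pool is not enabled. Its main job is to trigger the flush action periodically.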
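A periodic flush thread of this kind can be sketched as follows. This is an illustration under assumptions, not FlushRealTimeService itself. FileState and FlushLoop are hypothetical, and the 500 ms interval, the 4-page minimum and the 10-second "flush everything" interval are illustrative parameters. The idea they show: flush only when enough dirty pages have built up, but still flush everything now and then.

```java
import java.util.concurrent.atomic.AtomicLong;

class PeriodicFlushSketch {
    static final int PAGE_SIZE = 4096;

    // Hypothetical file state: how far we have written vs. how far we have flushed.
    static final class FileState {
        final AtomicLong wrotePosition = new AtomicLong();
        final AtomicLong flushedPosition = new AtomicLong();

        // leastPages == 0 means "flush whatever is dirty".
        boolean flush(int leastPages) {
            long wrote = wrotePosition.get();
            long dirty = wrote - flushedPosition.get();
            boolean enough = leastPages == 0 ? dirty > 0 : dirty / PAGE_SIZE >= leastPages;
            if (enough) {
                // A real store would call MappedByteBuffer.force() or FileChannel.force() here.
                flushedPosition.set(wrote);
            }
            return enough;
        }
    }

    // One iteration of the background loop; only the flusher thread calls tick().
    static final class FlushLoop {
        final int leastPages;
        final long thoroughIntervalMillis;
        long lastThoroughFlush;

        FlushLoop(int leastPages, long thoroughIntervalMillis, long now) {
            this.leastPages = leastPages;
            this.thoroughIntervalMillis = thoroughIntervalMillis;
            this.lastThoroughFlush = now;
        }

        boolean tick(FileState file, long now) {
            int pages = leastPages;
            if (now >= lastThoroughFlush + thoroughIntervalMillis) {
                lastThoroughFlush = now;
                pages = 0;                          // periodic "flush everything"
            }
            return file.flush(pages);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FileState file = new FileState();
        FlushLoop loop = new FlushLoop(4, 10_000, System.currentTimeMillis());
        Thread flusher = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                loop.tick(file, System.currentTimeMillis());
                try {
                    Thread.sleep(500);              // flush interval
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        flusher.start();
        file.wrotePosition.addAndGet(5 * PAGE_SIZE); // the writer returns immediately
        Thread.sleep(1200);
        flusher.interrupt();
        flusher.join();
        System.out.println("flushed bytes=" + file.flushedPosition.get());
    }
}
```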

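CommitRealTimeService handles asynchronous flushing when the transient store pool is enabled. Unlike FlushRealTimeService, it first commits the data in the ByteBuffer borrowed from the pool to the fileChannel. Only after that is flush called on the fileChannel to write the data to disk.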
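The commit/flush split can be illustrated with plain Java NIO. In this minimal sketch, PooledFile is hypothetical. A direct ByteBuffer stands in for the buffer borrowed from TransientStorePool, and the three counters mirror MappedFile's wrotePosition, committedPosition and flushedPosition. commit() only moves bytes into the FileChannel, that is, into the OS page cache. Only flush(), through FileChannel.force, pushes them to the storage device.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class CommitThenFlushSketch {

    // Hypothetical pool-backed file: data goes to writeBuffer, then the channel, then disk.
    static final class PooledFile implements AutoCloseable {
        final ByteBuffer writeBuffer;  // stands in for a buffer borrowed from TransientStorePool
        final FileChannel channel;
        int wrotePosition, committedPosition, flushedPosition;

        PooledFile(Path path, int size) throws IOException {
            this.writeBuffer = ByteBuffer.allocateDirect(size);
            this.channel = FileChannel.open(path, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
        }

        // Append into off-heap memory only; nothing touches the file yet.
        void append(byte[] data) {
            ByteBuffer slice = writeBuffer.slice();
            slice.position(wrotePosition);
            slice.put(data);
            wrotePosition += data.length;
        }

        // commit: copy [committedPosition, wrotePosition) from writeBuffer into the channel.
        void commit() throws IOException {
            if (wrotePosition == committedPosition) return;
            ByteBuffer pending = writeBuffer.slice();
            pending.position(committedPosition);
            pending.limit(wrotePosition);
            channel.position(committedPosition);
            while (pending.hasRemaining()) channel.write(pending);
            committedPosition = wrotePosition;
        }

        // flush: force the committed bytes from the page cache to the storage device.
        void flush() throws IOException {
            if (flushedPosition == committedPosition) return;
            channel.force(false);
            flushedPosition = committedPosition;
        }

        @Override
        public void close() throws IOException {
            channel.close();
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("commitlog", ".dat");
        try (PooledFile f = new PooledFile(tmp, 1024)) {
            f.append("hello".getBytes(StandardCharsets.UTF_8));
            f.commit();
            f.flush();
            System.out.println(f.wrotePosition + " " + f.committedPosition + " " + f.flushedPosition); // 5 5 5
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

Splitting the two steps lets writers append to memory that is not file-backed. The slower system calls can then run in batches on a background thread.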

After writing a message, CommitLog.putMessage calls handleDiskFlush, which carries out the flush according to the configured flush mode.
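Based on the services described in this section, the decision handleDiskFlush makes after each write can be summarized as a small routing function. This is a hedged model of that branching, not the method's actual code. FlushDispatchSketch and its Action enum are hypothetical. waitStoreMsgOK stands for the message's Message#isWaitStoreMsgOK flag, which, as we understand the 4.x source, lets a message skip the wait in synchronous mode.

```java
class FlushDispatchSketch {

    enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

    // What happens to a freshly written message (hypothetical enum).
    enum Action {
        WAIT_FOR_GROUP_COMMIT,  // sync: enqueue a request and block until it is flushed
        WAKE_GROUP_COMMIT,      // sync, but the message does not ask to wait
        WAKE_FLUSH_REAL_TIME,   // async, transient store pool not enabled
        WAKE_COMMIT_REAL_TIME   // async, transient store pool enabled
    }

    static Action dispatch(FlushDiskType type, boolean waitStoreMsgOK, boolean transientPoolEnabled) {
        if (type == FlushDiskType.SYNC_FLUSH) {
            return waitStoreMsgOK ? Action.WAIT_FOR_GROUP_COMMIT : Action.WAKE_GROUP_COMMIT;
        }
        // Async: nudge the background service and return without waiting.
        return transientPoolEnabled ? Action.WAKE_COMMIT_REAL_TIME : Action.WAKE_FLUSH_REAL_TIME;
    }

    public static void main(String[] args) {
        System.out.println(dispatch(FlushDiskType.SYNC_FLUSH, true, false)); // WAIT_FOR_GROUP_COMMIT
        System.out.println(dispatch(FlushDiskType.ASYNC_FLUSH, true, true)); // WAKE_COMMIT_REAL_TIME
    }
}
```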

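The source of these services is fairly simple and very similar to master-slave synchronous replication, so it is not covered here. We suggest reading "RocketMQ源码-主从同步复制和异步复制" and comparing the two.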

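With asynchronous flushing, the broker returns right after the message is written. Two ServiceThread implementations, FlushRealTimeService and CommitRealTimeService, then flush in the background at the configured flush frequency. FlushRealTimeService flushes MappedFiles that do not use the transient store pool, and CommitRealTimeService handles MappedFiles that do.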

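RocketMQ: Message Storage Mechanism Explained, with Source Code Analysis

Message storage mechanism

1. Preface

This article explains how the Broker persists a message after receiving it from a producer.

2. Core storage class: DefaultMessageStore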

private final MessageStoreConfig messageStoreConfig;	// message store configuration
private final CommitLog commitLog;		// CommitLog storage implementation -> messages are stored in the commit log
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;	// consume queue table, grouped by topic
private final FlushConsumeQueueService flushConsumeQueueService;	// consume queue flush service thread
private final CleanCommitLogService cleanCommitLogService;	// expired CommitLog file cleanup service
private final CleanConsumeQueueService cleanConsumeQueueService;	// expired ConsumeQueue file cleanup service
private final IndexService indexService;	// index service
private final AllocateMappedFileService allocateMappedFileService;	// MappedFile allocation service -> memory-maps commitLog and consumeQueue files
private final ReputMessageService reputMessageService;	// CommitLog dispatch: builds ConsumeQueue and IndexFile entries from the CommitLog
private final HAService haService;	// master-slave replication service
private final ScheduleMessageService scheduleMessageService;	// scheduled (delayed) message service
private final StoreStatsService storeStatsService;	// message store statistics service
private final TransientStorePool transientStorePool;	// off-heap memory pool for messages (transient store pool)
private final BrokerStatsManager brokerStatsManager;	// Broker statistics manager
private final MessageArrivingListener messageArrivingListener;	// message-arrival listener for long-polling pull requests
private final BrokerConfig brokerConfig;	// Broker configuration
private StoreCheckpoint storeCheckpoint;	// flush checkpoint
private final LinkedList<CommitLogDispatcher> dispatcherList;	// CommitLog dispatchers

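These fields form the core of message storage; pay close attention to what each one does.

3. Message storage flow

The message storage sequence diagram is as follows:

Entry point for message storage: DefaultMessageStore#putMessage (the excerpt below is the batch variant, asyncPutMessages, which handles a MessageExtBatch)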

// Check the store status: a Slave broker, or a store that cannot accept writes right now, rejects the message
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
    return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
}

// Check that the topic and message body lengths are legal
PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
    return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
}

// Record when the write started
long beginTime = this.getSystemClock().now();
// Write the messages
CompletableFuture<PutMessageResult> resultFuture = this.commitLog.asyncPutMessages(messageExtBatch);

resultFuture.thenAccept((result) -> {
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
    }
    // Record statistics
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
    // The store failed
    if (null == result || !result.isOk()) {
        // Increment the store-failure counter
        this.storeStatsService.getPutMessageFailedTimes().add(1);
    }
});

return resultFuture;

DefaultMessageStore#checkStoreStatus

// The store service has been shut down
if (this.shutdown) {
    log.warn("message store has shutdown, so putMessage is forbidden");
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}
// The broker is a Slave -> writes are not allowed
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    long value = this.printTimes.getAndIncrement();
    if ((value % 50000) == 0) {
        log.warn("broke role is slave, so putMessage is forbidden");
    }
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;
}

// Not writable -> broker disk full / logic queue write error / index file write error
if (!this.runningFlags.isWriteable()) {
    long value = this.printTimes.getAndIncrement();
    if ((value % 50000) == 0) {
        log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
            "the broker's disk is full, write to logic queue error, write to index file error, etc");
    }
    return PutMessageStatus.SERVICE_NOT_AVAILABLE;
} else {
    this.printTimes.set(0);
}
// Is the OS page cache busy?
if (this.isOSPageCacheBusy()) {
    return PutMessageStatus.OS_PAGECACHE_BUSY;
}
return PutMessageStatus.PUT_OK;
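The last check, isOSPageCacheBusy, relies on the beginTimeInLock field. CommitLog#asyncPutMessages sets that field when it takes the put lock and resets it to 0 when it is done. The function below sketches that heuristic as we understand it from the 4.x DefaultMessageStore#isOSPageCacheBusy. The class name and parameter list are hypothetical. The busy threshold is passed in here; in the store it comes from configuration.

```java
class PageCacheBusySketch {

    // beginTimeInLock: when the current writer took the put lock, or 0 if no writer holds it.
    static boolean isOSPageCacheBusy(long beginTimeInLock, long now, long busyTimeoutMillis) {
        long diff = now - beginTimeInLock;
        // With beginTimeInLock == 0, diff is roughly "now" (decades in ms) and fails the
        // upper bound, so an idle lock is never reported as busy.
        return diff < 10_000_000 && diff > busyTimeoutMillis;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(isOSPageCacheBusy(0, now, 1000));          // false: nobody holds the lock
        System.out.println(isOSPageCacheBusy(now - 2000, now, 1000)); // true: lock held for 2 s
    }
}
```

Rejecting new writes with OS_PAGECACHE_BUSY while one append is stuck in the lock keeps requests from piling up behind a slow page-cache write.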

CommitLog#asyncPutMessages

// Record the message store time
messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
AppendMessageResult result;

StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());

// Check the transaction type: transactional messages cannot be sent as a batch
if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}

//....

// Get the last MappedFile -> the concrete memory-mapping implementation
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

// Appending requires the lock -> appends are serialized
putMessageLock.lock();
try {
    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    this.beginTimeInLock = beginLockTimestamp;

    // Set the store time inside the lock -> keeps store timestamps globally ordered
    messageExtBatch.setStoreTimestamp(beginLockTimestamp);

    // If mappedFile is null or full, create a new mappedFile
    if (null == mappedFile || mappedFile.isFull()) {
        mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
    }
    // If creation failed, return immediately
    if (null == mappedFile) {
        log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
        beginTimeInLock = 0;
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
    }

    // !!! Write the messages into the mappedFile !!!
    result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
    // Handle the append result
    switch (result.getStatus()) {
        case PUT_OK:
            break;
        case END_OF_FILE:
            unlockMappedFile = mappedFile;
            // Create a new file, re-write the message
            mappedFile = this.mappedFileQueue.getLastMappedFile(0);
            if (null == mappedFile) {
                // XXX: warn and notify me
                log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
            }
            result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
            break;
        case MESSAGE_SIZE_EXCEEDED:
        case PROPERTIES_SIZE_EXCEEDED:
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
        case UNKNOWN_ERROR:
        default:
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
    }

    elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
    beginTimeInLock = 0;
} finally {
    putMessageLock.unlock();
}

if (elapsedTimeInLock > 500) {
    log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
}

if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
    this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}

PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes());

// Flush according to the configured flush policy
CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
// Master-slave replication
CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);
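The END_OF_FILE branch deserves a closer look: the append fails, a new file is created, and the message is written again. The sketch below models that roll-over with fixed-size segments. Each new segment starts at the offset where the previous one ends, and the leftover space of a full segment is treated as a blank end marker. SegmentFile, SegmentQueue and the 8-byte marker size are assumptions for illustration, not RocketMQ's MappedFileQueue.

```java
import java.util.ArrayList;
import java.util.List;

class RolloverSketch {
    enum Status { PUT_OK, END_OF_FILE }

    // Assumed size of the end-of-file marker: TOTALSIZE (4 bytes) + MAGICCODE (4 bytes).
    static final int END_FILE_MIN_BLANK_LENGTH = 8;

    // Hypothetical fixed-size segment; only lengths are tracked, not bytes.
    static final class SegmentFile {
        final long fileFromOffset;
        final int fileSize;
        int wrotePosition;

        SegmentFile(long fileFromOffset, int fileSize) {
            this.fileFromOffset = fileFromOffset;
            this.fileSize = fileSize;
        }

        Status append(int msgLen) {
            int maxBlank = fileSize - wrotePosition;
            if (msgLen + END_FILE_MIN_BLANK_LENGTH > maxBlank) {
                wrotePosition = fileSize;           // the rest of the file becomes a blank marker
                return Status.END_OF_FILE;
            }
            wrotePosition += msgLen;
            return Status.PUT_OK;
        }
    }

    // Hypothetical queue of segments; each new file starts where the previous one ends.
    static final class SegmentQueue {
        final int fileSize;
        final List<SegmentFile> files = new ArrayList<>();

        SegmentQueue(int fileSize) {
            this.fileSize = fileSize;
        }

        SegmentFile lastOrNew() {
            SegmentFile last = files.isEmpty() ? null : files.get(files.size() - 1);
            if (last == null || last.wrotePosition == fileSize) {
                long from = last == null ? 0 : last.fileFromOffset + fileSize;
                last = new SegmentFile(from, fileSize);
                files.add(last);
            }
            return last;
        }

        // Returns the global (commit log) offset at which the message was stored.
        long put(int msgLen) {
            SegmentFile file = lastOrNew();
            long offset = file.fileFromOffset + file.wrotePosition;
            if (file.append(msgLen) == Status.END_OF_FILE) {
                file = lastOrNew();                 // roll over to a fresh file...
                offset = file.fileFromOffset + file.wrotePosition;
                if (file.append(msgLen) != Status.PUT_OK) { // ...and re-write the message there
                    throw new IllegalArgumentException("message larger than a whole file");
                }
            }
            return offset;
        }
    }

    public static void main(String[] args) {
        SegmentQueue queue = new SegmentQueue(100);
        System.out.println(queue.put(50));      // 0
        System.out.println(queue.put(50));      // 100: 50 + 8 bytes do not fit in the last 50
        System.out.println(queue.files.size()); // 2
    }
}
```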

MappedFile#appendMessagesInner

assert messageExt != null;
assert cb != null;

// Read the write pointer (current write position)
int currentPos = this.wrotePosition.get();

// Only append while the write pointer is still inside the file
if (currentPos < this.fileSize) {
    // Target buffer: the pooled writeBuffer if the transient store pool is in use, otherwise the mapped buffer
    ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
    byteBuffer.position(currentPos);
    AppendMessageResult result;
    // Handle single and batch messages differently
    if (messageExt instanceof MessageExtBrokerInner) {
        // Single message
        // Serialize it into the buffer via the callback -> CommitLog#doAppend
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                (MessageExtBrokerInner) messageExt, putMessageContext);
    } else if (messageExt instanceof MessageExtBatch) {
        // Batch message
        result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                (MessageExtBatch) messageExt, putMessageContext);
    } else {
        // Unknown message type -> return an error result
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
    // Advance the write pointer
    this.wrotePosition.addAndGet(result.getWroteBytes());
    // Update the store timestamp
    this.storeTimestamp = result.getStoreTimestamp();
    // Return the append result
    return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
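appendMessagesInner writes through slice() instead of moving the shared buffer's own position, and it keeps the write pointer in a separate atomic counter. The small NIO sketch below shows why that works. SliceWriteSketch is hypothetical, a heap ByteBuffer stands in for the MappedByteBuffer, and, as with CommitLog's put lock, a single writer is assumed.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

class SliceWriteSketch {

    // Writes data at the current write pointer through a slice of the shared buffer.
    // The slice shares the buffer's memory but has its own position and limit,
    // so the shared buffer's own position is never moved.
    static int append(ByteBuffer shared, AtomicInteger wrotePosition, byte[] data) {
        int currentPos = wrotePosition.get();
        if (currentPos + data.length > shared.capacity()) {
            return -1; // no room; in the excerpt, the doAppend callback handles this case instead
        }
        ByteBuffer view = shared.slice(); // assumes shared.position() == 0, so the slice spans the whole buffer
        view.position(currentPos);
        view.put(data);
        wrotePosition.addAndGet(data.length); // advance the pointer only after the bytes are in place
        return currentPos;
    }

    public static void main(String[] args) {
        ByteBuffer mapped = ByteBuffer.allocate(16); // stands in for a MappedByteBuffer
        AtomicInteger wrotePosition = new AtomicInteger();
        append(mapped, wrotePosition, new byte[] {1, 2, 3});
        append(mapped, wrotePosition, new byte[] {4, 5});
        System.out.println(wrotePosition.get() + " " + mapped.position() + " " + mapped.get(4)); // 5 0 5
    }
}
```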

CommitLog#doAppend

public AppendMessageResult doAppend(final long fileFromOffset,			// starting offset of this file
                                    final ByteBuffer byteBuffer, 		// NIO buffer to write into
                                    final int maxBlank,				   // maximum number of bytes that can still be written
                                    final MessageExtBrokerInner msgInner, // the message
                                    PutMessageContext putMessageContext) {
    // Physical offset of this write in the commit log
    long wroteOffset = fileFromOffset + byteBuffer.position();

    // Build the msgId lazily
    Supplier<String> msgIdSupplier = () -> {
        // System flag
        int sysflag = msgInner.getSysFlag();
        // msgId is 16 bytes for IPv4 (4 + 4 + 8) and 28 bytes for IPv6 (16 + 4 + 8)
        int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
        // Allocate the msgId buffer
        ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
        // Store host: IP (4 bytes for IPv4) followed by port (4 bytes)
        MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
        // Reset position/limit (contents are kept); the offset below is written with an absolute put
        msgIdBuffer.clear();
        // The last 8 bytes hold the commit log offset (wroteOffset)
        msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
        return UtilAll.bytes2string(msgIdBuffer.array());
    };

    // Unique key identifying this topic's message queue
    String key = putMessageContext.getTopicQueueTableKey();
    // Look up the queue's current logical offset by key
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }

    // Transaction messages that require special handling
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepared and Rollback message is not consumed, will not enter the
        // consumer queue
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            queueOffset = 0L;
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    }

    ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
    // Total stored length of the message (the first int of the pre-encoded buffer)
    final int msgLen = preEncodeBuffer.getInt(0);

    // Determines whether there is sufficient free space
    // If there is not enough room left, mark the rest of this file as blank so that a new CommitLog file is created
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        this.msgStoreItemMemory.clear();
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        // 3 The remaining space may be any value
        // Here the length of the specially set maxBlank
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0
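The msgId built by msgIdSupplier in doAppend packs the store host and the commit log offset into one string. For IPv4 the layout is 4-byte IP + 4-byte port + 8-byte offset, 16 bytes in total, shown as 32 hex characters. The sketch below rebuilds that layout. MsgIdSketch and its hex helper are our own. UtilAll.bytes2string is not reproduced; we assume it produces a comparable uppercase hex string.

```java
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

class MsgIdSketch {
    private static final char[] HEX = "0123456789ABCDEF".toCharArray();

    // IPv4 layout: 4-byte IP + 4-byte port + 8-byte commit log offset = 16 bytes -> 32 hex chars.
    static String buildMsgId(InetSocketAddress storeHost, long wroteOffset) {
        byte[] ip = storeHost.getAddress().getAddress(); // 4 bytes for IPv4, 16 for IPv6
        ByteBuffer buf = ByteBuffer.allocate(ip.length + 4 + 8);
        buf.put(ip);
        buf.putInt(storeHost.getPort());
        buf.putLong(wroteOffset);
        StringBuilder sb = new StringBuilder(buf.capacity() * 2);
        for (byte b : buf.array()) {
            sb.append(HEX[(b >> 4) & 0x0F]).append(HEX[b & 0x0F]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        InetSocketAddress host = new InetSocketAddress("127.0.0.1", 10911);
        System.out.println(buildMsgId(host, 4096L)); // 7F00000100002A9F0000000000001000
    }
}
```

Because the offset is embedded in the id, a broker can locate a message from its msgId alone: decode the last 8 bytes and read the commit log at that offset.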
