Leveling Up with RocketMQ: RocketMQ Persistence

Posted by fisher


RocketMQ's persistence guarantees rest on two ideas: (1) disk flushing ensures that most data is never lost; (2) efficient handling of the persisted files — zero-copy (mmap), OS memory pages, and the NIO model provide the processing capacity.

  • File persistence directory layout

  ├──abort: a marker file checked at broker startup; it is written on a normal start and deleted on a normal shutdown, so its presence at startup means the previous exit was abnormal

  ├──checkpoint: historical checkpoints loaded as the broker starts

  ├──lock: file lock for global resources

  ├──commitlog: the heart of broker storage; as we know, RocketMQ stores messages centrally on the broker, and on disk they land in the commitlog

  │ ├──00000000000000000000 (example): RocketMQ pre-creates commitlog files and appends messages into them; each new file is named after the current global offset, so the very first file is 00000000000000000000

  ├──compaction: (RocketMQ 5.0+)

  │ ├──position-checkpoint: checkpoint of the last processed position, updated after each pass

  ├──config:

  │ ├──consumerFilter.json: message filtering rules per topic: ConcurrentMap<String/*Topic*/, FilterDataMapByTopic>

  │ ├──consumerOffset.json: consumption offsets per consumer group and queue: ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>>

  │ ├──consumerOrderInfo.json: ordering state for ordered messages: ConcurrentHashMap<String/* topic@group*/, ConcurrentHashMap<Integer/*queueId*/, OrderInfo>>

  │ ├──delayOffset.json: pull offsets of the delay (scheduled) queues for consumer pulls

  │ ├──subscriptionGroup.json: subscription info per consumer group, i.e. the consumer metadata the broker has received

  │ ├──topics.json: topic metadata

  │ ├──timercheck: config file for the timer wheel behind timer (delayed) messages, RocketMQ 5.0+

  │ ├──timermetrics: metrics file for the timer wheel behind timer (delayed) messages, RocketMQ 5.0+

  ├──consumequeue: per-queue consumption index for each topic on the broker

  │ ├──%topicName: topic name

  │ │ ├──%queueId: queue id

  │ │ │ ├──00000000000000000000: consume queue entries (positions into the commitlog)

  ├──index: index file directory

  │ ├──00000000000000000000: index files, for quickly locating a message in the commitlog

  └──timerwheel: state of the timer-wheel implementation behind timer (delayed) messages
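A quick aside on the consumequeue files listed above: each file is a sequence of fixed-size 20-byte entries pointing into the commitlog — an 8-byte commitlog offset, a 4-byte message size, and an 8-byte tag hash code used for server-side filtering. A minimal sketch of one entry's layout (the class and method names are invented for illustration, not RocketMQ code):

```java
import java.nio.ByteBuffer;

// Sketch of a single consumequeue store unit: 8 + 4 + 8 = 20 bytes.
public class ConsumeQueueEntry {
    static final int CQ_STORE_UNIT_SIZE = 20;

    static ByteBuffer encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset);   // where the full message lives in the commitlog
        buf.putInt(size);               // how many bytes to read there
        buf.putLong(tagsCode);          // tag hash, used for server-side filtering
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer entry = encode(1073741824L, 256, 12345L);
        System.out.println(entry.getLong()); // 1073741824
        System.out.println(entry.getInt());  // 256
        System.out.println(entry.getLong()); // 12345
    }
}
```

Because every entry is the same size, the broker can seek straight to entry N at byte offset N * 20 without scanning the file.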

  These files are the foundation of the broker's fault tolerance. A RocketMQ cluster is essentially the combined capability of its brokers, and through these files no data need be lost: the broker loads them all at startup.

/**
 * Abstract config factory. At broker startup each manager is loaded in turn,
 * reading its file into member variables (e.g. consumerOffsetTable).
 * Each manager subclass loads its own piece of configuration.
 */
public abstract class ConfigManager {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

    public abstract String encode();

    // ... the remaining template methods are omitted here
}

  • The store layer

  RocketMQ's file handling is built on mmap and NIO ByteBuffers underneath; the store layer wraps them into its basic components.

  

/**
 * The core object of store message handling: MappedFile wraps message writes,
 * an NIO file-to-disk utility.
 */
public class DefaultMappedFile extends AbstractMappedFile {
    // OS memory page, 4 KB on most unix-like systems
    public static final int OS_PAGE_SIZE = 1024 * 4;
    public static final Unsafe UNSAFE = getUnsafe();
    private static final Method IS_LOADED_METHOD;
    public static final int UNSAFE_PAGE_SIZE = UNSAFE == null ? OS_PAGE_SIZE : UNSAFE.pageSize();

    protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    // total mapped-file virtual memory allocated by this broker
    protected static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

    // total number of memory-mapped files created by this broker
    protected static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);

    protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> WROTE_POSITION_UPDATER;
    protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> COMMITTED_POSITION_UPDATER;
    protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> FLUSHED_POSITION_UPDATER;

    // write pointer: the next write starts here
    protected volatile int wrotePosition;
    // commit pointer: data before it has been committed to the fileChannel;
    // data between committedPosition and wrotePosition is not yet committed
    protected volatile int committedPosition;
    // flush pointer: data before it is already on disk;
    // data between flushedPosition and committedPosition is not yet flushed
    protected volatile int flushedPosition;
    // file size in bytes
    protected int fileSize;
    // channel of the on-disk file -- the entry point for mmap
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    // with async flush, data is written into writeBuffer first; the CommitRealTime thread
    // commits it to the fileChannel every 200 ms, and the FlushRealTime thread flushes
    // the fileChannel to disk every 500 ms
    protected ByteBuffer writeBuffer = null;
    // off-heap buffer pool serving the async flush path; to avoid repeated allocation
    // and release, a block of direct memory is requested from the OS and locked up
    // front, and writeBuffer is taken from it
    protected TransientStorePool transientStorePool = null;
    // file name
    protected String fileName;
    // starting offset of this file, encoded in its name: 00000000000000000000 starts at 0.
    // A commitlog file is 1 GB by default; when it fills up, a new file is created,
    // named after the then-current global offset
    protected long fileFromOffset;
    protected File file;
    // memory-mapped buffer of the file; with sync flush, data is written into it directly
    protected MappedByteBuffer mappedByteBuffer;
    // timestamp of the most recent store operation
    protected volatile long storeTimestamp = 0;
    protected boolean firstCreateInQueue = false;
    private long lastFlushTime = -1L;

    protected MappedByteBuffer mappedByteBufferWaitToClean = null;
    protected long swapMapTime = 0L;
    protected long mappedByteBufferAccessCountSinceLastSwap = 0L;

  First, the core DefaultMappedFile wraps a FileChannel; through mmap it gets zero-copy access to the file.

  It defines three pointers:
  wrotePosition: the write pointer; the next write starts here.

  committedPosition: the commit pointer; data before it has been committed to the fileChannel, while data between committedPosition and wrotePosition has not.

  flushedPosition: the flush pointer; data before it is already on disk, while data between flushedPosition and committedPosition has not been flushed yet.

  It also holds an NIO ByteBuffer: with asynchronous flushing, data is written into this buffer first, a scheduled thread then commits it to the fileChannel, and finally the fileChannel is flushed to disk.
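The mechanics above can be condensed into a toy append-and-flush file (a sketch under stated assumptions, not RocketMQ code: the class, file size, and method names are invented, and the committedPosition/writeBuffer stage of the async path is left out):

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;

// Append to a file through an mmap'ed buffer, tracking a write pointer and a
// flush pointer the way DefaultMappedFile does.
public class MmapAppendSketch {
    static final int FILE_SIZE = 4096;          // one OS page, for the demo
    private final MappedByteBuffer mapped;
    private volatile int wrotePosition = 0;     // next write starts here
    private volatile int flushedPosition = 0;   // data before this is on disk

    public MmapAppendSketch(Path path) throws IOException {
        try (FileChannel ch = FileChannel.open(path,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // map the file into memory: writes go into the page cache,
            // with no read()/write() system-call copies
            this.mapped = ch.map(FileChannel.MapMode.READ_WRITE, 0, FILE_SIZE);
        }
    }

    public boolean append(byte[] data) {
        if (wrotePosition + data.length > FILE_SIZE) return false; // END_OF_FILE
        mapped.position(wrotePosition);
        mapped.put(data);
        wrotePosition += data.length;
        return true;
    }

    public int flush() {
        mapped.force();                 // push the dirty pages to disk (msync)
        flushedPosition = wrotePosition;
        return flushedPosition;
    }

    public static void main(String[] args) throws IOException {
        Path p = Files.createTempFile("mmap-sketch", ".log");
        MmapAppendSketch f = new MmapAppendSketch(p);
        f.append("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(f.flush()); // 5: five bytes written and flushed
    }
}
```

When `append` returns false, the caller is expected to roll to a new file — exactly what the commitlog does on END_OF_FILE.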

 

/**
     * Create the next commitlog file from the AllocateRequests in the queue
     */
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped() && this.mmapOperation()) {

        }

        log.info(this.getServiceName() + " service end");
    }

  AllocateRequest wraps the pre-creation of a commitlog file: requests are built and placed on a queue, and when the store starts, the AllocateMappedFileService thread is started to watch the queue and create the files.

 /**
     * Core of commitlog pre-creation
     * @param nextFilePath
     * @param nextNextFilePath
     * @param fileSize
     * @return
     */
    public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
        int canSubmitRequests = 2;
        if (this.messageStore.isTransientStorePoolEnable()) {
            if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
                && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
                canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
            }
        }

        // wrap an AllocateRequest and put it on the queue; an async thread picks it up
        AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;

        if (nextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
                this.requestTable.remove(nextFilePath);
                return null;
            }
            boolean offerOK = this.requestQueue.offer(nextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
            canSubmitRequests--;
        }

        AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
        boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
        if (nextNextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
                this.requestTable.remove(nextNextFilePath);
            } else {
                boolean offerOK = this.requestQueue.offer(nextNextReq);
                if (!offerOK) {
                    log.warn("never expected here, add a request to preallocate queue failed");
                }
            }
        }

        if (hasException) {
            log.warn(this.getServiceName() + " service has exception. so return null");
            return null;
        }
        // block until the AllocateMappedFile thread has created the file
        AllocateRequest result = this.requestTable.get(nextFilePath);
        try {
            if (result != null) {
                messageStore.getPerfCounter().startTick("WAIT_MAPFILE_TIME_MS");
                boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
                messageStore.getPerfCounter().endTick("WAIT_MAPFILE_TIME_MS");
                if (!waitOK) {
                    log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                    return null;
                } else {
                    this.requestTable.remove(nextFilePath);
                    return result.getMappedFile();
                }
            } else {
                log.error("find preallocate mmap failed, this never happen");
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }

        return null;
    }

  When the broker initializes, it starts the AllocateMappedFileService thread that manages MappedFile creation. The message-handling thread and the AllocateMappedFileService thread communicate through the requestQueue.

  On a message write, AllocateMappedFileService#putRequestAndReturnMappedFile is called to submit a MappedFile-creation request to the requestQueue; it builds two AllocateRequests at a time.

  The AllocateMappedFileService loop takes AllocateRequests from the requestQueue and creates the MappedFiles. The message-handling thread waits on a CountDownLatch and returns as soon as the first MappedFile is created.

  The next time the message-handling thread needs a MappedFile, it can pick up the one already pre-created. Pre-creating MappedFiles this way removes the file-creation wait from the write path.
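The handshake between the two threads can be sketched as follows (names and types are invented for the sketch; the real service also pre-creates the next-next file and handles timeouts, warm-up, and the transient store pool):

```java
import java.util.concurrent.*;

// A caller enqueues a request and waits on its latch while a background
// worker "creates the file" and counts the latch down.
public class PreallocateSketch {
    static class AllocateRequest {
        final String path;
        final CountDownLatch latch = new CountDownLatch(1);
        volatile String mappedFile;                        // stands in for the real MappedFile
        AllocateRequest(String path) { this.path = path; }
    }

    private final BlockingQueue<AllocateRequest> queue = new LinkedBlockingQueue<>();

    public PreallocateSketch() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    AllocateRequest req = queue.take();
                    req.mappedFile = "mapped:" + req.path; // pretend we mmap'ed the file
                    req.latch.countDown();                 // wake the waiting caller
                }
            } catch (InterruptedException ignored) { }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Enqueue a request and block until the worker has created the file.
    public String putRequestAndReturn(String path) throws InterruptedException {
        AllocateRequest req = new AllocateRequest(path);
        queue.offer(req);
        return req.latch.await(1, TimeUnit.SECONDS) ? req.mappedFile : null;
    }

    public static void main(String[] args) throws InterruptedException {
        PreallocateSketch s = new PreallocateSketch();
        System.out.println(s.putRequestAndReturn("00000000000000000000"));
    }
}
```

The payoff is the same as in RocketMQ: by the time the caller asks for the *next* file, the worker has usually finished it already, so the latch wait is near zero.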

 

  • The full store message flow

  

  As the diagram shows, the store plays a central role on the path from producer to consumer.

  After a producer sends a message, the message is persisted; after a consumer consumes it, the consumption progress is persisted.

  Let's walk through the store flow in detail.

 

  • Message storage: from producer to disk

  After a message is created by the producer and sent to the broker, it is persisted first. In the asynchronous model, persistence is handled by a separate thread on a timer; in the synchronous model, the call blocks until the message has been processed before returning.

  The message first passes through the producer, is assembled, and is sent to the broker over Netty. Here we only care about the broker's side; for the producer-side handling, see the earlier posts.

  In the broker, a processor wraps the Netty-based handling of a client action. AbstractSendMessageProcessor provides the common pieces: message retry, sending to the dead-letter queue, and the before/after hook functions. When a producer issues a sendMessage, the request is dispatched to SendMessageProcessor, the broker-side handler for the client's sendMessage action.

  

public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        SendMessageContext sendMessageContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.consumerSendMsgBack(ctx, request);
            default:
                // normal send path
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return null;
                }
                TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true);
                RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
                if (rewriteResult != null) {
                    return rewriteResult;
                }
                sendMessageContext = buildMsgContext(ctx, requestHeader, request);
                try {
                    // run the before-send hooks
                    this.executeSendMessageHookBefore(sendMessageContext);
                } catch (AbortProcessException e) {
                    final RemotingCommand errorResponse = RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage());
                    errorResponse.setOpaque(request.getOpaque());
                    return errorResponse;
                }

                RemotingCommand response;
                // single vs. batch message handling; after-send hooks run on completion
                if (requestHeader.isBatch()) {
                    response = this.sendBatchMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
                        (ctx1, response1) -> executeSendMessageHookAfter(response1, ctx1));
                } else {
                    response = this.sendMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
                        (ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
                }

                return response;
        }
    }

  If the message is a retry message, it is sent to the %RETRY%-prefixed topic queue for retry, with the retry level and retry count handled accordingly.

  The core here is the split between single-message and batch-message handling; the broker's MessageBatch class is the batch-message wrapper.

  

public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
        final RemotingCommand request,
        final SendMessageContext sendMessageContext,
        final SendMessageRequestHeader requestHeader,
        final TopicQueueMappingContext mappingContext,
        final SendMessageCallback sendMessageCallback) throws RemotingCommandException {

        final RemotingCommand response = preSend(ctx, request, requestHeader);
        if (response.getCode() != -1) {
            return response;
        }

        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
        // the message body
        final byte[] body = request.getBody();

        // queue id requested by the client
        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        // a negative queue id is treated as unspecified: bind a random write queue instead
        if (queueIdInt < 0) {
            queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
        }

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);

        Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        // retry / dead-letter handling; return early if the message is rejected
        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig, oriProps)) {
            return response;
        }

        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());

        String uniqKey = oriProps.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
        if (uniqKey == null || uniqKey.length() <= 0) {
            uniqKey = MessageClientIDSetter.createUniqID();
            oriProps.put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, uniqKey);
        }

        MessageAccessor.setProperties(msgInner, oriProps);

        CleanupPolicy cleanupPolicy = CleanupPolicyUtils.getDeletePolicy(Optional.of(topicConfig));
        if (Objects.equals(cleanupPolicy, CleanupPolicy.COMPACTION)) {
            if (StringUtils.isBlank(msgInner.getKeys())) {
                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                response.setRemark("Required message key is missing");
                return response;
            }
        }

        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);

        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

        // Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        boolean sendTransactionPrepareMessage = false;
        if (Boolean.parseBoolean(traFlag)
            && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
            /**
             * prepared (half) transaction message: refuse it if this broker is
             * configured to reject transaction messages
             */
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
                return response;
            }
            sendTransactionPrepareMessage = true;
        }

        long beginTimeMillis = this.brokerController.getMessageStore().now();

        /**
         * This is where the message is actually handled: depending on the broker's
         * sync/async model, transaction messages and ordinary messages are dispatched
         */
        if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
            CompletableFuture<PutMessageResult> asyncPutMessageFuture;
            // putMessage / asyncPutMessage is the core of store persistence
            if (sendTransactionPrepareMessage) {
                /**
                 * @see org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl.asyncPrepareMessage
                 * wraps the message as a half message
                 */
                asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
            } else {
                asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
            }

            final int finalQueueIdInt = queueIdInt;
            final MessageExtBrokerInner finalMsgInner = msgInner;
            /**
             * When done, handlePutMessageResult runs as an async callback; the sync
             * branch in the else below does the same work, just blocking on the
             * calling thread instead of using a non-blocking async task
             */
            asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {
                RemotingCommand responseFuture =
                    handlePutMessageResult(putMessageResult, response, request, finalMsgInner, responseHeader, sendMessageContext,
                        ctx, finalQueueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
                if (responseFuture != null) {
                    doResponse(ctx, request, responseFuture);
                }
                sendMessageCallback.onComplete(sendMessageContext, response);
            }, this.brokerController.getPutMessageFutureExecutor());
            // Returns null to release the send message thread
            return null;
        } else {
            PutMessageResult putMessageResult = null;
            if (sendTransactionPrepareMessage) {
                putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
            } else {
                putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
            }
            handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
            sendMessageCallback.onComplete(sendMessageContext, response);
            return response;
        }
    }

  First comes the assembly: the message body is set, a queue id is assigned, and a portion of invalid messages — retry messages past their limit, or messages bound for the dead-letter queue — is dropped.

  Then the message is classified: in the async path, a transaction message is prepared asynchronously as a half message, while any other kind of message is stored asynchronously according to its content.

//putMessage / asyncPutMessage is the core of store persistence
            if (sendTransactionPrepareMessage) {
                /**
                 * @see org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl.asyncPrepareMessage
                 * wraps the message as a half message
                 */
                asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
            } else {
                asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
            }

  Once the future completes, handlePutMessageResult is invoked as an asynchronous callback; in the synchronous model the same handlePutMessageResult flow runs, but blocking the calling thread rather than using a non-blocking async task.

  In the synchronous case, the wait is bounded by a timeout derived from the flush and slave timeout settings, much like a timed CompletableFuture.get (or orTimeout in JDK 9+).
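Stripped of RocketMQ types, the pattern is just a timed get on a future (a sketch with invented names, not the broker's code):

```java
import java.util.concurrent.*;

// Run the work asynchronously but bound the wait with a timed get,
// falling back to an error result on timeout or failure.
public class TimedPutSketch {
    enum Status { PUT_OK, UNKNOWN_ERROR }

    static Status putMessage(long workMillis, long timeoutMillis) {
        CompletableFuture<Status> future = CompletableFuture.supplyAsync(() -> {
            try { Thread.sleep(workMillis); } catch (InterruptedException ignored) { }
            return Status.PUT_OK;                    // pretend the flush succeeded
        });
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block, but not forever
        } catch (TimeoutException | InterruptedException | ExecutionException e) {
            return Status.UNKNOWN_ERROR;             // same fallback as waitForPutResult below
        }
    }

    public static void main(String[] args) {
        System.out.println(putMessage(10, 1000));    // fast work completes: PUT_OK
        System.out.println(putMessage(500, 50));     // slow work times out: UNKNOWN_ERROR
    }
}
```

The real waitForPutResult follows the same shape, with the timeout computed from the store configuration.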

@Override
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        return waitForPutResult(asyncPutMessage(msg));
    }

    // timeout handling for the async put future
    private PutMessageResult waitForPutResult(CompletableFuture<PutMessageResult> putMessageResultFuture) {
        try {
            int putMessageTimeout =
                Math.max(this.messageStoreConfig.getSyncFlushTimeout(),
                    this.messageStoreConfig.getSlaveTimeout()) + 5000;
            return putMessageResultFuture.get(putMessageTimeout, TimeUnit.MILLISECONDS);
        } catch (ExecutionException | InterruptedException e) {
            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
        } catch (TimeoutException e) {
            LOGGER.error("usually it will never timeout, putMessageTimeout is much bigger than slaveTimeout and "
                + "flushTimeout so the result can be got anyway, but in some situations timeout will happen like full gc "
                + "process hangs or other unexpected situations.");
            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
        }
    }

  The real storage work happens in DefaultMessageStore#asyncPutMessage.

public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {

        // run the registered before-put hooks first
        for (PutMessageHook putMessageHook : putMessageHookList) {
            PutMessageResult handleResult = putMessageHook.executeBeforePutMessage(msg);
            if (handleResult != null) {
                return CompletableFuture.completedFuture(handleResult);
            }
        }

        /**
         * validate the message format; abort immediately if it is illegal
         */
        if (msg.getProperties().containsKey(MessageConst.PROPERTY_INNER_NUM)
            && !MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
            LOGGER.warn("[BUG]The message had property {} but is not an inner batch", MessageConst.PROPERTY_INNER_NUM);
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
        }

        if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
            Optional<TopicConfig> topicConfig = this.getTopicConfig(msg.getTopic());
            if (!QueueTypeUtils.isBatchCq(topicConfig)) {
                LOGGER.error("[BUG]The message is an inner batch but cq type is not batch cq");
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
            }
        }

        long beginTime = this.getSystemClock().now();
        // the commitlog handles the message
        CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

        /**
         * record how long the store took and update the statistics
         */
        putResultFuture.thenAccept(result -> {
            long elapsedTime = this.getSystemClock().now() - beginTime;
            if (elapsedTime > 500) {
                LOGGER.warn("DefaultMessageStore#putMessage: CommitLog#putMessage cost {}ms, topic={}, bodyLength={}",
                    elapsedTime, msg.getTopic(), msg.getBody().length);
            }
            this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

            if (null == result || !result.isOk()) {
                // on failure, count one more failed put
                this.storeStatsService.getPutMessageFailedTimes().add(1);
            }
        });

        return putResultFuture;
    }

  As we can see, asyncPutMessage wraps its result in a CompletableFuture: it first runs the before-put hooks, then validates the message format and the topic configuration, and on completion records the elapsed time and failure count in the store statistics. The core operation is CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);, which appends the message — and the file handling underneath is the mapped FileChannel.

/**
     * Core message-persistence code
     * @param msg
     * @return
     */
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
            msg.setStoreTimestamp(System.currentTimeMillis());
        }

        // Set the message body CRC (consider the most appropriate setting on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        msg.setVersion(MessageVersion.MESSAGE_VERSION_V1);
        boolean autoMessageVersionOnTopicLen =
            this.defaultMessageStore.getMessageStoreConfig().isAutoMessageVersionOnTopicLen();
        if (autoMessageVersionOnTopicLen && topic.length() > Byte.MAX_VALUE) {
            msg.setVersion(MessageVersion.MESSAGE_VERSION_V2);
        }

        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setBornHostV6Flag();
        }

        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setStoreHostAddressV6Flag();
        }

        // fetch the thread-local state and update the max message size
        PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
        updateMaxMessageSize(putMessageThreadLocal);
        // build a unique topicQueueKey from topic and queue, in the form topic-queueId
        String topicQueueKey = generateKey(putMessageThreadLocal.getKeyBuilder(), msg);
        long elapsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        // the most recently used MappedFile, i.e. the last one in the queue
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        // no MappedFile yet means this is the first creation: start from offset 0
        long currOffset;
        if (mappedFile == null) {
            currOffset = 0;
        } else {
            // otherwise the message belongs at the file's starting offset (its name)
            // plus the current write position inside the file
            currOffset = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
        }

        // work out how many acks are needed and whether HA must be notified
        int needAckNums = this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
        boolean needHandleHA = needHandleHA(msg);

        if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
            if (this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset) < this.defaultMessageStore.getMessageStoreConfig().getMinInSyncReplicas()) {
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
            }
            if (this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
                // -1 means all ack in SyncStateSet
                needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET;
            }
        } else if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableSlaveActingMaster()) {
            int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
                this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset));
            needAckNums = calcNeedAckNums(inSyncReplicas);
            if (needAckNums > inSyncReplicas) {
                // Tell the producer, don't have enough slaves to handle the send request
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
            }
        }

        // lock the key; each key identifies one queue under a topic
        topicQueueLock.lock(topicQueueKey);
        try {

            boolean needAssignOffset = true;
            if (defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()
                && defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
                needAssignOffset = false;
            }
            if (needAssignOffset) {
                defaultMessageStore.assignOffset(msg, getMessageNum(msg));
            }

            PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
            if (encodeResult != null) {
                return CompletableFuture.completedFuture(encodeResult);
            }
            msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
            // context carried through this put
            PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey);

            putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
            try {
                // time at which the lock was acquired
                long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
                this.beginTimeInLock = beginLockTimestamp;

                // Here settings are stored timestamp, in order to ensure an orderly
                // global
                // use the lock-acquisition time as the store timestamp to keep a global order
                if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                    msg.setStoreTimestamp(beginLockTimestamp);
                }

                // if there is no MappedFile yet, or the current one is full, create a new one
                if (null == mappedFile || mappedFile.isFull()) {
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
                }
                if (null == mappedFile) {
                    log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
                }

                // append the message
                result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                switch (result.getStatus()) {
                    case PUT_OK:
                        onCommitLogAppend(msg, result, mappedFile);
                        break;
                    case END_OF_FILE:
                        // out of space in this file: roll to a new one and retry the write
                        onCommitLogAppend(msg, result, mappedFile);
                        unlockMappedFile = mappedFile;
                        // Create a new file, re-write the message
                        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                        if (null == mappedFile) {
                            // XXX: warn and notify me
                            log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                            beginTimeInLock = 0;
                            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
                        }
                        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                        if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
                            onCommitLogAppend(msg, result, mappedFile);
                        }
                        break;
                    case MESSAGE_SIZE_EXCEEDED:
                    case PROPERTIES_SIZE_EXCEEDED:
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                    case UNKNOWN_ERROR:
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                    default:
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                }

                // record time spent holding the lock
                elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
                beginTimeInLock = 0;
            } finally {
                // release the put lock
                putMessageLock.unlock();
            }
        } finally {
            // release the queue lock
            topicQueueLock.unlock(topicQueueKey);
        }

        if (elapsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        // update the cached statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(result.getMsgNum());
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());

        // submit the flush request and the replication (HA) request
        return handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA);
    }

  First some basic fields are set: the store timestamp, bornHost and storeHost; a ThreadLocal (the put-message thread-local encoder) is obtained and the maximum allowed message size is refreshed.

  The message's topic and queueId are combined into a unique topicQueueKey in the form topic-queueId. The last mappedFile (the tail of the mapped-file queue) is then fetched, because writes are always appended to the end: persistence is centralized in the commitlog.

  If no previously used mappedFile is found, this message may be the very first one, so a new file and its FileChannel are created; with no prior messages the initial offset is 0. If a mappedFile is found, note that a commitlog file's name is that file's starting offset, so the physical position of the current message is the mappedFile's name (its starting offset) plus the message's offset within the file.
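  Because a commitlog file is named after its starting global offset (zero-padded to 20 digits), the mapping above is pure arithmetic. A standalone sketch, not RocketMQ source, assuming the default 1 GB commitlog file size:

```java
// Standalone sketch (not RocketMQ source) of how a global commitlog offset
// maps to a file name and an in-file position. Assumes the default commitlog
// file size of 1 GB.
public class CommitLogOffsets {
    static final long MAPPED_FILE_SIZE = 1024L * 1024 * 1024;

    // Starting offset of the file containing this global offset;
    // mirrors createOffset = startOffset - (startOffset % mappedFileSize).
    static long fileFromOffset(long offset) {
        return offset - (offset % MAPPED_FILE_SIZE);
    }

    // A commitlog file is named after its starting offset, zero-padded to 20 digits.
    static String fileName(long offset) {
        return String.format("%020d", fileFromOffset(offset));
    }

    // Position of the message inside that file.
    static long positionInFile(long offset) {
        return offset % MAPPED_FILE_SIZE;
    }

    public static void main(String[] args) {
        long offset = MAPPED_FILE_SIZE + 4096;      // a message early in the second file
        System.out.println(fileName(offset));       // 00000000001073741824
        System.out.println(positionInFile(offset)); // 4096
    }
}
```

  This is why the directory listing earlier shows a first file named 00000000000000000000: it is simply the starting offset 0.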

  HA and ack requirements are checked.

  The topicQueueKey (one queue under a topic) is locked first, and the queue offset for this write is computed.

  A PutMessageContext is created to carry the per-put state:

public class PutMessageContext {
    private String topicQueueTableKey; // the lock key: topic-queueId
    private long[] phyPos;             // physical positions of the appended messages
    private int batchSize;             // number of messages in a batch put

    public PutMessageContext(String topicQueueTableKey) {
        this.topicQueueTableKey = topicQueueTableKey;
    }
}

  putMessageLock is then acquired; there are two implementations: a spin lock and a reentrant lock.

/**
 * Spin lock implementation to put message, suggested under low contention
 */
public class PutMessageSpinLock implements PutMessageLock {
    //true: can lock, false: in lock.
    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);

    @Override
    public void lock() {
        boolean flag;
        do {
            flag = this.putMessageSpinLock.compareAndSet(true, false);
        }
        while (!flag);
    }

    @Override
    public void unlock() {
        this.putMessageSpinLock.compareAndSet(false, true);
    }
}

/**
 * Exclusive lock implementation to put message
 */
public class PutMessageReentrantLock implements PutMessageLock {
    private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync

    @Override
    public void lock() {
        putMessageNormalLock.lock();
    }

    @Override
    public void unlock() {
        putMessageNormalLock.unlock();
    }
}
  Since RocketMQ 4.x the choice is controlled by the broker config `useReentrantLockWhenPutMessage`, which defaults to false (spin lock). With asynchronous flush the spin lock is recommended; with synchronous flush the reentrant lock is recommended. With asynchronous flush it also helps to enable `transientStorePoolEnable` and to disable `transferMsgByHeap` to speed up message pulling; with synchronous flush, moderately increase `sendMessageThreadPoolNums`. The exact values should be validated by load testing.
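  As an illustration, a `broker.conf` fragment combining these recommendations might look as follows; the property names come from RocketMQ's MessageStoreConfig/BrokerConfig, while the values are examples to be validated by load testing:

```properties
# synchronous flush: reentrant lock, larger send thread pool (example values)
flushDiskType=SYNC_FLUSH
useReentrantLockWhenPutMessage=true
sendMessageThreadPoolNums=16

# an asynchronous-flush setup would instead use:
# flushDiskType=ASYNC_FLUSH
# useReentrantLockWhenPutMessage=false
# transientStorePoolEnable=true
# transferMsgByHeap=false
```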

  After the lock is acquired, the lock timestamp is recorded, which guarantees ordered writes. If the mappedFile obtained earlier is null or already full, a new one must be created:

    /**
     * Get the last mapped file, pre-creating a new commitlog file when needed.
     */
    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        long createOffset = -1;
        // fetch the newest mapped file
        MappedFile mappedFileLast = getLastMappedFile();

        // none found: this is the first file ever created
        if (mappedFileLast == null) {
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }

        // the last file is full: the next file starts right where the
        // previous file's offset range ends
        if (mappedFileLast != null && mappedFileLast.isFull()) {
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }

        // create a new commitlog file
        if (createOffset != -1 && needCreate) {
            return tryCreateMappedFile(createOffset);
        }

        return mappedFileLast;
    }

  The message data is then appended: result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);

    /**
     * Unified append path through the fileChannel / mapped buffer;
     * supports both single and batched messages.
     */
    public AppendMessageResult appendMessage(final ByteBuffer byteBufferMsg, final CompactionAppendMsgCallback cb) {
        assert byteBufferMsg != null;
        assert cb != null;

        // current write position
        int currentPos = WROTE_POSITION_UPDATER.get(this);
        // the write position must be smaller than the file size
        if (currentPos < this.fileSize) {
            // appendMessageBuffer() picks writeBuffer or mappedByteBuffer; with
            // asynchronous flush, data goes to writeBuffer first and is committed
            // to the mapped buffer periodically
            ByteBuffer byteBuffer = appendMessageBuffer().slice();
            // move to the write position
            byteBuffer.position(currentPos);
            AppendMessageResult result = cb.doAppend(byteBuffer, this.fileFromOffset, this.fileSize - currentPos, byteBufferMsg);
            // advance the write position by the bytes written; WROTE_POSITION_UPDATER
            // tracks how many bytes of this file are already used
            WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
            // record the last store timestamp
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
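  The WROTE_POSITION_UPDATER bookkeeping above can be sketched in isolation. This is a simplified illustration with made-up names, not RocketMQ source: real appends are serialized by putMessageLock, and running out of space is reported as END_OF_FILE by the append callback rather than by this bounds check:

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Simplified illustration of the write-position bookkeeping: an
// AtomicIntegerFieldUpdater advances a volatile int by the bytes written,
// acting as the file's high-water mark. Class and method names are made up.
public class WritePositionDemo {
    static final AtomicIntegerFieldUpdater<WritePositionDemo> WROTE_POSITION_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(WritePositionDemo.class, "wrotePosition");

    final int fileSize = 1024;      // pretend mapped-file capacity in bytes
    volatile int wrotePosition = 0; // must be volatile for the field updater

    // Returns the start position of this append, or -1 when the file is full
    // (the real code reports END_OF_FILE and rolls over to a new file).
    // Real appends are serialized by putMessageLock, so check-then-act is safe.
    int append(int msgBytes) {
        int currentPos = WROTE_POSITION_UPDATER.get(this);
        if (currentPos + msgBytes > fileSize) {
            return -1;
        }
        WROTE_POSITION_UPDATER.addAndGet(this, msgBytes);
        return currentPos;
    }

    public static void main(String[] args) {
        WritePositionDemo file = new WritePositionDemo();
        System.out.println(file.append(100)); // 0
        System.out.println(file.append(100)); // 100
    }
}
```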

  After the append, the result status drives the follow-up handling; on success the store invokes onCommitLogAppend. If the write failed because the remaining file space was insufficient (END_OF_FILE), a new file is created and the write is retried:

                switch (result.getStatus()) {
                    case PUT_OK:
                        onCommitLogAppend(msg, result, mappedFile);
                        break;
                    case END_OF_FILE:
                        // not enough space left: roll over to a new file and retry
                        onCommitLogAppend(msg, result, mappedFile);
                        unlockMappedFile = mappedFile;
                        // Create a new file, re-write the message
                        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                        if (null == mappedFile) {
                            // XXX: warn and notify me
                            log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                            beginTimeInLock = 0;
                            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
                        }
                        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                        if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
                            onCommitLogAppend(msg, result, mappedFile);
                        }
                        break;
                    case MESSAGE_SIZE_EXCEEDED:
                    case PROPERTIES_SIZE_EXCEEDED:
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                    case UNKNOWN_ERROR:
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                    default:
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                }

  Once handling completes, the locks are released, the statistics snapshot is updated, and the flush request plus the HA replication request are submitted:

    /**
     * Core entry that triggers the disk flush and HA replication.
     */
    private CompletableFuture<PutMessageResult> handleDiskFlushAndHA(PutMessageResult putMessageResult,
        MessageExt messageExt, int needAckNums, boolean needHandleHA) {
        // synchronous or asynchronous flush task
        CompletableFuture<PutMessageStatus> flushResultFuture = handleDiskFlush(putMessageResult.getAppendMessageResult(), messageExt);
        CompletableFuture<PutMessageStatus> replicaResultFuture;
        if (!needHandleHA) {
            replicaResultFuture = CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        } else {
            replicaResultFuture = handleHA(putMessageResult.getAppendMessageResult(), putMessageResult, needAckNums);
        }

        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(flushStatus);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
            }
            return putMessageResult;
        });
    }
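  The thenCombine step is the whole fault-merging logic: the put only stays PUT_OK when both the flush future and the replication future report PUT_OK. A minimal standalone sketch of that pattern (class name, enum values and the combine() helper are illustrative, not RocketMQ source):

```java
import java.util.concurrent.CompletableFuture;

// Sketch of the thenCombine pattern used by handleDiskFlushAndHA.
// All names here are made up for illustration.
public class FlushAndHaDemo {
    enum Status { PUT_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT }

    static CompletableFuture<Status> combine(CompletableFuture<Status> flush,
                                             CompletableFuture<Status> replica) {
        return flush.thenCombine(replica, (flushStatus, replicaStatus) -> {
            Status status = Status.PUT_OK;
            if (flushStatus != Status.PUT_OK) {
                status = flushStatus;   // flush failure downgrades the result
            }
            if (replicaStatus != Status.PUT_OK) {
                status = replicaStatus; // replication failure overrides, as in the source
            }
            return status;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Status> flush = CompletableFuture.completedFuture(Status.PUT_OK);
        CompletableFuture<Status> replica = CompletableFuture.completedFuture(Status.FLUSH_SLAVE_TIMEOUT);
        System.out.println(combine(flush, replica).join()); // FLUSH_SLAVE_TIMEOUT
    }
}
```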
        @Override
        public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
            // Synchronization flush
            if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
                final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
                if (messageExt.isWaitStoreMsgOK()) {
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    // register the GroupCommitRequest with the flush-disk watcher and the commit service
                    flushDiskWatcher.add(request);
                    service.putRequest(request);
                    return request.future();
                } else {
                    // just wake up the flush thread to consume pending requests
                    service.wakeup();
                    return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
                }
            }
            // Asynchronous flush
