Leveling Up with RocketMQ: How RocketMQ Persists Messages
Posted by fisher
RocketMQ's persistence guarantees rest on two ideas: first, flushing to disk so that most data is never lost; second, efficient handling of the persisted files, where zero-copy (mmap), OS memory pages, and the NIO model provide the throughput.
-
Persistence directory layout
├──abort: startup-check file; an abort file is written on a normal start and deleted on a normal shutdown, so the broker can tell whether the previous exit was abnormal
├──checkpoint: historical checkpoints loaded as the broker starts
├──lock: file lock for global resources
├──commitlog: the core of broker storage; RocketMQ brokers store messages centrally, and this is where they land on disk
│ ├──00000000000000000000 (example): RocketMQ pre-creates commitlog files and appends messages into them; each file is named after its starting offset, so the first one created is 00000000000000000000
├──compaction: (RocketMQ 5.0+)
│ ├──position-checkpoint: caches the checkpoint of the last consumption; updated after each round of processing
├──config:
│ ├──consumerFilter.json: message filter rules per topic: ConcurrentMap<String/*Topic*/, FilterDataMapByTopic>
│ ├──consumerOffset.json: consumption offsets per consumer group: ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>>
│ ├──consumerOrderInfo.json: ordering info for ordered messages: ConcurrentHashMap<String/* topic@group*/, ConcurrentHashMap<Integer/*queueId*/, OrderInfo>>
│ ├──delayOffset.json: pull offsets of the delay queues used by consumer pulls
│ ├──subscriptionGroup.json: subscription info per consumer group, essentially the consumer metadata the broker has received
│ ├──topics.json: topic metadata
│ ├──timercheck: timer-wheel file for scheduled messages (RocketMQ 5.0+)
│ ├──timermetrics: timer-wheel file for scheduled messages (RocketMQ 5.0+)
├──consumequeue: per-queue consumption info for each topic on the broker
│ ├──%topicName: topic name
│ │ ├──%queueId: queue id
│ │ │ ├──00000000000000000000: consume positions
├──index: index file directory
│ ├──00000000000000000000: index files for quickly locating a message's position in the commitlog
└──timerwheel: config of the timer-wheel implementation behind scheduled messages
These files are the basis of the broker's disaster recovery. A RocketMQ cluster is essentially a cluster of brokers, and these files are what keep data from being lost: the broker loads each of them on startup.
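As a quick illustration of the commitlog naming scheme in the tree above, here is the arithmetic it implies, not RocketMQ code. The class and method names are invented; only the 20-digit, offset-based naming and the 1 GB default file size come from the article.

```java
public class CommitLogNaming {
    public static final long FILE_SIZE = 1024L * 1024 * 1024; // default commitlog file size: 1 GB

    // a commitlog file is named after its starting byte offset, zero-padded to 20 digits
    public static String fileNameFor(long startOffset) {
        return String.format("%020d", startOffset);
    }

    // the file holding a physical offset starts at the largest multiple of FILE_SIZE below it
    public static String fileContaining(long physicalOffset) {
        return fileNameFor(physicalOffset - (physicalOffset % FILE_SIZE));
    }

    public static void main(String[] args) {
        System.out.println(fileNameFor(0));                    // the very first file
        System.out.println(fileContaining(FILE_SIZE + 12345)); // an offset inside the second file
    }
}
```

So given any physical offset, integer division by the file size is enough to find the file to read from.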
```java
/**
 * Upper-level abstract config factory. On broker startup each component is
 * loaded in turn and the files are read into fields, e.g. consumerOffsetTable.
 * Every manager under this abstract class loads its own config.
 */
public abstract class ConfigManager {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

    public abstract String encode();
    // ...
}
```
-
The store module
RocketMQ's file handling is built on mmap and NIO's ByteBuffer underneath; the store module wraps these into a few basic components.
```java
/**
 * Core object of store message handling. MappedFile wraps message writes:
 * the NIO machinery that moves data from file to disk.
 */
public class DefaultMappedFile extends AbstractMappedFile {
    // OS data page, 4 KB on most unix systems
    public static final int OS_PAGE_SIZE = 1024 * 4;
    public static final Unsafe UNSAFE = getUnsafe();
    private static final Method IS_LOADED_METHOD;
    public static final int UNSAFE_PAGE_SIZE = UNSAFE == null ? OS_PAGE_SIZE : UNSAFE.pageSize();
    protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    // total mapped-file virtual memory allocated by the MQ
    protected static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
    // total number of memory-mapped files created by the MQ
    protected static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);

    protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> WROTE_POSITION_UPDATER;
    protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> COMMITTED_POSITION_UPDATER;
    protected static final AtomicIntegerFieldUpdater<DefaultMappedFile> FLUSHED_POSITION_UPDATER;

    // current write pointer; the next write starts here
    protected volatile int wrotePosition;
    // commit pointer; data before it has been committed to the fileChannel,
    // data between committedPosition and wrotePosition has not been committed yet
    protected volatile int committedPosition;
    // flush pointer; data before it is already on disk,
    // data between flushedPosition and committedPosition has not been flushed yet
    protected volatile int flushedPosition;
    // file size in bytes
    protected int fileSize;
    // FileChannel of the on-disk file; this is where mmap shows up
    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    // with async flush, data is written into writeBuffer first; the CommitRealTime thread
    // commits it to the fileChannel every 200 ms, and the FlushRealTime thread flushes
    // the fileChannel to disk every 500 ms
    protected ByteBuffer writeBuffer = null;
    // off-heap memory pool serving the async-flush path; to avoid repeatedly allocating
    // and freeing memory, a chunk of off-heap memory is requested from the OS and locked
    // up front; writeBuffer is taken from this pool
    protected TransientStorePool transientStorePool = null;
    // file name
    protected String fileName;
    // the file's starting offset, which the file is named after; 00000000000000000000
    // means it starts at 0; a commitlog file defaults to 1 GB, and once it fills up a
    // new file is created, named after its own starting offset
    protected long fileFromOffset;
    protected File file;
    // memory-mapped buffer of the file; with sync flush, data is written straight into it
    protected MappedByteBuffer mappedByteBuffer;
    // timestamp of the most recent operation
    protected volatile long storeTimestamp = 0;
    protected boolean firstCreateInQueue = false;
    private long lastFlushTime = -1L;
    protected MappedByteBuffer mappedByteBufferWaitToClean = null;
    protected long swapMapTime = 0L;
    protected long mappedByteBufferAccessCountSinceLastSwap = 0L;
    // ...
}
```
First, the core DefaultMappedFile works through a FileChannel and, via mmap (the MappedByteBuffer), gets zero-copy access to the file.
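To make the mmap idea concrete, here is a minimal self-contained sketch using plain JDK NIO, not RocketMQ code: a file is mapped with `FileChannel.map`, bytes are written through the `MappedByteBuffer`, and `force()` plays the role of the flush.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapDemo {
    // write a payload through a memory-mapped region, then read it back from disk
    public static String roundTrip(String payload) {
        try {
            Path file = Files.createTempFile("mmap-demo", ".log");
            byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
            try (FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // map a fixed-size region: the moral equivalent of one MappedFile
                MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024);
                mapped.put(bytes); // the write lands in the page cache, no syscall per write
                mapped.force();    // ask the OS to flush the dirty pages, like a RocketMQ flush
            }
            byte[] back = new byte[bytes.length];
            System.arraycopy(Files.readAllBytes(file), 0, back, 0, bytes.length);
            Files.delete(file);
            return new String(back, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello commitlog"));
    }
}
```

The point of the mapping is that writes are plain memory stores into the page cache; the kernel moves the pages to disk, so no user-space-to-kernel copy happens per message.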
It defines three pointers:
wrotePosition: the current write pointer; the next write starts from here
committedPosition: the commit pointer; data before it has been committed to the fileChannel, data between committedPosition and wrotePosition has not been committed yet
flushedPosition: the flush pointer; data before it is already on disk, data between flushedPosition and committedPosition has not been flushed yet
It also defines a ByteBuffer: with asynchronous flushing, data is first written into this buffer, then a scheduled thread periodically moves it into the fileChannel, and finally the fileChannel is flushed to disk.
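The two-step pipeline behind the three pointers can be simulated in a few lines. This is a toy model, not RocketMQ's implementation; the field names mirror the ones described above but the class itself is invented: writes advance wrotePosition, a "commit" moves committedPosition up to wrotePosition, and a "flush" moves flushedPosition up to committedPosition.

```java
public class PointerModel {
    private int wrotePosition;     // bytes written into writeBuffer
    private int committedPosition; // bytes committed to the fileChannel
    private int flushedPosition;   // bytes actually on disk

    public void write(int bytes) { wrotePosition += bytes; }

    // what the CommitRealTime thread does every ~200 ms
    public void commit() { committedPosition = wrotePosition; }

    // what the FlushRealTime thread does every ~500 ms
    public void flush() { flushedPosition = committedPosition; }

    public int unCommitted() { return wrotePosition - committedPosition; }
    public int unFlushed()   { return committedPosition - flushedPosition; }

    public static void main(String[] args) {
        PointerModel m = new PointerModel();
        m.write(100);                        // 100 bytes sit in writeBuffer only
        System.out.println(m.unCommitted()); // 100
        m.commit();                          // now in the fileChannel, not yet durable
        System.out.println(m.unFlushed());   // 100
        m.flush();                           // durable
        System.out.println(m.unFlushed());   // 0
    }
}
```

The gap between wrotePosition and flushedPosition is exactly the data that would be lost on a power failure, which is why sync flush closes that gap before acknowledging the producer.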
```java
/**
 * Creates the next commitlog file from the AllocateRequests in the queue.
 */
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped() && this.mmapOperation()) {
    }
    log.info(this.getServiceName() + " service end");
}
```
AllocateRequest wraps the pre-creation of a commitlog file: requests are built during processing and put into a queue, and when the store starts it launches the AllocateMappedFileService thread to watch the queue and do the creation.
```java
/**
 * Core of commitlog pre-creation.
 * @param nextFilePath
 * @param nextNextFilePath
 * @param fileSize
 * @return
 */
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
    int canSubmitRequests = 2;
    if (this.messageStore.isTransientStorePoolEnable()) {
        if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
            && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) {
            // if broker is slave, don't fast fail even no buffer in pool
            canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
        }
    }

    // wrap an AllocateRequest and put it in the queue; an async thread picks it up and executes it
    AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
    boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
    if (nextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, "
                + "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(),
                this.messageStore.getTransientStorePool().availableBufferNums());
            this.requestTable.remove(nextFilePath);
            return null;
        }
        boolean offerOK = this.requestQueue.offer(nextReq);
        if (!offerOK) {
            log.warn("never expected here, add a request to preallocate queue failed");
        }
        canSubmitRequests--;
    }

    AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
    boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
    if (nextNextPutOK) {
        if (canSubmitRequests <= 0) {
            log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, "
                + "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(),
                this.messageStore.getTransientStorePool().availableBufferNums());
            this.requestTable.remove(nextNextFilePath);
        } else {
            boolean offerOK = this.requestQueue.offer(nextNextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
        }
    }

    if (hasException) {
        log.warn(this.getServiceName() + " service has exception. so return null");
        return null;
    }

    // block until the AllocateMappedFileService thread has created the file
    AllocateRequest result = this.requestTable.get(nextFilePath);
    try {
        if (result != null) {
            messageStore.getPerfCounter().startTick("WAIT_MAPFILE_TIME_MS");
            boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
            messageStore.getPerfCounter().endTick("WAIT_MAPFILE_TIME_MS");
            if (!waitOK) {
                log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                return null;
            } else {
                this.requestTable.remove(nextFilePath);
                return result.getMappedFile();
            }
        } else {
            log.error("find preallocate mmap failed, this never happen");
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
    }
    return null;
}
```
When the broker initializes, it starts the AllocateMappedFileService thread that manages MappedFile creation. The message-processing thread and the AllocateMappedFileService thread are connected by the requestQueue.
On a message write, the message-processing thread calls AllocateMappedFileService's putRequestAndReturnMappedFile to put a MappedFile-creation request into requestQueue; it builds two AllocateRequests at once and enqueues both.
The AllocateMappedFileService thread loops over requestQueue, taking AllocateRequests and creating MappedFiles. The message-processing thread waits on a CountDownLatch and returns as soon as the first MappedFile has been created.
The next time the message-processing thread needs a MappedFile, it can grab the one that was pre-created earlier. Pre-creating MappedFiles this way cuts the time spent waiting for file creation.
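The hand-off described above, a request queue plus a CountDownLatch per request, can be sketched as follows. This is a simplified stand-in for AllocateMappedFileService; all class and method names here are invented for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class PreallocSketch {
    // one allocation request: the file to create plus a latch the caller waits on
    static class AllocRequest {
        final String filePath;
        final CountDownLatch latch = new CountDownLatch(1);
        volatile String createdFile; // stands in for the created MappedFile
        AllocRequest(String filePath) { this.filePath = filePath; }
    }

    private final BlockingQueue<AllocRequest> queue = new ArrayBlockingQueue<>(16);

    // background thread: pull requests and "create the file"
    public Thread startAllocator() {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    AllocRequest req = queue.take();
                    req.createdFile = "mapped:" + req.filePath; // pretend mmap creation
                    req.latch.countDown();                      // release the waiter
                }
            } catch (InterruptedException ignored) { }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    // caller: enqueue two requests (current + next) but only wait for the first
    public String requestFile(String path, String nextPath) {
        AllocRequest first = new AllocRequest(path);
        queue.offer(first);
        queue.offer(new AllocRequest(nextPath)); // pre-created in the background
        try {
            if (!first.latch.await(3, TimeUnit.SECONDS)) {
                return null; // like the "create mmap timeout" branch
            }
        } catch (InterruptedException e) {
            return null;
        }
        return first.createdFile;
    }

    public static void main(String[] args) {
        PreallocSketch s = new PreallocSketch();
        s.startAllocator();
        System.out.println(s.requestFile("00000000000000000000", "00000000001073741824"));
    }
}
```

The second request stays in the queue and is serviced in the background, so the next caller usually finds its file already created.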
-
The full message-storage flow in store
As the diagram shows, store plays an important role on the path from producer to consumer.
After a producer sends a message, the message is persisted; after a consumer consumes it, the consumption progress is persisted.
Let's walk through the store flow in detail.
-
Message storage: from producer to disk
Once a message is created by the producer and sent to the broker, the broker persists it first. For asynchronous messages, persistence is handled periodically by a separate worker thread; for synchronous messages, the call blocks until the message has been fully processed before returning.
The message first passes through the producer, is assembled, and is sent to the broker over Netty. Here we only care about the broker side; if you want the producer-side handling, see the earlier articles.
In the broker, the processors wrap the broker's reactions to client actions over Netty. AbstractSendMessageProcessor provides the shared functionality, such as message retry, routing to the dead-letter queue, and the beforeHook/afterHook functions. When a producer issues a sendMessage action, the request is routed to SendMessageProcessor, the broker-side handler that processes the client's sendMessage.
```java
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    SendMessageContext sendMessageContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.consumerSendMsgBack(ctx, request);
        default:
            // normal send path
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return null;
            }
            TopicQueueMappingContext mappingContext = this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader, true);
            RemotingCommand rewriteResult = this.brokerController.getTopicQueueMappingManager().rewriteRequestForStaticTopic(requestHeader, mappingContext);
            if (rewriteResult != null) {
                return rewriteResult;
            }
            sendMessageContext = buildMsgContext(ctx, requestHeader, request);
            try {
                // run the before-send hooks
                this.executeSendMessageHookBefore(sendMessageContext);
            } catch (AbortProcessException e) {
                final RemotingCommand errorResponse = RemotingCommand.createResponseCommand(e.getResponseCode(), e.getErrorMessage());
                errorResponse.setOpaque(request.getOpaque());
                return errorResponse;
            }

            RemotingCommand response;
            // single vs batch handling, with the after hooks run on completion
            if (requestHeader.isBatch()) {
                response = this.sendBatchMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
                    (ctx1, response1) -> executeSendMessageHookAfter(response1, ctx1));
            } else {
                response = this.sendMessage(ctx, request, sendMessageContext, requestHeader, mappingContext,
                    (ctx12, response12) -> executeSendMessageHookAfter(response12, ctx12));
            }
            return response;
    }
}
```
If the message is a retry message, it is sent to the %retry% topic queue to be retried, and the retry level and retry count are handled there.
The core here is the split between single-message and batch handling; the broker's MessageBatch type is the batch message.
```java
public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
    final RemotingCommand request,
    final SendMessageContext sendMessageContext,
    final SendMessageRequestHeader requestHeader,
    final TopicQueueMappingContext mappingContext,
    final SendMessageCallback sendMessageCallback) throws RemotingCommandException {

    final RemotingCommand response = preSend(ctx, request, requestHeader);
    if (response.getCode() != -1) {
        return response;
    }

    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

    // message body
    final byte[] body = request.getBody();
    // queue id requested by the client
    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    // a queue id below 0 is considered invalid; bind a random write queue instead
    if (queueIdInt < 0) {
        queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
    }

    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);

    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    // retry messages, or messages that hit the max retry count and go to the
    // dead-letter queue, return directly
    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig, oriProps)) {
        return response;
    }

    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());

    String uniqKey = oriProps.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
    if (uniqKey == null || uniqKey.length() <= 0) {
        uniqKey = MessageClientIDSetter.createUniqID();
        oriProps.put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, uniqKey);
    }
    MessageAccessor.setProperties(msgInner, oriProps);

    CleanupPolicy cleanupPolicy = CleanupPolicyUtils.getDeletePolicy(Optional.of(topicConfig));
    if (Objects.equals(cleanupPolicy, CleanupPolicy.COMPACTION)) {
        if (StringUtils.isBlank(msgInner.getKeys())) {
            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
            response.setRemark("Required message key is missing");
            return response;
        }
    }

    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

    // Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    boolean sendTransactionPrepareMessage = false;
    // a message that has already been re-consumed (reconsumeTimes > 0 with a delay level
    // set) is a redelivered message; treating it as a transaction message is not allowed
    if (Boolean.parseBoolean(traFlag)
        && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { // For client under version 4.6.1
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
            return response;
        }
        sendTransactionPrepareMessage = true;
    }

    long beginTimeMillis = this.brokerController.getMessageStore().now();

    // this is where the message is actually handled: depending on the broker's sync or
    // async model, it is stored either as a transaction (half) message or a normal one
    if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
        CompletableFuture<PutMessageResult> asyncPutMessageFuture;
        // putMessage / asyncPutMessage is the core of store persistence
        if (sendTransactionPrepareMessage) {
            // wraps the message as a half message, see
            // org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl.asyncPrepareMessage
            asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }

        final int finalQueueIdInt = queueIdInt;
        final MessageExtBrokerInner finalMsgInner = msgInner;
        // when done, handlePutMessageResult is invoked as an async callback; in the sync
        // model the else branch below does the same work but blocks on it
        asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {
            RemotingCommand responseFuture =
                handlePutMessageResult(putMessageResult, response, request, finalMsgInner, responseHeader,
                    sendMessageContext, ctx, finalQueueIdInt, beginTimeMillis, mappingContext,
                    BrokerMetricsManager.getMessageType(requestHeader));
            if (responseFuture != null) {
                doResponse(ctx, request, responseFuture);
            }
            sendMessageCallback.onComplete(sendMessageContext, response);
        }, this.brokerController.getPutMessageFutureExecutor());
        // Returns null to release the send message thread
        return null;
    } else {
        PutMessageResult putMessageResult = null;
        if (sendTransactionPrepareMessage) {
            putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
        } else {
            putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        }
        handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext,
            ctx, queueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
        sendMessageCallback.onComplete(sendMessageContext, response);
        return response;
    }
}
```
First comes the up-front assembly: setting the message body and the queue id, and dropping a portion of invalid messages, such as retry messages that have reached the dead-letter threshold.
Then the message is classified: on the asynchronous path, a transaction message is prepared asynchronously as a half message; any other type of message is stored asynchronously according to its content.
```java
// putMessage / asyncPutMessage is the core of store persistence
if (sendTransactionPrepareMessage) {
    // wraps the message as a half message, see
    // org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl.asyncPrepareMessage
    asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
    asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
```
Once the future completes, handlePutMessageResult is invoked as an asynchronous callback; in the synchronous model the code instead blocks in handlePutMessageResult waiting for the result. The flow is the same as the else branch below, only executed as a non-blocking async task rather than on the blocked calling thread.
On the asynchronous path, the wait is bounded by a timeout derived from the flush timeout and policy (on the order of 3000 ms); the monitoring works much like blocking on a CompletableFuture's get with a timeout.
```java
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    return waitForPutResult(asyncPutMessage(msg));
}

// timeout handling for the async future
private PutMessageResult waitForPutResult(CompletableFuture<PutMessageResult> putMessageResultFuture) {
    try {
        int putMessageTimeout = Math.max(this.messageStoreConfig.getSyncFlushTimeout(),
            this.messageStoreConfig.getSlaveTimeout()) + 5000;
        return putMessageResultFuture.get(putMessageTimeout, TimeUnit.MILLISECONDS);
    } catch (ExecutionException | InterruptedException e) {
        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
    } catch (TimeoutException e) {
        LOGGER.error("usually it will never timeout, putMessageTimeout is much bigger than slaveTimeout and "
            + "flushTimeout so the result can be got anyway, but in some situations timeout will happen like full gc "
            + "process hangs or other unexpected situations.");
        return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
    }
}
```
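The waitForPutResult pattern, turning an async store into a sync one by blocking with a generous timeout and collapsing every failure into UNKNOWN_ERROR, looks like this in miniature. It uses plain CompletableFuture; the class and status names are invented for the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncOverAsync {
    public enum Status { PUT_OK, UNKNOWN_ERROR }

    // block on the future with a timeout; any failure collapses into UNKNOWN_ERROR,
    // mirroring waitForPutResult above
    public static Status waitFor(CompletableFuture<Status> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException | InterruptedException | TimeoutException e) {
            return Status.UNKNOWN_ERROR;
        }
    }

    public static void main(String[] args) {
        // completes immediately -> PUT_OK
        System.out.println(waitFor(CompletableFuture.completedFuture(Status.PUT_OK), 1000));
        // never completes -> falls into the timeout branch
        System.out.println(waitFor(new CompletableFuture<Status>(), 50));
    }
}
```

The timeout is deliberately larger than the flush and slave timeouts inside the future's own pipeline, so under normal operation the inner timeouts fire first and the outer get almost never times out.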
The real storage work happens in DefaultMessageStore's asyncPutMessage.
```java
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    // run the registered before-put hooks first
    for (PutMessageHook putMessageHook : putMessageHookList) {
        PutMessageResult handleResult = putMessageHook.executeBeforePutMessage(msg);
        if (handleResult != null) {
            return CompletableFuture.completedFuture(handleResult);
        }
    }

    // validate the message format; abort directly if it is illegal
    if (msg.getProperties().containsKey(MessageConst.PROPERTY_INNER_NUM)
        && !MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
        LOGGER.warn("[BUG]The message had property {} but is not an inner batch", MessageConst.PROPERTY_INNER_NUM);
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
    }

    if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
        Optional<TopicConfig> topicConfig = this.getTopicConfig(msg.getTopic());
        if (!QueueTypeUtils.isBatchCq(topicConfig)) {
            LOGGER.error("[BUG]The message is an inner batch but cq type is not batch cq");
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
        }
    }

    long beginTime = this.getSystemClock().now();
    // the commitlog does the actual message handling
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

    // measure how long the store took and update the stats
    putResultFuture.thenAccept(result -> {
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            LOGGER.warn("DefaultMessageStore#putMessage: CommitLog#putMessage cost {}ms, topic={}, bodyLength={}",
                elapsedTime, msg.getTopic(), msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            // on failure, bump the put-message-failed counter
            this.storeStatsService.getPutMessageFailedTimes().add(1);
        }
    });

    return putResultFuture;
}
```
So asyncPutMessage wraps the result in a CompletableFuture and runs asynchronously: it first executes the before hooks, then validates the message format and the topic config, and when done records the elapsed time and failure count in the storeStatsService fields. The core operation is `CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);`, which appends the message; the file handling underneath is the mapped FileChannel.
```java
/**
 * Core message-store code.
 * @param msg
 * @return
 */
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
        msg.setStoreTimestamp(System.currentTimeMillis());
    }
    // Set the message body CRC (consider the most appropriate setting on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    msg.setVersion(MessageVersion.MESSAGE_VERSION_V1);
    boolean autoMessageVersionOnTopicLen =
        this.defaultMessageStore.getMessageStoreConfig().isAutoMessageVersionOnTopicLen();
    if (autoMessageVersionOnTopicLen && topic.length() > Byte.MAX_VALUE) {
        msg.setVersion(MessageVersion.MESSAGE_VERSION_V2);
    }

    InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
    if (bornSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setBornHostV6Flag();
    }
    InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
    if (storeSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setStoreHostAddressV6Flag();
    }

    // fetch the thread-local state and update the max message size
    PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
    updateMaxMessageSize(putMessageThreadLocal);
    // build a unique topicQueueKey from the topic and queue, format: topic-queueId
    String topicQueueKey = generateKey(putMessageThreadLocal.getKeyBuilder(), msg);

    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    // the mappedFile last operated on, i.e. the last one in the queue
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

    // no mappedFile yet means this is the first creation: start computing from offset 0
    long currOffset;
    if (mappedFile == null) {
        currOffset = 0;
    } else {
        // otherwise the message goes at the file's base offset (its name)
        // plus the file's current write pointer
        currOffset = mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
    }

    // work out how many acks are needed and whether HA must be notified
    int needAckNums = this.defaultMessageStore.getMessageStoreConfig().getInSyncReplicas();
    boolean needHandleHA = needHandleHA(msg);

    if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
        if (this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset) < this.defaultMessageStore.getMessageStoreConfig().getMinInSyncReplicas()) {
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
        }
        if (this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
            // -1 means all ack in SyncStateSet
            needAckNums = MixAll.ALL_ACK_IN_SYNC_STATE_SET;
        }
    } else if (needHandleHA && this.defaultMessageStore.getBrokerConfig().isEnableSlaveActingMaster()) {
        int inSyncReplicas = Math.min(this.defaultMessageStore.getAliveReplicaNumInGroup(),
            this.defaultMessageStore.getHaService().inSyncReplicasNums(currOffset));
        needAckNums = calcNeedAckNums(inSyncReplicas);
        if (needAckNums > inSyncReplicas) {
            // Tell the producer, don't have enough slaves to handle the send request
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH, null));
        }
    }

    // lock this key: one key stands for one queue under a topic
    topicQueueLock.lock(topicQueueKey);
    try {
        boolean needAssignOffset = true;
        if (defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()
            && defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
            needAssignOffset = false;
        }
        if (needAssignOffset) {
            defaultMessageStore.assignOffset(msg, getMessageNum(msg));
        }

        PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
        if (encodeResult != null) {
            return CompletableFuture.completedFuture(encodeResult);
        }
        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
        // context for storing this message
        PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey);

        putMessageLock.lock(); // spin or ReentrantLock, depending on store config
        try {
            // time at which the lock was acquired
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly global
            // use the lock-acquisition time as the store timestamp to guarantee ordering
            if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                msg.setStoreTimestamp(beginLockTimestamp);
            }

            // if there is no mappedFile, or it is full, create a new one
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
            }

            // append the message
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            switch (result.getStatus()) {
                case PUT_OK:
                    onCommitLogAppend(msg, result, mappedFile);
                    break;
                case END_OF_FILE:
                    // the file ran out of space: roll a new file and retry the write
                    onCommitLogAppend(msg, result, mappedFile);
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                    if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
                        onCommitLogAppend(msg, result, mappedFile);
                    }
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                default:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            }

            // record the time spent inside the lock
            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            // release the put lock
            putMessageLock.unlock();
        }
    } finally {
        // release the queue lock
        topicQueueLock.unlock(topicQueueKey);
    }

    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}",
            elapsedTimeInLock, msg.getBody().length, result);
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics: update the cached stats copies
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(result.getMsgNum());
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());

    // submit the flush request and the replication (HA) request
    return handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA);
}
```
First some basics are set: the store timestamp, bornHost and storeHost; the thread-local state is fetched and the maximum message size updated.
A unique topicQueueKey is built from the topic and queue, in the format topic-queueId. The last mappedFile, i.e. the one most recently operated on, is fetched, because writes are append-only and persistence is centralized.
If no previously used mapped FileChannel is found, this message may be the first one, so a fileChannel is created and the initial offset is naturally 0. If one is found, the corresponding commitlog file's name is that file's starting offset, so this message's storage position is the mappedFile's base offset (its file name) plus the message's offset within the file.
HA and ack requirements are validated.
topicQueueKey is locked first (the key is generated per queue under a topic), and the offset for this write is computed.
The context for storing the message, PutMessageContext, is defined as:
```java
public class PutMessageContext {
    private String topicQueueTableKey; // the locked key
    private long[] phyPos;
    private int batchSize;             // size of the batch

    public PutMessageContext(String topicQueueTableKey) {
        this.topicQueueTableKey = topicQueueTableKey;
    }
}
```
putMessageLock is then acquired; there are two implementations of this lock: a spin lock and a reentrant lock.
```java
/**
 * Spin lock Implementation to put message, suggest using this with low race conditions
 */
public class PutMessageSpinLock implements PutMessageLock {
    // true: can lock, false: in lock.
    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);

    @Override
    public void lock() {
        boolean flag;
        do {
            flag = this.putMessageSpinLock.compareAndSet(true, false);
        } while (!flag);
    }

    @Override
    public void unlock() {
        this.putMessageSpinLock.compareAndSet(false, true);
    }
}
```
```java
/**
 * Exclusive lock implementation to put message
 */
public class PutMessageReentrantLock implements PutMessageLock {
    private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync

    @Override
    public void lock() {
        putMessageNormalLock.lock();
    }

    @Override
    public void unlock() {
        putMessageNormalLock.unlock();
    }
}
```
Since RocketMQ 4.x the spin lock should be the default choice (the broker config item `useReentrantLockWhenPutMessage` defaults to false). The usual advice: use the spin lock with asynchronous flush and the reentrant lock with synchronous flush; with asynchronous flush also enable `transientStorePoolEnable`; disable transferMsgByHeap to speed up message pulls; with synchronous flush, increase `sendMessageThreadPoolNums` moderately. The exact values need to be validated by load testing.
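A quick way to see the spin lock in action: the same CAS loop as PutMessageSpinLock, exercised by a few threads incrementing a shared counter. This is a standalone re-creation for demonstration; the class names are invented.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class SpinLockDemo {
    // same CAS loop as PutMessageSpinLock: true = free, false = held
    static class SpinLock {
        private final AtomicBoolean free = new AtomicBoolean(true);
        void lock()   { while (!free.compareAndSet(true, false)) { } }
        void unlock() { free.compareAndSet(false, true); }
    }

    static int counter = 0;

    public static int runContended(int threads, int incrementsPerThread) {
        counter = 0;
        SpinLock lock = new SpinLock();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    lock.lock(); // busy-wait instead of parking: cheap when hold times are tiny
                    try {
                        counter++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try { t.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println(runContended(4, 10_000)); // no lost updates
    }
}
```

The trade-off the recommendation captures: spinning burns CPU while waiting, which is fine when the critical section is short (async flush), but parking via ReentrantLock is kinder when holders can block for long (sync flush).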
The time after the lock is successfully acquired is used as the store timestamp, which guarantees ordering. If the mappedFile fetched earlier was missing or already full, a new mappedFile has to be created:
```java
/**
 * Pre-creates the next commitlog file when needed.
 * @return
 */
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    long createOffset = -1;
    // get the newest mappedFile
    MappedFile mappedFileLast = getLastMappedFile();

    // nothing there yet: this is the very first file
    if (mappedFileLast == null) {
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    }

    // the last file is full: the next file's starting offset is
    // the offset right after the end of the previous file
    if (mappedFileLast != null && mappedFileLast.isFull()) {
        createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }

    // create the new commitlog file
    if (createOffset != -1 && needCreate) {
        return tryCreateMappedFile(createOffset);
    }

    return mappedFileLast;
}
```
Then the data is appended: `result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);`
```java
/**
 * Append is unified as a fileChannel write; both single and batch messages are supported.
 */
public AppendMessageResult appendMessage(final ByteBuffer byteBufferMsg, final CompactionAppendMsgCallback cb) {
    assert byteBufferMsg != null;
    assert cb != null;
    // current write position
    int currentPos = WROTE_POSITION_UPDATER.get(this);

    // the write position must be below the file size
    if (currentPos < this.fileSize) {
        // choose writeBuffer or mappedByteBuffer; async flush should write into
        // writeBuffer first and commit to the mapped buffer on a timer
        ByteBuffer byteBuffer = appendMessageBuffer().slice();
        // move to the write position
        byteBuffer.position(currentPos);
        AppendMessageResult result = cb.doAppend(byteBuffer, this.fileFromOffset, this.fileSize - currentPos, byteBufferMsg);
        // advance the write position by the bytes written; WROTE_POSITION_UPDATER
        // effectively tracks how many bytes of the file are in use
        WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
        // update the last write timestamp
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
```
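The slice()-then-position() trick in appendMessage deserves a standalone look: slice() gives the writer an independent position over the same backing memory, so positioning for the write never disturbs the shared buffer's own cursor. A minimal sketch in plain NIO, with invented names:

```java
import java.nio.ByteBuffer;

public class SliceDemo {
    // append 'payload' at 'currentPos' the way DefaultMappedFile does: slice the
    // shared buffer, position only the slice, write through it
    public static int append(ByteBuffer shared, int currentPos, byte[] payload) {
        ByteBuffer view = shared.slice(); // same memory, independent position/limit
        view.position(currentPos);        // moves the view's cursor only
        view.put(payload);
        return currentPos + payload.length; // new write position, like WROTE_POSITION_UPDATER
    }

    public static void main(String[] args) {
        ByteBuffer file = ByteBuffer.allocate(64); // stands in for the mapped buffer
        int pos = append(file, 0, new byte[] {'a', 'b'});
        pos = append(file, pos, new byte[] {'c'});
        // the shared buffer's own position was never touched
        System.out.println(file.position() + " " + pos); // 0 3
        System.out.println((char) file.get(2));          // c
    }
}
```

Because each append works on its own slice, the caller only has to hand out non-overlapping positions (which the atomic wrote-position updater does) rather than serialize every buffer mutation.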
After the write, handling branches on the result status. store provides onCommitLogAppend as the post-append callback; if the write failed because the remaining space in the file was insufficient, a new file is created and the write is retried:
```java
switch (result.getStatus()) {
    case PUT_OK:
        onCommitLogAppend(msg, result, mappedFile);
        break;
    case END_OF_FILE:
        // the file ran out of space: roll a new file and retry the write
        onCommitLogAppend(msg, result, mappedFile);
        unlockMappedFile = mappedFile;
        // Create a new file, re-write the message
        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
        if (null == mappedFile) {
            // XXX: warn and notify me
            log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
        }
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
            onCommitLogAppend(msg, result, mappedFile);
        }
        break;
    case MESSAGE_SIZE_EXCEEDED:
    case PROPERTIES_SIZE_EXCEEDED:
        beginTimeInLock = 0;
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
    case UNKNOWN_ERROR:
        beginTimeInLock = 0;
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
    default:
        beginTimeInLock = 0;
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
```
When the append completes, the locks are released, the cached statistics copies are updated, and the flush request and the HA (replication) request are submitted:
```java
/**
 * Core code that triggers the flush and the HA replication.
 * @return
 */
private CompletableFuture<PutMessageResult> handleDiskFlushAndHA(PutMessageResult putMessageResult,
    MessageExt messageExt, int needAckNums, boolean needHandleHA) {
    // the sync- or async-flush task
    CompletableFuture<PutMessageStatus> flushResultFuture = handleDiskFlush(putMessageResult.getAppendMessageResult(), messageExt);
    CompletableFuture<PutMessageStatus> replicaResultFuture;
    if (!needHandleHA) {
        replicaResultFuture = CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    } else {
        replicaResultFuture = handleHA(putMessageResult.getAppendMessageResult(), putMessageResult, needAckNums);
    }

    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(flushStatus);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });
}
```
```java
@Override
public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            // put the flush request (GroupCommitRequest) into commitRequests
            flushDiskWatcher.add(request);
            service.putRequest(request);
            return request.future();
        } else {
            // wake up the flush thread
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    // Asynchronous flush
    // ...
```