rocketmq的broker接收消息的时候,如何更新consumeQueue和indexfile的

Posted notlate

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq的broker接收消息的时候,如何更新consumeQueue和indexfile的相关的知识,希望对你有一定的参考价值。

首先解释下consumeQueue,由于commit-log是根据消息先后存储的,而我们消费的时候是根据topic来筛选的,所以需要一个队列根据topic来划分,所以consumeQueue就是干这个事情的。而indexfile顾名思义就是索引文件,用来做单纯查询的。

 

private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;

consumeQueue就是一个topic下面一个queueID的一个具体信息,用ConsumeQueue描述,其中ConsumeQueue跟commit-log一样,也就是用mappedFileQueue描述的,说明存储数据比较多。

indexFile是直接用mappedFile描述的。

 

这两类对象都是需要持久化的,他们都是通过public void doDispatch(DispatchRequest req)进行异步处理的,之所以异步是因为都是可以通过commit-log得到,所以不急着像cmmit-log一样急着落盘。dispatch方法只有两个地方用到:有新消息产生和故障恢复。故障恢复这里不提,后面提,这里只说新消息产生。

 

ReputMessageService线程:

每当有信息提交到mappedfile以后,那么这个线程通过判断:this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();

就知道有新消息产生,那么一直遍历这个mappedfile的buff,不停的取消息,然后一个消息就对应一个DispatchRequest,然后交给dispatch处理:

 

private void doReput() {
            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                    && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                    break;
                }

                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();

                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                            int size = dispatchRequest.getMsgSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);

                                    if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                        && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                    }

                                    this.reputFromOffset += size;
                                    readSize += size;

  

有三种dispatch:

    public void doDispatch(DispatchRequest req) {
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(req);
        }
    }

 

分别是consumeQueue、indexFile、bitmap,第三个默认用不到。

在这里可以构造一个新的consumeQueue:

 

public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
                this);
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                logic = newLogic;
            }
        }

        return logic;
    }

  

 

然后用这个新的或者老的consumeQueue来处理这条消息:

在private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) 方法中用于处理这个request

        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        this.byteBufferIndex.putLong(offset);
        this.byteBufferIndex.putInt(size);
        this.byteBufferIndex.putLong(tagsCode);

        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);

  这里的offset、size、tagscode、expectLogicOffset分别对应原始消息的全局物理offset、消息大小、tags、消费offset,这个消费offset就是commit-log的putMessage的:

  

            keyBuilder.setLength(0);
            keyBuilder.append(msgInner.getTopic());
            keyBuilder.append(‘-‘);
            keyBuilder.append(msgInner.getQueueId());
            String key = keyBuilder.toString();
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
            if (null == queueOffset) {
                queueOffset = 0L;
                CommitLog.this.topicQueueTable.put(key, queueOffset);
            }

  CQ_STORE_UNIT_SIZE就是20,也就是consumeQueue里面只存对应的topic-queueid对应的消息,并且只存物理offset、tag、总体大小,这三个指标正好占了20比特。存贮位置可以直接通过消费offset乘以20得到。由于消息很多,所以用mappedFIledQueue来存储,具体用哪个Queue也可以通过消费offset拿到指定的mappedFile。

这里的topicQueueTable其实跟consumeQueue是可以转换得到的,后面说故障恢复的时候再说。

 

 

接下来看看处理indexfile的dispatch。

为了写入indexfile,我们首先需要拿到一个indexfile,在getAndCreateLastIndexFile方法中,在这里看到只有前一个indexfile文件完全写完以后,我们才能生成新的,并且用新线程进行刷盘操作。

看下写入的操作:

    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            int keyHash = indexKeyHashMethod(key);
            int slotPos = keyHash % this.hashSlotNum;
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;

            try {

                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }

                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

                timeDiff = timeDiff / 1000;

                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }

                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;

                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }

                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }

  每个index文件只能存入指定数量的内容,如果满了返回false,创建新的indexfile。

  indexfile文件分布是:首先是文件头、然后是slot、最后是index内容。每个index内容大小是20字节,总共4个int、一个long。

  这里的key就是topic+uniqkey,然后做哈希,再对槽数取模得到slot的位置absslotpos,每个slot占用4字节,存储的就是当前this.indexHeader.getIndexCount()的序号(从1开始涨),这个数字this.indexHeader.getIndexCount()不能超过hashSlotNum,这个保证了indexFile是满的,不会超过也不会过小。

  

  index的绝对位置通过:int absIndexPos =    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize;然后在这个位置插入:

                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

  这里的slotValue其实就是上一个冲突slot位置对应的index-count,通过这个链表办法解决哈希冲突。在看下index存储的其他内容:哈希值、物理绝对位置、时间。

 

在看看如何使用index文件的:

 

    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
        final long begin, final long end, boolean lock) {
        if (this.mappedFile.hold()) {
            int keyHash = indexKeyHashMethod(key);
            int slotPos = keyHash % this.hashSlotNum;
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;
            try {
                if (lock) {
                    // fileLock = this.fileChannel.lock(absSlotPos,
                    // hashSlotSize, true);
                }

                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                // if (fileLock != null) {
                // fileLock.release();
                // fileLock = null;
                // }

                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                    || this.indexHeader.getIndexCount() <= 1) {
                } else {
                    for (int nextIndexToRead = slotValue; ; ) {
                        if (phyOffsets.size() >= maxNum) {
                            break;
                        }

                        int absIndexPos =
                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                + nextIndexToRead * indexSize;

                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

                        if (timeDiff < 0) {
                            break;
                        }

                        timeDiff *= 1000L;

                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

                        if (keyHash == keyHashRead && timeMatched) {
                            phyOffsets.add(phyOffsetRead);
                        }

                        if (prevIndexRead <= invalidIndex
                            || prevIndexRead > this.indexHeader.getIndexCount()
                            || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            break;
                        }

                        nextIndexToRead = prevIndexRead;
                    }
                }
            } catch (Exception e) {
                log.error("selectPhyOffset exception ", e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }

                this.mappedFile.release();
            }
        }
    }

  通过int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);就可以拿到槽冲突对应的index-count,继续找下去。

  为了防止冲突找到错误的信息,还加上了过滤条件:

if (keyHash == keyHashRead && timeMatched) {phyOffsets.add(phyOffsetRead);}

 

这里面还有一个checkpoint机制,对于indexfile,在load方法里面:

 

            if (!lastExitOK) {
                        if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
                            .getIndexMsgTimestamp()) {
                            f.destroy(0);
                            continue;
                        }
                    }

  

 endtimestamp是最后一次写入数据时间,右边是刷盘时间。如果右边小,说明不是一个完整文件。一个没有及时刷盘的文件肯定是一个空文件,因为index只有两个可能:空文件或者满文件,所以直接舍弃这个文件。

 

以上是关于rocketmq的broker接收消息的时候,如何更新consumeQueue和indexfile的的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ源码—Broker接收消息入口源码

RocketMQ源码(10)—Broker asyncSendMessage处理消息以及自动创建Topic

RocketMQ源码(10)—Broker asyncSendMessage处理消息以及自动创建Topic

RocketMQ 保证消息不丢失

RocketMQ原理解析-NameServer

消息中间件—RocketMQ的RPC通信