二分查找在RocketMQ和Kafka中的应用

Posted 不识君的荒漠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了二分查找在RocketMQ和Kafka中的应用相关的知识,希望对你有一定的参考价值。

二分查找

二分查找是在有序元素中找到目标元素或最终未找到返回。

算法模板如下:

    public int indexOf(int[] nums, int target) 
        int left = 0, right = nums.length - 1;
        while (left <= right) 
            // 实际情况下担心溢出,可以这样写:mid = left + (right - left) / 2;
            int mid = (left + right) / 2;
            // 找到目标值了,返回该索引
            if (nums[mid] == target) 
                return mid;
                // 目标值可能在nums[mid] ~ nums[right] 之间
             else if (nums[mid] < target) 
                left = mid + 1;
             else 
                // 目标值可能在nums[left] ~ nums[mid] 之间
                right = mid - 1;
            
        

        return -1;
    

二分查找也算是比较基本的查找算法,这里不再赘述了。

在RocketMQ中的应用

RocketMQ的消费队列

在聊之前,还是先简单的看下RocketMQ的存储结构,下面这张图片来自官方文档:https://github.com/apache/rocketmq/blob/develop/docs/cn/image/rocketmq_design_1.png

本文关注的重点是消费队列这部分,官方说明如下:

(2) ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/topic/queueId/fileName。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M;

p.s. 其实RocketMQ的源码里的文档很完善的,每次有同学需要了解 rocketmq的时候,我都会推荐他们先去把源码下载下来看下里面的文档。

简而言之,消费队列可以认为存储的是实际消息的索引信息,消息内容实际是存储在commit log里。就有点类似字典的目录,方便在书本里检索想要查找的汉字,和字典不同的是,每一条消息都会在消费队列里有对应的索引信息。

二分查找的使用

RocketMQ在根据时间戳查询消息的时候,采用的是二分查找的算法。

因为消费队列里存储的每条消息的索引信息,都是根据时间有序存储的。

在1个消费队列里,我们可以通过比对索引信息对应消息的时间戳,采用2分查找的算法检索相关的索引信息来找到目标消息的偏移。

源码位置:org.apache.rocketmq.store.ConsumeQueue#getOffsetInQueueByTime

这部分代码我已经加好了注释,如下,感兴趣可以耐心看一下:

    public long getOffsetInQueueByTime(final long timestamp) 
        // 获取最后访问时间小于当前时间戳的消费队列文件,这里我更好奇的是如果把所有消费队列文件清完重新生成,会有什么问题
        MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
        if (mappedFile != null) 
            // 下面所有的offset表示的是"逻辑"物理位置,打个比方,每条索引信息大小为20个字节,第3条消息的索引位置为(3-1)*20=40,表示的是文件的物理写入位置
            // 而不是第几条消息的索引信息,所以这个变量很误导人,消息偏移用offset,文件写入位置用pos多好,这里也用offset
            long offset = 0;
            // 消费队列文件的命名方式和commit log一样,都是以写入位置命名,所以是一个"逻辑"的物理位置。
            // 打个比方,第一个文件,从第一条消息写入,写入位置为0,文件名就是0,写入3条消息后,当前文件满了,开始写入下一个文件
            // 下一个文件的名则是60,因为要写入的第4条消息,在逻辑上的物理位置是(4-1)*20,当然的在当前的文件中是从物理位置0开始从头写入的
            int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
            int high = 0;
            int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
            // 我们是要根据时间戳来查找消息,但是很有可能这个时间点是没有消息的,所以这两个变量是用来标记离查询的时间戳最近的前、后两个时间,到时候谁更接近就返回谁的
            long leftIndexValue = -1L, rightIndexValue = -1L;
            // 目前commit log中最小的写入位置,其实就是最早那个的commit log文件 的文件名表示的位置
            long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
            SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
            if (null != sbr) 
                ByteBuffer byteBuffer = sbr.getByteBuffer();
                // 表示当前消费队列中,最后一条索引消息的写入位置,比如文件大小为100,可以写入5要索引数据,最后一条的起始位置是80
                high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
                try 
                    while (high >= low) 
                        // 这是计算mid的那条索引的写入的起始物理位置
                        midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
                        byteBuffer.position(midOffset);
                        // 每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode
                        // 下面拿到该索引的消息的在commit log中的逻辑位置
                        long phyOffset = byteBuffer.getLong();
                        int size = byteBuffer.getInt();
                        // 这说明该消息对应的commit log文件可能已经过期删除了,所以该消息已经不存在了
                        if (phyOffset < minPhysicOffset) 
                            low = midOffset + CQ_STORE_UNIT_SIZE;
                            leftOffset = midOffset;
                            continue;
                        

                        // 根据索引里的逻辑位置,可以计算出在哪个commit log及物理位置,根据大小,获取这条消息,然后得出存储的时间戳
                        long storeTime =
                            this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                        if (storeTime < 0) 
                            // 如果找不到或者错误,返回的是-1,这里就直接返回了
                            return 0;
                            // 下面这一部分就是2分查找了,和我们学习的简单2分查找的区别只是,这里的步长是一个索引的长度,20个字节
                         else if (storeTime == timestamp)  // 刚好找到了
                            targetOffset = midOffset;
                            break;
                         else if (storeTime > timestamp) 
                            high = midOffset - CQ_STORE_UNIT_SIZE;
                            rightOffset = midOffset;
                            rightIndexValue = storeTime;
                         else 
                            low = midOffset + CQ_STORE_UNIT_SIZE;
                            leftOffset = midOffset;
                            leftIndexValue = storeTime;
                        
                    

                    if (targetOffset != -1) 
                        // 不是-1,这是找到了
                        offset = targetOffset;
                     else 
                        // 下面是没有找到,就获取离目标时间最近的那个时间的偏移
                        if (leftIndexValue == -1) 

                            offset = rightOffset;
                         else if (rightIndexValue == -1) 

                            offset = leftOffset;
                         else 
                            offset =
                                Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
                                    - rightIndexValue) ? rightOffset : leftOffset;
                        
                    

                    return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
                 finally 
                    sbr.release();
                
            
        
        return 0;
    

在这里再说几句题外话,其实上面这段代码我很早之前第一次看的时候,对于里面的变量名,感觉是很懵的。

在rocketmq里消息偏移有不少地方都用的是offset之类表示的(看得多了,我潜意识就这样认为了),而消息的物理(逻辑)文件位置用的是position之类的变量名,在这个地方计算的都是逻辑或物理写入位置,但变量名上都带个offset,第一次差点看蒙了。

还有个问题是,这次在写这篇博文时,我给代码加注释的时候,想起我很早之前提到过的一个issue,有兴趣可以自己看下:getOffsetInQueueByTime return the max offset when time greater than the storetime of last message #2661

因为下面还有Kafka部分的内容,为了和kafka形成对比,就再说明下,如何根据消费偏移检索消息。

一条消费队列内的索引是连续有序(写入顺序)的,类似一个数组,直接通过索引信息的大小乘以对应的偏移量,便可以得到对应的索引位置获取索引数据,进而得到实际的消息信息。

获取消费队列的索引代码如下,很简单,就不加注释说明了:

    public SelectMappedBufferResult getIndexBuffer(final long startIndex) 
        int mappedFileSize = this.mappedFileSize;
        long offset = startIndex * CQ_STORE_UNIT_SIZE;
        if (offset >= this.getMinLogicOffset()) 
            MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
            if (mappedFile != null) 
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
                return result;
            
        
        return null;
    

在Kafka中的应用

Kafka的索引

关于kafka的存储结构看官方文档:Apache Kafka,官方文档这里因为不涉及对索引部分的详细说明,我就简单说下,下图来自官方文档:

kafka每个topic的每个分区目录下有单独的存储消息数据的log文件和对应的索引文件。

  • 0.log存储当前分区实际的消息数据
  • 0.index是位移索引
  • 0.timeindex是时间戳索引

位移索引里每条数据的格式大概是这样:<消息相对位移值,消息在物理文件上的写入位置>

时间戳索引第条数据的格式是:<时间戳,消息相对位移值>

所以根据时间戳检索消息的时候,是先找到该时间对应的消息位移,再读取具体的消息内容。

p.s. 因为我没启用事务,所以上面没事务索引文件。

下面的所有说明都是来自我几个月前下载的trunk分支的代码,当时版本是3.1.0-SNAPSHOT,kafka代码变动太快,所以这里注明一下。

二分查找的使用

和rocketmq不同的是,无论是根据位移还是时间戳,kafka的消息检索都是采用二分查找,而且调用的是同一个方法,看一下方法调用关系:

这个方法的源码在:kafka.log.AbstractIndex#indexSlotRangeFor

这部分代码我也加了注释,感兴趣可以耐心看下:

  private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchType): (Int, Int) = 
    // 索引项是空的
    if(_entries == 0)
      return (-1, -1)

    // 二分查找
    def binarySearch(begin: Int, end: Int) : (Int, Int) = 
      // binary search for the entry
      var lo = begin
      var hi = end
      while(lo < hi) 
        // >>> 无符号右移一位,我估计是为了避免溢出吧。(lo + hi + 1)可能溢出,这应该是lo + (hi - lo) / 2的替代方案吧
        val mid = (lo + hi + 1) >>> 1
        // parseEntry在位移索引和时间戳索引中是不同的实现
        val found = parseEntry(idx, mid)
        val compareResult = compareIndexEntry(found, target, searchEntity)
        if(compareResult > 0)
          hi = mid - 1
        else if(compareResult < 0)
          lo = mid
        else
          return (mid, mid)
      
      // 如果是最后一条,hi是-1,不要多想,没太多深意,就是一个标记,上层调用这个方法的时候,看见-1,知道是没有比它更大的条目了
      (lo, if (lo == _entries - 1) -1 else lo + 1)
    

    // kafka在这里做了个冷热分区的查找优化,_warmEntries=8192/(index entry size),这个索引大小,位移索引应该是8,时间戳索引应该是12
    // 关于为什么选择8192也就是8KB大小作为热区,看代码最上面的注释,写的比较清楚了
    val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
    // check if the target offset is in the warm section of the index
    // 在热区查找
    if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) 
      return binarySearch(firstHotEntry, _entries - 1)
    

    // check if the target offset is smaller than the least offset
    if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
      return (-1, 0)
    // 冷区查找
    binarySearch(0, firstHotEntry)
  

关于kafka的这个二分查找,比较突出的就是这个冷热分区查找的优化。

我们现在用的大部分操作系统,page cache的置换策略大多是LRU或它的变体。对于冷热分区的这个优化,注释里有一个示例进行了说明,如下:

For example, in an index with 13 pages, to lookup an entry in the last page (page #12), the standard binary search
algorithm will read index entries in page #0, 6, 9, 11, and 12.
page number: |0|1|2|3|4|5|6|7|8|9|10|11|12 |
steps:       |1| | | | | |3| | |4|  |5 |2/6|
In each page, there are hundreds log entries, corresponding to hundreds to thousands of kafka messages. When the
index gradually growing from the 1st entry in page #12 to the last entry in page #12, all the write (append)
operations are in page #12, and all the in-sync follower / consumer lookups read page #0,6,9,11,12. As these pages
are always used in each in-sync lookup, we can assume these pages are fairly recently used, and are very likely to be
in the page cache. When the index grows to page #13, the pages needed in a in-sync lookup change to #0, 7, 10, 12,
and 13:
page number: |0|1|2|3|4|5|6|7|8|9|10|11|12|13 |
steps:       |1| | | | | | |3| | | 4|5 | 6|2/7|
Page #7 and page #10 have not been used for a very long time. They are much less likely to be in the page cache, than
the other pages. The 1st lookup, after the 1st index entry in page #13 is appended, is likely to have to read page #7
and page #10 from disk (page fault), which can take up to more than a second. In our test, this can cause the
at-least-once produce latency to jump to about 1 second from a few ms.

可能有的同学,也没耐心看这个,我简单说下,写入的最新消息等待副本同步,消费者拉取消费,这些最新消息的索引会经常访问,肯定是在操作系统的缓存里,所以把最新消息的8KB索引信息作为热区,其它的索引信息是在冷区,因为不经常访问的可能被操作系统置换出去了,如果查找的时候,会出现缺页错误,操作系统要重新从磁盘加载到缓存里。

所以查找索引数据的时候先看下有没有在热区,就是这个索引是不是最新的那几百条索引里,如果在热区,就对热区进行二分查找。

如果在冷区,可能这些索引在进行二分查找的时候,需要重新从磁盘加载到操作系统的page cache,比较影响性能。

一般情况下,最新的消息同步或者是没有积压,消费最新的消息,这些消息的索引都是在热区的。

p.s. kafka这个冷热分区的方案,让我在参加去年的天池大赛时,对于冷热数据处理上提供了一个很好的idea。

以上是关于二分查找在RocketMQ和Kafka中的应用的主要内容,如果未能解决你的问题,请参考以下文章

Kafka中改进的二分查找算法

kafka根据offset查找消息流程

RocketMQ和Kafka到底选哪个

RocketMq在SparkStreaming中的应用

赛道解析针对冷热读写场景的 RocketMQ 存储系统设计思路拆解

RocketMq在SparkStreaming中的应用总结