kafkaKafka源码解析 - LogSegment以及Log初始化

Posted 九师兄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafkaKafka源码解析 - LogSegment以及Log初始化相关的知识,希望对你有一定的参考价值。

文章目录


1.概述

转载并且补充:Kafka源码解析 - LogSegment以及Log初始化

我们先回想一下Kafka的日志结构是怎样的?

Kafka 日志对象由多个日志段对象组成,而每个日志段对象会在磁盘上创建一组文件,包括消息日志文件(.log)、位移索引文件(.index)、时间戳索引文件(.timeindex)以及已中止(Aborted)事务的索引文件(.txnindex)。当然,如果你没有使用 Kafka 事务,已中止事务的索引文件是不会被创建出来的

下面我们看一下LogSegment的实现情况,具体文件位置是 core/src/main/scala/kafka/log/LogSegment.scala。

2.LogSegment

LogSegment.scala这个文件里面定义了三个对象:

  • LogSegment class;
  • LogSegment object;
  • LogFlushStats object。LogFlushStats 结尾有个 Stats,它是做统计用的,主要负责为日志落盘进行计时。

我这里贴一下LogSegment.scala这个文件上面的注释,介绍了LogSegment的构成:

A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in any previous segment

这段注释清楚的写了每个日志段由两个核心组件构成:日志和索引。每个日志段都有一个起始位置:base offset而该位移值是此日志段所有消息中最小的位移值,同时,该值却又比前面任何日志段中消息的位移值都大

2.1.LogSegment构造参数

class LogSegment private[log] (val log: FileRecords,
                               val lazyOffsetIndex: LazyIndex[OffsetIndex],
                               val lazyTimeIndex: LazyIndex[TimeIndex],
                               val txnIndex: TransactionIndex,
                               val baseOffset: Long,
                               val indexIntervalBytes: Int,
                               val rollJitterMs: Long,
                               val time: Time) extends Logging 

FileRecords是实际保存 Kafka 消息的对象。

lazyOffsetIndex、lazyTimeIndex 和 txnIndex 分别对应位移索引文件、时间戳索引文件、已中止事务索引文件。

baseOffset是每个日志段对象的起始位移,每个 LogSegment 对象实例一旦被创建,它的起始位移就是固定的了,不能再被更改。

indexIntervalBytes 值其实就是 Broker 端参数 log.index.interval.bytes 值,它控制了日志段对象新增索引项的频率。默认情况下,日志段至少新写入 4KB 的消息数据才会新增一条索引项。

time 是用于统计计时的一个实现类。

2.2 .append

/**
   * Append the given messages starting with the given offset. Add
   * an entry to the index if needed.
   *
   * It is assumed this method is being called from within a lock.
    *
    * 在指定的 offset 处追加指定的 msgs, 需要的情况下追加相应的索引
    *
    *  将一组消息追加写入到以给定offset开始的日志段中。如果写入超过了4KB(默认的log.index.interval.bytes属性值)则额外写入一条新的索引
    *  项记录到索引文件中。这个方法不是线程安全的,所以后面调用的时候需要有锁同步机制的保护
    *
    *  在LogSegment.append0方法中实现了追加消息的功能,可能有多个Handler线程并发写人同一个LogSegment,所以调用此方法时必须保证线程安全,
    *  在后面分析Log类时会看到相应的同步代码。
    *
    *  这个方法主要做了那么几件事:
    *
    *  1. 判断日志段是否为空,不为空则往下进行操作
    *  2. 调用ensureOffsetInRange方法,确保输入参数最大位移值是合法的。
    *  3. 调用 FileRecords 的 append 方法执行真正的写入。
    *  4. 更新日志段的最大时间戳以及最大时间戳所属消息的位移值属性。
    *  5. 更新索引项和写入的字节数,日志段每写入 4KB 数据就要写入一个索引项。当已写入字节数超过了
    *     4KB 之后,append 方法会调用索引对象的 append 方法新增索引项,同时清空已写入字节数。

   *
   * @param largestOffset The last offset in the message set
   * @param largestTimestamp The largest timestamp in the message set.
   * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
   * @param records The log entries to append.
   * @return the physical position in the file of the appended records
   * @throws LogSegmentOffsetOverflowException if the largest offset causes index offset overflow
    *
    *
    */
  @nonthreadsafe
  def append(largestOffset: Long,
             largestTimestamp: Long,
             shallowOffsetOfMaxTimestamp: Long,
             records: MemoryRecords): Unit = 
    // 检测是否满足添加索引项的条件,判断是否日志段是否为空
    if (records.sizeInBytes > 0) 
      trace(s"Inserting $records.sizeInBytes bytes at end offset $largestOffset at position $log.sizeInBytes " +
            s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
      // 获取FileRecord 文件的末尾,他就是本次消息要写入的物理地址
      val physicalPosition = log.sizeInBytes()
      if (physicalPosition == 0)
        rollingBasedTimestamp = Some(largestTimestamp)

      // 确保输入参数最大位移值是合法的
      ensureOffsetInRange(largestOffset)

      // append the messages
      // todo:写日志文件 追加到数据文件中 内存中
      val appendedBytes = log.append(records)
      trace(s"Appended $appendedBytes to $log.file at end offset $largestOffset")
      // Update the in memory max timestamp and corresponding offset.
      // 更新日志段的最大时间戳以及最大时间戳所属消息的位移值属性
      if (largestTimestamp > maxTimestampSoFar) 
        maxTimestampSoFar = largestTimestamp
        offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp
      
      // append an entry to the index (if needed)
      // 当一些如字节数超过4kb之后,append方法会调用索引对象的append方法 新增索引项 同时清空已经写入字节数

      //note: 判断是否需要追加索引(数据每次都会添加到数据文件中,但不是每次都会添加索引的,间隔 indexIntervalBytes 大小才会写入一个索引文件)
      // todo: 优化点 这里并不是来一条数据就写入一条数据,而是达到了一定的条件才写一次索引,我们管它叫稀疏索引
      //     indexIntervalBytes 默认是4096个字节,也就是说每次写了4096个字节的消息就更新一次索引
      if (bytesSinceLastIndexEntry > indexIntervalBytes) 
        // todo:添加数据索引
        offsetIndex.append(largestOffset, physicalPosition)
        //添加时间戳索引
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
        //重置为0
        bytesSinceLastIndexEntry = 0
      
      //更新bytesSinceLastIndexEntry 添加消息的大小到 bytesSinceLastIndexEntry 变量中
      bytesSinceLastIndexEntry += records.sizeInBytes
    
  


这个方法主要做了那么几件事:

  1. 判断日志段是否为空,不为空则往下进行操作
  2. 调用ensureOffsetInRange方法,确保输入参数最大位移值是合法的。
  3. 调用 FileRecords 的 append 方法执行真正的写入。
  4. 更新日志段的最大时间戳以及最大时间戳所属消息的位移值属性。
  5. 更新索引项和写入的字节数,日志段每写入 4KB 数据就要写入一个索引项。当已写入字节数超过了
    4KB 之后,append 方法会调用索引对象的 append 方法新增索引项,同时清空已写入字节数。

我们下面再看看ensureOffsetInRange方法是怎么校验最大位移的:

/**
    * 这个方法会将offset和baseOffset做对比,当offset小于baseOffset或者当offset和baseOffset相减后大于Int的最大值,
    * 那么都是异常的情况,那么这时就会抛出LogSegmentOffsetOverflowException异常。
    * @param offset
    * @return
    */
  private def ensureOffsetInRange(offset: Long): Unit = 
    if (!canConvertToRelativeOffset(offset))
      throw new LogSegmentOffsetOverflowException(this, offset)
  

这个方法最终会调用到AbstractIndex的toRelative方法中:

/**
    * 这个方法会将offset和baseOffset做对比,当offset小于baseOffset或者当offset和baseOffset相减后大于Int的最大值,
    * 那么都是异常的情况,那么这时就会抛出LogSegmentOffsetOverflowException异常。
    * @param offset
    * @return
    */
  private def toRelative(offset: Long): Option[Int] = 
    val relativeOffset = offset - baseOffset
    if (relativeOffset < 0 || relativeOffset > Int.MaxValue)
      None
    else
      Some(relativeOffset.toInt)
  

可见这个方法会将offset和baseOffset做对比,当offset小于baseOffset或者当offset和baseOffset相减后大于Int的最大值,那么都是异常的情况,那么这时就会抛出LogSegmentOffsetOverflowException异常。

2.3 .read

/**
   * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
   * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
   *
   * @param startOffset A lower bound on the first offset to include in the message set we read
   * @param maxOffset An optional maximum offset for the message set we read
   * @param maxSize The maximum number of bytes to include in the message set we read
   * @param maxPosition The maximum position in the log segment that should be exposed for read
   * @param minOneMessage If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists)
   *
   * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
   *         or null if the startOffset is larger than the largest offset in this log
    *
    * 读取日志分段(副本同步不会设置 maxSize)
    *
    *  startOffset : 指定读取的起始消息的offset
    *  maxOffset : 指定读取结束的offset,可以为空
    *  maxSize : 指定读取的最大字节数
    *  maxPosition : 指定读取的最大物理地址,可选参数,默认值是日志文件的大小
   */
  @threadsafe
  def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
           minOneMessage: Boolean = false): FetchDataInfo = 
    if (maxSize < 0)
      throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")

    // 日志文件测长度,log 文件物理长度
    val logSize = log.sizeInBytes // this may change, need to save a consistent copy
    //将startOffset转成对应的物理地址,以字节为单位
    val startOffsetAndSize = translateOffset(startOffset)

    // if the start position is already off the end of the log, return null
    if (startOffsetAndSize == null)
      return null

    val startPosition = startOffsetAndSize.position
    // 组装offset元数据信息
    val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

    val adjustedMaxSize =
      if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
      else maxSize

    // return a log segment but with zero size in the case below
    // 如果设置了 adjustedMaxSize ,则根据其具体计算实际需要读取的字节数
    if (adjustedMaxSize == 0)
      return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

    // calculate the length of the message set to read based on whether or not they gave us a maxOffset
    // 计算读取的字节数
    val fetchSize: Int = maxOffset match 
      //note: 副本同步时的计算方式
      case None =>
        // no max offset, just read until the max position
        min((maxPosition - startPosition).toInt, adjustedMaxSize)
      //note: consumer 拉取时,计算方式
      case Some(offset) =>
        // there is a max offset, translate it to a file position and use that to calculate the max read size;
        // when the leader of a partition changes, it's possible for the new leader's high watermark to be less than the
        // true high watermark in the previous leader for a short window. In this window, if a consumer fetches on an
        // offset between new leader's high watermark and the log end offset, we want to return an empty response.
        if (offset < startOffset)
          return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false)
        //将maxOffset转成物理地址
        val mapping = translateOffset(offset, startPosition)
        val endPosition =
          // maxOffset超出此日志文件时,则使用日志文件长度
          if (mapping == null)
            logSize // the max offset is off the end of the log, use the end of the file
          else
            mapping.position
        //由maxOffsetmaxPosition和maxSize共同决定读取的长度
        min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
    

    //maxOffset通常是Replica的HW,消费者最多只能读到HW这个位置的消息。

    // todo: 封装结果 通过 log.slice 读取数据  读取数据的方法是 log.slice
    //按照读取起始位置和长度生成的一个分片的FileRecords对象,根据起始的物理位置和读取长度读取数据文件
    FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
      firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
  

这段代码中,主要做了这几件事:

  1. 调用 translateOffset 方法定位要读取的起始文件位置 (startPosition)。
    举个例子,假设 maxSize=100,maxPosition=300,startPosition=250,那么 read 方法只能读取 50 字节,因为 maxPosition - startPosition = 50。我们把它和 maxSize 参数相比较,其中的最小值就是最终能够读取的总字节数。
  • 调用 FileRecords 的 slice 方法,从指定位置读取指定大小的消息集合

2.4 .recover

这个方法是恢复日志段,Broker 在启动时会从磁盘上加载所有日志段信息到内存中,并创建相应的 LogSegment 对象实例。在这个过程中,它需要执行一系列的操作。

/**
   * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes
   * from the end of the log and index.
   *
   * @param producerStateManager Producer state corresponding to the segment's base offset. This is needed to recover
   *                             the transaction index.
   * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
   * @return The number of bytes truncated from the log
   * @throws LogSegmentOffsetOverflowException if the log segment contains an offset that causes the index offset to overflow
    *
    * 根据日志文件重建索引文件,同时验证日志文件中消息的合法性
    *
    * 在重建索引文件过程中,如果遇到了压缩消息需要进行解压,主要原因是因为索引项中保存的相对offset 是第一条消息的offset
    * ,而外层消息的offiset是压缩消息集合中的最后一条消息的offset。
    *
    * 恢复一个日志段——即根据日志文件重建索引并砍掉那些无效的字节,所谓的无效字节就是由参数限定的,任何在maxMessageSize之外的字节都是为无效状态。
    * 该方法实现也很简单,就是先将索引项全部截断并将索引文件重置为原来的大小,然后遍历该消息集合,超过indexIntervalBytes之后就追加一条索引记录从而达到重建索引的目的
    *
    * 这个方法是恢复日志段,Broker 在启动时会从磁盘上加载所有日志段信息到内存中,并创建相应的
    * LogSegment 对象实例。在这个过程中,它需要执行一系列的操作。
    *
    * 这个方法主要做了以下几件事:
    * 1. 清空索引文件
    * 2. 遍历日吹端中多有消息集合
    *    1. 校验日志段中的消息
    *    2. 获取最大时间戳及所属消息位移
    *    3. 更新索引项
    *    4. 更新总消息字节数
    *    5. 更新Porducer状态和Leader Epoch缓存
    * 3. 执行消息日志索引文件截断
    * 4. 调整索引文件大小
   */
  @nonthreadsafe
  def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = 
    offsetIndex.reset()
    timeIndex.reset()
    txnIndex.reset()
    var validBytes = 0
    var lastIndexEntry = 0
    maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
    try 
      //遍历日志段中所有消息集合
      for (batch <- log.batches.asScala) 
        // 验证消息是否合法
        batch.ensureValid()
        // 校验消息中最后一条消息的位移不能越界
        ensureOffsetInRange(batch.lastOffset)

        // The max timestamp is exposed at the batch level, so no need to iterate the records
        // 获取最大时间戳及所属消息位移
        if (batch.maxTimestamp > maxTimestampSoFar) 
          maxTimestampSoFar = batch.maxTimestamp
          offsetOfMaxTimestampSoFar = batch.lastOffset
        

        // Build offset index
        // 当已写入字节数超过了 4KB 之后,调用索引对象的 append 方法新增索引项,同时清空已写入字节数
        if (validBytes - lastIndexEntry > indexIntervalBytes) 
          offsetIndex.append(batch.lastOffset, validBytes)
          timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
          lastIndexEntry = validBytes
        
        // 更新总消息字节数
        validBytes += batch.sizeInBytes()

        // 更新Porducer状态和Leader Epoch缓存
        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) 
          leaderEpochCache.foreach  cache =>
            if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
              cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
          
          updateProducerState(producerStateManager, batch)
        
      
     catch 
      case e: CorruptRecordException =>
        warn("Found invalid messages in log segment %s at byte offset %d: %s."
          .format(log.file.getAbsolutePath, validBytes, e.getMessage))
    
    // 遍历完后将 遍历累加的值和日志总字节数比较,
    val truncated = log.sizeInBytes - validBytes
    if (truncated > 0)
      debug(s"Truncated $truncated invalid bytes at the end of segment $log.file.getAbsoluteFile during recovery")

    // 执行日志截断操作,对日志文件进行拦截,抛弃后面验证失败的Message
    log.truncateTo(validBytes)
    // 对索引文件进行相应的截断,调整索引文件大小
    offsetIndex.trimToValidSize()
    // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true)
    // 对索引文件进行相应的拦截
    timeIndex.trimToValidSize()
    // 返回截掉的字节数
    truncated
  

下面我们进入到truncateTo方法中,看一下截断操作是怎么做的:

/**
     * Truncate this file message set to the given size in bytes. Note that this API does no checking that the
     * given size falls on a valid message boundary.
     * In some versions of the JDK truncating to the same size as the file message set will cause an
     * update of the files mtime, so truncate is only performed if the targetSize is smaller than the
     * size of the underlying FileChannel.
     * It is expected that no other threads will do writes to the log when this function is called.
     * @param targetSize The size to truncate to. Must be between 0 and sizeInBytes.
     * @return The number of bytes truncated off
     *
     * 将文件消息集合截断成指定的字节大小,这个方法不保证截断位置的Message的完整性。
     *
     * Kafka 会将日志段当前总字节数和刚刚累加的已读取字节数进行比较,如果发现前者比后者大,
     * 说明日志段写入了一些非法消息,需要执行截断操作,将日志段大小调整回合法的数值。
     */
    public int truncateTo(int targetSize) throws IOException 
        int originalSize = sizeInBytes();
        // 检测 targetSize 的有效性, 要截断的目标大小不能超过当前文件的大小
        if (targetSize > originalSize || targetSize < 0)
            throw new KafkaException("Attempt to truncate log segment " + file + " to " + targetSize + " bytes failed, " +
                    " size of this log segment is " + originalSize + " bytes.");
        //如果目标大小小于当前文件大小,那么执行截断
        if (targetSize < (int) channel.size()) 
            // 裁剪文件
            channel.truncate(targetSize);
            // 修改 size
            size.set(targetSize);
        
        // 返回裁剪掉的字节数
        return originalSize - targetSize;
    

Kafka 会将日志段当前总字节数和刚刚累加的已读取字节数进行比较,如果发现前者比后者大,说明日志段写入了一些非法消息,需要执行截断操作,将日志段大小调整回合法的数值

2.5 .truncateTo

这个方法会将日志段中的数据强制截断到指定的位移处。

/**
   * Truncate off all index and log entries with offsets >= the given offset.
   * If the given offset is larger than the largest message in this segment, do nothing.
   *
   * @param offset The offset to truncate to
   * @return The number of log bytes truncated
    *
    *  给定一个位移,将位于该位移之后的所有索引项和日志项全部清除,如果给定的位移大于日志段本身的最大位移就什么都不做。
    *  最后函数返回日志数据总共截断的字节数。值得注意的是,如果把所有日志数据都截断了,那么需要更新这个日志段的创建日期。
    *  同时还会将检查是否增加索引项的指针清零。
    *
    *  日志截断使之保存的最大offset不会超过给定的targetOffset。当然,如果targetOffset就比现有日志的结束位移还要大自然什么都不做。
    *  另外在截断的过程中,还需要判断该log的最小位移(也就是第一个日志段的基础位移)如果比targetOffset大的话,那么直接调用
    *  truncateFullyAndStartAt方法删除所有日志数据并设置新的位移点,否则逐一删除那些起始位移比targetOffset大的日志段。
    *  此时activeSegment会自动变成当前删除之后最新的那个日志段,所以还要对activeSegment进行截断操作。这些做完之后更新下一条消息offset并重设恢复点位移
    *
    * 这个方法会将日志段中的数据强制截断到指定的位移处。
    *   1. 将位置值转换成物理文件位置
    *   2. 移动索引到指定位置,位移索引文件、时间戳索引文件、已中止事务索引文件等位置
    *   3. 将索引做一次resize操作,节省内存空间
    *   4. 调整日志段日志位置
    */
  @nonthreadsafe
  def truncateTo(offset: Long): Int = 
    // Do offset translation before truncating the index to avoid needless scanning
    // in case we truncate the full index
    // 将位置值转换成物理文件位置
    val mapping = translateOffset(offset)
    // 移动索引到指定位置
    offsetIndex.truncateTo(offset)
    timeIndex.truncateTo(offset)
    txnIndex.truncateTo(offset)

    // After truncation, reset and allocate more space for the (new currently active) index
    // 因为位置变了,为了节省内存,做一次resize操作
    offsetIndex.resize(offsetIndex.maxIndexSize)
    timeIndex.resize(timeIndex.maxIndexSize)

    val bytesTruncated = if (mapping == null) 0 else log.truncateTo(mapping.position)
    // 如果调整到初始位置,那么重新记录一下创建时间
    if (log.sizeInBytes == 0) 
      created = time.milliseconds
      rollingBasedTimestamp = None
    

    //调整索引项
    bytesSinceLastIndexEntry = 0
    //调整最大的索引位置
    if (maxTimestampSoFar >= 0)
      loadLargestTimestamp()
    bytesTruncated
  

  1. 将位置值转换成物理文件位置
  2. 移动索引到指定位置,位移索引文件、时间戳索引文件、已中止事务索引文件等位置
  3. 将索引做一次resize操作,节省内存空间
  4. 调整日志段日志位置

我们到OffsetIndex的truncateTo方法中看一下:

/**
    * 1. 根据指定位移返回消息中的槽位。
    * 2. 如果返回的槽位小于零,说明没有消息位移小于指定位移,所以newEntries返回0。
    * 3. 如果指定位移在消息位移中,那么返回slot槽位。
    * 4. 如果指定位移位置大于消息中所有位移,那么跳到消息位置中最大的一个的下一个位置。
    *
    * @param offset
    */
  override def truncateTo(offset: Long) 
    inLock(lock) 
      val idx = mmap.duplicate
      //根据指定位移返回消息中位移
      val slot = largestLowerBoundSlotFor(idx, offset, IndexSearchType.KEY)

      /* There are 3 cases for choosing the new size
       * 1) if there is no entry in the index <= the offset, delete everything
       * 2) if there is an entry for this exact offset, delete it and everything larger than it
       * 3) if there is no entry for this offset, delete everything larger than the next smallest
       */
      val newEntries =
      //如果没有消息的位移值小于指定位移值,那么就直接从头开始
        if(slot < 0)
          0
        //  跳到执行的位移位置
        else if(relativeOffset(idx, slot) == offset - baseOffset)
          slot
        //  指定位移位置大于消息中所有位移,那么跳到消息位置中最大的一个的下一个位置
        else
          slot + 1
      // 执行位置跳转
      truncateToEntries(newEntries)
    
  
  1. 根据指定位移返回消息中的槽位。
  2. 如果返回的槽位小于零,说明没有消息位移小于指定位移,所以newEntries返回0。
  3. 如果指定位移在消息位移中,那么返回slot槽位。
  4. 如果指定位移位置大于消息中所有位移,那么跳到消息位置中最大的一个的下一个位置。

讲完了LogSegment之后,我们在来看看Log。

3.Log

3.1 Log 源码结构


Log.scala定义了 10 个类和对象,图中括号里的 C 表示 Class,O 表示 Object。

我们主要看的是Log类:

3.2 Log类的定义


/**
 * An append-only log for storing messages.
 *
 * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
 *
 * New log segments are created according to a configurable policy that controls the size in bytes or time interval
 * for a given segment.
 *
 * @param dir The directory in which log segments are created.
 * @param config The log configuration settings
 * @param logStartOffset The earliest offset allowed to be exposed to kafka client.
 *                       The logStartOffset can be updated by :
 *                       - user's DeleteRecordsRequest
 *                       - broker's log retention
 *                       - broker's log truncation
 *                       The logStartOffset is used to decide the following:
 *                       - Log deletion. LogSegment whose nextOffset <= log's logStartOffset can be deleted.
 *                         It may trigger log rolling if the active segment is deleted.
 *                       - Earliest offset of the log in response to ListOffsetRequest. To avoid OffsetOutOfRange exception after user seeks to earliest offset,
 *                         we make sure that logStartOffset <= log's highWatermark
 *                       Other activities such as log cleaning are not affected by logStartOffset.
 * @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
 * @param scheduler The thread pool scheduler used for background actions
 * @param brokerTopicStats Container for Broker Topic Yammer Metrics
 * @param time The time instance used for checking the clock
 * @param maxProducerIdExpirationMs The maximum amount of time to wait before a producer id is considered expired
 * @param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
  *
  * param dir Log对应的磁盘目录,此目录下存放了每个LogSegment对应的日志文件和索引文件。
  * param config :  日志配置信息
  * param logStartOffset 日志段集合中第一个日志段的基础位移,也就是这个日志对象的基础位移logEndOffsetMetadata
  *                      The logStartOffset can be updated by :
  *                      - user's DeleteRecordsRequest
  *                      - broker's log retention
  *                      - broker's log truncation
  *                      The logStartOffset is used to decide the following:
  *                      - Log deletion. LogSegment whose nextOffset <= log's logStartOffset can be deleted.
  *                        It may trigger log rolling if the active segment is deleted.
  *                      - Earliest offset of the log in response to ListOffsetRequest. To avoid OffsetOutOfRange exception after user seeks to earliest offset,
  *                        we make sure that logStartOffset <= log's highWatermark
  *                      Other activities such as log cleaning are not affected by logStartOffset.
  *
  * param recoveryPoint :  恢复的起始offset——即尚未被写入磁盘的第一个offset,指定恢复操作的起始offset, recoveryPoint 之前的Message已经刷新到磁盘上持久存储,而其后的消息则不一-定,
  *                     出现宕机时可能会丢失。所以只需要恢复recoveryPoint之后的消息即可。
  * param scheduler :  用于后台操作的一个调度器线程池。主要用于异步地删除日志段和日志段切分时使用
  * param brokerTopicStats Container for Broker Topic Yammer Metrics
  * param time :  提供时间服务的对象实例
  * param maxProducerIdExpirationMs The maximum amount of time to wait before a producer id is considered expired
  * param producerIdExpirationCheckIntervalMs How often to check for producer ids which need to be expired
  *
  *
  *
  * Log是对多个LogSegment对象的顺序组合,形成一个逻辑的日志。为了实现快速定位LogSegment, Log使用跳表( SkipList )对LogSegment进行管理。
  *
  * 在Log中,将每个LogSegment的baseOffset作为key, LogSegment 对象作为value,放人segments这个跳表中管理
  *
  * 向Log中追加消息时是顺序写人的,那么只有最后一个LogSegment能够进行写人操作,在其之前的所有LogSegment都不能写入数据。
  * 最后一个LogSegment使用Log.activeSegment()方法获取,即segments集合中最后一个元素,为了描述方便,我们将此Segment对象称为“activeSegment” 。
  * 随着数据的不断写入,当activeSegment的日志文件大小到达一定阈值时,就需要创建新的activeSegment, 之后追加的消息将写人新的activeSegment。
  *
 */
@threadsafe
class Log(@volatile var dir: File,
          @volatile var config: LogConfig,
          @volatile var logStartOffset: Long以上是关于kafkaKafka源码解析 - LogSegment以及Log初始化的主要内容,如果未能解决你的问题,请参考以下文章

kafkakafka获取消费组异常 EOFException: null & KeeperErrorCode

kafkaKafka中的动态配置源码分析

KafkaKafka Producer整体架构概述及源码分析

KafkaKafka 的 Broker 与 KafkaController 介绍

kafkakafka NoSuchElementException ArrayDeque getLast

KafkaKafka指标报告器 MetricsReporter ClusterResourceListener