Kafka Source Code Analysis: Log Management (LogManager)

Posted by 全栈攻城


## 1 Entry Point

```

/* start log manager */
// Start the log management module
logManager = LogManager(config, zkUtils, brokerState, kafkaScheduler, time, brokerTopicStats)
logManager.startup()
```

## 2 Startup Code

```

/**
 * Start the background threads to flush logs and do log cleanup.
 * All of this work runs on the scheduler's background threads.
 */
def startup() {
  /* Schedule the cleanup task to delete old logs */
  if (scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs _,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher",
                       flushDirtyLogs _,
                       delay = InitialTaskDelayMs,
                       period = flushCheckMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointRecoveryPointOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushRecoveryOffsetCheckpointMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-log-start-offset-checkpoint",
                       checkpointLogStartOffsets _,
                       delay = InitialTaskDelayMs,
                       period = flushStartOffsetCheckpointMs,
                       TimeUnit.MILLISECONDS)
    scheduler.schedule("kafka-delete-logs",
                       deleteLogs _,
                       delay = InitialTaskDelayMs,
                       period = defaultConfig.fileDeleteDelayMs,
                       TimeUnit.MILLISECONDS)
  }
  if (cleanerConfig.enableCleaner)
    cleaner.startup()
}


```
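In short, startup() registers five periodic tasks with the scheduler: log retention (cleanupLogs), log flushing (flushDirtyLogs), recovery-point checkpointing, log-start-offset checkpointing, and asynchronous segment deletion (deleteLogs). Each task first fires after InitialTaskDelayMs (30 seconds, see section 3.3), and the LogCleaner is started last if log.cleaner.enable is set.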

## 3 Core Code


![image.png](http://upload-images.jianshu.io/upload_images/1731949-3cfdfc76991bb2a5.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

### 3.1 Related Configuration

- log.cleaner.threads (default 1): number of log cleaner threads used for log compaction.

- log.cleaner.dedupe.buffer.size (default 128 MB): memory used by the cleaner as a deduplication buffer, i.e. the in-memory map used to sort out duplicate keys during compaction.

- log.cleaner.io.buffer.load.factor (default 0.9): load factor of the dedupe buffer's hash map; a smaller factor means fewer hash collisions but more memory per entry. Used for compaction.

- log.cleaner.io.buffer.size (default 512 KB): size of the I/O buffers the cleaner uses during compaction.

- message.max.bytes (default 1000012 bytes): maximum size of a single message.

- log.cleaner.io.max.bytes.per.second: throttles the cleaner's I/O; unlimited by default. Used for compaction.

- log.cleaner.backoff.ms (default 15 seconds): interval at which the cleaner checks whether any log needs cleaning (used for compaction).

- log.cleaner.enable (default true): whether the log cleaner is enabled.

- num.recovery.threads.per.data.dir (default 1): number of threads per data directory used for log recovery at startup.

- log.flush.scheduler.interval.ms: interval at which logs are checked to see whether they need to be flushed to disk; effectively disabled by default.

- log.flush.offset.checkpoint.interval.ms (default 60000 ms): interval at which each partition's recovery point (flushed offset) is checkpointed to disk.

- log.retention.check.interval.ms (default 5 minutes): interval at which logs are checked against the retention policies. An illustrative snippet of these settings follows.
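For illustration, here is a hypothetical, standalone sketch (not taken from the Kafka source) of these settings spelled out with the default values listed above; kafka.server.KafkaConfig.fromProps would turn such a Properties object into the config instance that section 3.2 reads:

```
import java.util.Properties

// Hypothetical example: the defaults listed above, set explicitly.
// zookeeper.connect is included only because KafkaConfig requires it.
val props = new Properties()
props.put("zookeeper.connect", "localhost:2181")
props.put("log.cleaner.threads", "1")                        // compaction threads
props.put("log.cleaner.dedupe.buffer.size", "134217728")     // 128 MB dedupe buffer
props.put("log.cleaner.io.buffer.load.factor", "0.9")
props.put("log.cleaner.io.buffer.size", "524288")            // 512 KB I/O buffer
props.put("message.max.bytes", "1000012")
props.put("log.cleaner.backoff.ms", "15000")                 // 15 seconds
props.put("log.cleaner.enable", "true")
props.put("num.recovery.threads.per.data.dir", "1")
props.put("log.flush.offset.checkpoint.interval.ms", "60000")
props.put("log.retention.check.interval.ms", "300000")       // 5 minutes

// val config = kafka.server.KafkaConfig.fromProps(props)
// config.logCleanerThreads, config.logCleanerDedupeBufferSize, ... then feed the LogManager below.
```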


### 3.2 Startup: Constructing the LogManager

 ```

// Topic-level configs have already been read from ZooKeeper (not covered here);
// the broker-level settings are bundled into a CleanerConfig and passed to the
// LogManager constructor.
val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
  dedupeBufferSize = config.logCleanerDedupeBufferSize,
  dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
  ioBufferSize = config.logCleanerIoBufferSize,
  maxMessageSize = config.messageMaxBytes,
  maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
  backOffMs = config.logCleanerBackoffMs,
  enableCleaner = config.logCleanerEnable)

new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
  topicConfigs = topicConfigs,
  defaultConfig = defaultLogConfig,
  cleanerConfig = cleanerConfig,
  ioThreads = config.numRecoveryThreadsPerDataDir,
  flushCheckMs = config.logFlushSchedulerIntervalMs,
  flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
  flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
  retentionCheckMs = config.logCleanupIntervalMs,
  maxPidExpirationMs = config.transactionIdExpirationMs,
  scheduler = kafkaScheduler,
  brokerState = brokerState,
  time = time,
  brokerTopicStats = brokerTopicStats)

```


### 3.3 Construction and Startup Flow


```

@threadsafe
class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig], // note that this doesn't get updated after creation
                 val defaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 ioThreads: Int,
                 val flushCheckMs: Long,
                 val flushRecoveryOffsetCheckpointMs: Long,
                 val flushStartOffsetCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 val maxPidExpirationMs: Int,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 brokerTopicStats: BrokerTopicStats,
                 time: Time) extends Logging {

  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
  val LockFile = ".lock"
  val InitialTaskDelayMs = 30*1000

  private val logCreationOrDeletionLock = new Object
  private val logs = new Pool[TopicPartition, Log]()
  private val logsToBeDeleted = new LinkedBlockingQueue[Log]()

  // Check that every log directory exists (creating it if missing) and is readable and writable.
  createAndValidateLogDirs(logDirs)

  // Create a ".lock" file in each directory and lock the directory through it.
  private val dirLocks = lockLogDirs(logDirs)

  // Build the checkpoint objects from each directory's recovery-point-offset-checkpoint file;
  // these are used to periodically record the recovery point (flushed offset) of every partition.
  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile)))).toMap
  private val logStartOffsetCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile)))).toMap

  // For each directory, create a thread pool of size num.recovery.threads.per.data.dir,
  // scan its topic-partition subdirectories, and build a Log instance per partition using the
  // topic's config from ZooKeeper (or the default config) and the offset recorded in the
  // checkpoint file. The thread pools then load these Log instances, i.e. perform log recovery.
  loadLogs()

  // public, so we can access this from kafka.admin.DeleteTopicTest
  val cleaner: LogCleaner =
    if (cleanerConfig.enableCleaner)
      new LogCleaner(cleanerConfig, logDirs, logs, time = time)
    else
      null

```
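For reference, this is roughly what locking a data directory through its .lock file amounts to. The sketch below is hypothetical (tryLockDir is not a Kafka API); Kafka's own FileLock utility is built on the same java.nio primitive:

```
import java.io.{File, RandomAccessFile}
import java.nio.channels.FileLock

// Hypothetical helper, for illustration only.
def tryLockDir(dir: File): Option[FileLock] = {
  val lockFile = new File(dir, ".lock")
  lockFile.createNewFile()                      // create the lock file if it does not exist yet
  val channel = new RandomAccessFile(lockFile, "rw").getChannel
  Option(channel.tryLock())                     // None if another process already holds the lock
}
```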


### 3.4 Deleting Expired Log Segments

 

 ```

/**
 * Runs through the log removing segments older than a certain age
 */
private def cleanupExpiredSegments(log: Log): Int = {
  if (log.config.retentionMs < 0)
    return 0
  val startMs = time.milliseconds
  log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
}

```

This involves the retention.ms setting, which controls how long log data is kept. A negative value means the data never expires by age, so nothing is deleted on this path.


Otherwise, if the difference between the current time and a segment file's last-modified time exceeds the configured retention period, the segment is deleted. The deletion itself is handled by deleteOldSegments:


```

/**
 * Delete any log segments matching the given predicate function,
 * starting with the oldest segment and moving forward until a segment doesn't match.
 * @param predicate A function that takes in a single log segment and returns true iff it is deletable
 * @return The number of segments deleted
 */
def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
  lock synchronized {
    // find any segments that match the user-supplied predicate UNLESS it is the final segment
    // and it is empty (since we would just end up re-creating it)
    val lastEntry = segments.lastEntry
    val deletable =
      if (lastEntry == null) Seq.empty
      else logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastEntry.getValue.baseOffset || s.size > 0))
    val numToDelete = deletable.size
    if (numToDelete > 0) {
      // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
      if (segments.size == numToDelete)
        roll()
      // remove the segments for lookups
      deletable.foreach(deleteSegment(_))
    }
    numToDelete
  }
}

```

The logic: apply the supplied predicate to decide which segments qualify for deletion, collecting them into deletable (walking from the oldest segment forward and stopping at the first segment that does not match), then iterate over deletable and delete each segment:


  ```

private def deleteSegment(segment: LogSegment) {
  info("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, name))
  lock synchronized {
    segments.remove(segment.baseOffset)
    asyncDeleteSegment(segment)
  }
}

private def asyncDeleteSegment(segment: LogSegment) {
  segment.changeFileSuffixes("", Log.DeletedFileSuffix)
  def deleteSeg() {
    info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
    segment.delete()
  }
  scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
}


```


This is an asynchronous deletion, governed by the file.delete.delay.ms setting: the segment's files are first renamed with a .deleted suffix, and only after file.delete.delay.ms has elapsed does the scheduled task actually remove them from disk.
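During that delay window the renamed files are still on disk. A hypothetical partition directory (illustrative topic name, offsets, and files, not real output) might look like this:

```
my-topic-0/
  00000000000000000000.log.deleted      # old segment, renamed, awaiting the delayed delete
  00000000000000000000.index.deleted
  00000000000000042000.log              # current active segment
  00000000000000042000.index
```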


### 3.5 Trimming Logs That Exceed the Size Limit


 ```

/**
 * Runs through the log removing segments until the size of the log
 * is at least logRetentionSize bytes in size
 */
private def cleanupSegmentsToMaintainSize(log: Log): Int = {
  if (log.config.retentionSize < 0 || log.size < log.config.retentionSize)
    return 0
  var diff = log.size - log.config.retentionSize
  def shouldDelete(segment: LogSegment) = {
    if (diff - segment.size >= 0) {
      diff -= segment.size
      true
    } else {
      false
    }
  }
  log.deleteOldSegments(shouldDelete)
}

```

This code is straightforward: once the log exceeds retention.bytes, segments are deleted oldest-first for as long as removing another whole segment still leaves at least retention.bytes behind; the actual deletion goes through the same deleteOldSegments method shown above. A small worked example follows.
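Here is a minimal, self-contained sketch of the same greedy rule with hypothetical segment sizes (illustration only, not Kafka code):

```
// Trim a 2 GiB log toward retention.bytes = 1 GiB, visiting segments oldest first.
val retentionSize = 1024L << 20                                           // 1 GiB
val segmentSizes  = Seq(512L << 20, 512L << 20, 512L << 20, 512L << 20)   // oldest -> newest
var diff = segmentSizes.sum - retentionSize                               // 1 GiB over the limit

// Same rule as shouldDelete above: drop a segment only while what remains still covers retentionSize.
val deletable = segmentSizes.takeWhile { size =>
  if (diff - size >= 0) { diff -= size; true } else false
}
println(deletable.size) // 2 -- the two oldest segments are removed, leaving exactly 1 GiB
```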


### 3.6 Periodically Flushing Logs to Disk

A background task on the scheduler periodically runs LogManager.flushDirtyLogs. This function iterates over every partition's Log and flushes it when needed: the flush covers the segments between the last checkpointed recovery point and the current end offset, forcing each segment's log file channel to disk and calling force on the memory-mapped index files.


``` 

private def flushDirtyLogs() = {
  debug("Checking for dirty logs to flush...")

  for ((topicAndPartition, log) <- logs) {
    try {
      val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
      debug("Checking if flush is needed on " + topicAndPartition.topic +
            " flush interval " + log.config.flushMs +
            " last flushed " + log.lastFlushTime +
            " time since last flush: " + timeSinceLastFlush)
      if (timeSinceLastFlush >= log.config.flushMs)
        log.flush
    } catch {
      case e: Throwable =>
        error("Error flushing topic " + topicAndPartition.topic, e)
    }
  }
}


```
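For reference, a per-segment flush ultimately comes down to the two force calls mentioned above. The sketch below is hypothetical (flushSegment is not a Kafka API); it only makes the underlying NIO operations concrete:

```
import java.io.{File, RandomAccessFile}
import java.nio.MappedByteBuffer

// Hypothetical helper, for illustration only.
def flushSegment(logFile: File, indexBuffer: MappedByteBuffer): Unit = {
  val channel = new RandomAccessFile(logFile, "rw").getChannel
  try channel.force(true)   // flush the segment's data (and metadata) to the storage device
  finally channel.close()
  indexBuffer.force()       // flush the memory-mapped offset index
}
```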

 


### 3.7 Periodically Checkpointing Partition Recovery Points

A background task on the scheduler periodically runs LogManager.checkpointRecoveryPointOffsets:


```

def checkpointRecoveryPointOffsets() {
  this.logDirs.foreach(checkpointLogsInDir)
}

```


This checkpoints, for each log directory, the recovery point (last flushed offset) of every partition stored in that directory into the directory's checkpoint file. The file format is:

- line 1: the checkpoint file version (0);
- line 2: the number of partitions that currently have data in this directory;
- followed by that many lines, each recording a topic, partition, and offset.


```

private def checkpointLogsInDir(dir: File): Unit = {
  val recoveryPoints = this.logsByDir.get(dir.toString)
  if (recoveryPoints.isDefined) {
    this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
  }
}



``` 
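A hypothetical recovery-point-offset-checkpoint file written this way (illustrative topic name and offsets) would therefore look like:

```
0
2
my-topic 0 4200
my-topic 1 1337
```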


### 3.8 LogCleaner: Periodic Log Compaction

Log compaction runs periodically inside the LogCleaner instance, which does its work on CleanerThread threads. The relevant settings:

1. log.cleaner.io.max.bytes.per.second: throttles the cleaner threads' I/O; unlimited by default.
1. log.cleaner.dedupe.buffer.size (default 128 MB): memory for the deduplication buffer used to sort out duplicate keys during compaction.
1. log.cleaner.threads (default 1): number of cleaner threads.
1. log.cleaner.backoff.ms (default 15 seconds): interval at which the cleaner checks whether any log needs cleaning.

