kafka源码ReassignPartitionsCommand源码原理分析(附配套教学视频)

Posted 石臻臻的杂货铺

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka源码ReassignPartitionsCommand源码原理分析(附配套教学视频)相关的知识,希望对你有一定的参考价值。

日常运维问题排查 怎么能够少了滴滴开源的

滴滴开源LogiKM一站式Kafka监控与管控平台

(后续的视频会在 公众号[全套视频首发]、CSDN、B站等各平台同名号[石臻臻的杂货铺]上上传 )

【kafka源码】分区副本重分配源码分析(上)

【kafka源码】分区副本重分配源码分析(下)

阿B: 石臻臻的杂货铺

文章目录

1.脚本的使用

请看 【kafka运维】副本扩缩容、数据迁移、分区重分配

2.源码解析

如果阅读源码太枯燥,可以直接跳转到 源码总结和Q&A部分

2.1--generate 生成分配策略分析

配置启动类--zookeeper xxxx:2181 --topics-to-move-json-file config/move-json-file.json --broker-list "0,1,2,3" --generate

配置move-json-file.json文件

启动,调试:
ReassignPartitionsCommand.generateAssignment

  1. 获取入参的数据
  2. 校验--broker-list传入的BrokerId是否有重复的,重复就报错
  3. 开始进行分配

ReassignPartitionsCommand.generateAssignment

  def generateAssignment(zkClient: KafkaZkClient, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicPartition, Seq[Int]], Map[TopicPartition, Seq[Int]]) = 
    //解析出游哪些Topic
    val topicsToReassign = parseTopicsData(topicsToMoveJsonString)
    //检查是否有重复的topic
    val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
    if (duplicateTopicsToReassign.nonEmpty)
      throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
      //获取topic当前的副本分配情况 /brokers/topics/topicName
    val currentAssignment = zkClient.getReplicaAssignmentForTopics(topicsToReassign.toSet)

    val groupedByTopic = currentAssignment.groupBy  case (tp, _) => tp.topic 
    //机架感知模式
    val rackAwareMode = if (disableRackAware) RackAwareMode.Disabled else RackAwareMode.Enforced
    val adminZkClient = new AdminZkClient(zkClient)
    val brokerMetadatas = adminZkClient.getBrokerMetadatas(rackAwareMode, Some(brokerListToReassign))

    val partitionsToBeReassigned = mutable.Map[TopicPartition, Seq[Int]]()
    groupedByTopic.foreach  case (topic, assignment) =>
      val (_, replicas) = assignment.head
      val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
      partitionsToBeReassigned ++= assignedReplicas.map  case (partition, replicas) =>
        new TopicPartition(topic, partition) -> replicas
      
    
    (partitionsToBeReassigned, currentAssignment)
  
  1. 检查是否有重复的topic,重复则抛出异常
  2. 从zk节点/brokers/topics/topicName获取topic当前的副本分配情况
  3. 从zk节点brokers/ids中获取所有在线节点,并跟--broker-list参数传入的取个交集
  4. 获取Brokers元数据,如果机架感知模式RackAwareMode.Enforced(默认)&&上面3中获取到的交集列表brokers不是都有机架信息或者都没有机架信息的话就抛出异常; 因为要根据机架信息做分区分配的话,必须要么都有机架信息,要么都没有机架信息; 出现这种情况怎么办呢? 那就将机架感知模式RackAwareMode设置为RackAwareMode.Disabled ;只需要加上一个参数--disable-rack-aware就行了
  5. 调用AdminUtils.assignReplicasToBrokers 计算分配情况;

    我们在【kafka源码】创建Topic的时候是如何分区和副本的分配规则里面分析过就不再赘述了, AdminUtils.assignReplicasToBrokers(要分配的Broker们的元数据, 分区数, 副本数)
    需要注意的是副本数是通过assignment.head.replicas.size获取的,意思是第一个分区的副本数量,正常情况下分区副本都会相同,但是也不一定,也可能被设置为了不同

根据这条信息我们是不是就可以直接调用这个接口来实现其他功能? 比如副本的扩缩容

2.2--execute 执行阶段分析

使用脚本执行
--zookeeper xxx --reassignment-json-file config/reassignment-json-file.json --execute --throttle 10000

ReassignPartitionsCommand.executeAssignment

  def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[Admin], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L): Unit = 
    //对json文件进行校验和解析
    val (partitionAssignment, replicaAssignment) = parseAndValidate(zkClient, reassignmentJsonString)
    val adminZkClient = new AdminZkClient(zkClient)
    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, adminClientOpt, partitionAssignment.toMap, replicaAssignment, adminZkClient)

     //检查是否已经存在副本重分配进程, 则尝试限流
     if (zkClient.reassignPartitionsInProgress())       
      reassignPartitionsCommand.maybeLimit(throttle)
     else 
      //打印当前的副本分配方式,方便回滚
      printCurrentAssignment(zkClient, partitionAssignment.map(_._1.topic))
      if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0)
        println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))
        //开始进行重分配进程
      if (reassignPartitionsCommand.reassignPartitions(throttle, timeoutMs)) 
        println("Successfully started reassignment of partitions.")
       else
        println("Failed to reassign partitions %s".format(partitionAssignment))
    
  
  1. 解析json文件并做些校验
    1. (partition、replica非空校验,partition重复校验)
    2. 校验partition是否有不存在的分区;(新增分区请用kafka-topic)
    3. 检查配置中的Brokers-id是否都存在
  2. 如果发现已经存在副本重分配进程(检查是否有节点/admin/reassign_partitions),则检查是否需要更改限流; 如果有参数(--throttle,--replica-alter-log-dirs-throttle) 则设置限流信息; 而后不再执行下一步
  3. 如果当前没有执行中的副本重分配任务(检查是否有节点/admin/reassign_partitions),则开始进行副本重分配任务;

2.2.1 已有任务,尝试限流

如果zk中有节点/admin/reassign_partitions; 则表示当前已有一个任务在进行,那么当前操作就不继续了,如果有参数
--throttle:
--replica-alter-log-dirs-throttle:
则进行限制

限制当前移动副本的节流阀。请注意,此命令可用于更改节流阀,但如果某些代理已完成重新平衡,则它可能不会更改最初设置的所有限制。所以后面需要将这个限制给移除掉 通过--verify

maybeLimit

  def maybeLimit(throttle: Throttle): Unit = 
    if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0) 
      //当前存在的broker
      val existingBrokers = existingAssignment().values.flatten.toSeq
      //期望的broker
      val proposedBrokers = proposedPartitionAssignment.values.flatten.toSeq ++ proposedReplicaAssignment.keys.toSeq.map(_.brokerId())
      //前面broker相加去重
      val brokers = (existingBrokers ++ proposedBrokers).distinct

      //遍历与之相关的Brokers, 添加限流配置写入到zk节点/config/broker/brokerId中 
      for (id <- brokers) 
        //获取broker的配置 /config/broker/brokerId
        val configs = adminZkClient.fetchEntityConfig(ConfigType.Broker, id.toString)
        if (throttle.interBrokerLimit >= 0) 
          configs.put(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.interBrokerLimit.toString)
          configs.put(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, throttle.interBrokerLimit.toString)
        
        if (throttle.replicaAlterLogDirsLimit >= 0)
          configs.put(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, throttle.replicaAlterLogDirsLimit.toString)

        adminZkClient.changeBrokerConfig(Seq(id), configs)
      
    
  

/config/brokers/brokerId节点配置是Broker端的动态配置,不需要重启Broker实时生效;

  1. 如果传入了参数--throttle: 则从zk节点/config/brokers/BrokerId节点获取Broker们的配置信息,然后再加上以下两个配置重新写入到节点/config/brokers/BrokerId
    leader.replication.throttled.rate 控制leader副本端处理FETCH请求的速率
    follower.replication.throttled.rate 控制follower副本发送FETCH请求的速率
  2. 如果传入了参数--replica-alter-log-dirs-throttle: 则将如下配置也写入节点中;
    replica.alter.log.dirs.io.max.bytes.per.second: broker内部目录之间迁移数据流量限制功能,限制数据拷贝从一个目录到另外一个目录带宽上限

例如写入之后的数据

"version":1,"config":"leader.replication.throttled.rate":"1","follower.replication.throttled.rate":"1"

注意: 这里写入的限流配置,是写入所有与之相关的Broker的限流配置;

2.2.2 当前未有执行任务,开始执行副本重分配任务

ReassignPartitionsCommand.reassignPartitions

  def reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 10000L): Boolean = 
    //写入一些限流数据
    maybeThrottle(throttle)
    try 
     //验证分区是否存在
      val validPartitions = proposedPartitionAssignment.groupBy(_._1.topic())
        .flatMap  case (topic, topicPartitionReplicas) =>
          validatePartition(zkClient, topic, topicPartitionReplicas)
        
      if (validPartitions.isEmpty) false
      else 
        if (proposedReplicaAssignment.nonEmpty && adminClientOpt.isEmpty)
          throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign replica to the specified log directory")
        val startTimeMs = System.currentTimeMillis()

        // Send AlterReplicaLogDirsRequest to allow broker to create replica in the right log dir later if the replica has not been created yet.
        if (proposedReplicaAssignment.nonEmpty)
          alterReplicaLogDirsIgnoreReplicaNotAvailable(proposedReplicaAssignment, adminClientOpt.get, timeoutMs)

        // Create reassignment znode so that controller will send LeaderAndIsrRequest to create replica in the broker
        zkClient.createPartitionReassignment(validPartitions.map(case (key, value) => (new TopicPartition(key.topic, key.partition), value)).toMap)

        // Send AlterReplicaLogDirsRequest again to make sure broker will start to move replica to the specified log directory.
        // It may take some time for controller to create replica in the broker. Retry if the replica has not been created.
        var remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis()
        val replicasAssignedToFutureDir = mutable.Set.empty[TopicPartitionReplica]
        while (remainingTimeMs > 0 && replicasAssignedToFutureDir.size < proposedReplicaAssignment.size) 
          replicasAssignedToFutureDir ++= alterReplicaLogDirsIgnoreReplicaNotAvailable(
            proposedReplicaAssignment.filter  case (replica, _) => !replicasAssignedToFutureDir.contains(replica) ,
            adminClientOpt.get, remainingTimeMs)
          Thread.sleep(100)
          remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis()
        
        replicasAssignedToFutureDir.size == proposedReplicaAssignment.size
      
     catch 
      case _: NodeExistsException =>
        val partitionsBeingReassigned = zkClient.getPartitionReassignment()
        throw new AdminCommandFailedException("Partition reassignment currently in " +
          "progress for %s. Aborting operation".format(partitionsBeingReassigned))
    
  
  1. maybeThrottle(throttle) 设置副本移动时候的限流配置,这个方法只用于任务初始化的时候

      private def maybeThrottle(throttle: Throttle): Unit = 
        if (throttle.interBrokerLimit >= 0)
          assignThrottledReplicas(existingAssignment(), proposedPartitionAssignment, adminZkClient)
        maybeLimit(throttle)
        if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0)
          throttle.postUpdateAction()
        if (throttle.interBrokerLimit >= 0)
          println(s"The inter-broker throttle limit was set to $throttle.interBrokerLimit B/s")
        if (throttle.replicaAlterLogDirsLimit >= 0)
          println(s"The replica-alter-dir throttle limit was set to $throttle.replicaAlterLogDirsLimit B/s")
      
    
    

    1.1 将一些topic的限流配置写入到节点/config/topics/topicName

    将计算得到的leader、follower 值写入到/config/topics/topicName
    leader: 找到 TopicPartition中有新增的副本的 那个分区;数据= 分区号:副本号,分区号:副本号
    follower: 遍历 预期 TopicPartition,副本= 预期副本-现有副本;数据= 分区号:副本号,分区号:副本号
    leader.replication.throttled.replicas: leader
    follower.replication.throttled.replicas: follower

    1.2. 执行 《2.2.1 已有任务,尝试限流》流程

  2. 从zk中获取/broker/topics/topicName数据来验证给定的分区是否存在,如果分区不存在则忽略此分区的配置,继续流程

  3. 如果Json文件中存在指定Log Dir的情况,则发送 AlterReplicaLogDirsRequest 以允许代理稍后在正确的日志目录中创建副本。
    比如:log_dirs 指定了文件存放目录

"version":1,"partitions":["topic":"Topic1","partition":2,"replicas":[1],"log_dirs":["/Users/shirenchuang/work/IdeaPj/didi_source/kafka/k0"]]

那么 AlterReplicaLogDirsRequest 请求就会先去创建对应的副本。具体的 跨目录数据迁移请看跨目录数据迁移

  1. 将重分配的数据写入到zk的节点/admin/reassign_partitions中;数据内容如:
    "version":1,"partitions":["topic":"test_create_topic1","partition":0,"replicas":[0,1,2,3],"topic":"test_create_topic1","partition":1,"replicas":[1,2,0,3],"topic":"test_create_topic1","partition":2,"replicas":[2,1,0,3]]
    
  2. 再次发送 AlterReplicaLogDirsRequest以确保代理将开始将副本移动到指定的日志目录。控制器在代理中创建副本可能需要一些时间。如果尚未创建副本,请重试。
    1. 像Broker发送alterReplicaLogDirs请求

2.2.3 Controller监听/admin/reassign_partitions节点变化

KafkaController.processZkPartitionReassignment

private def processZkPartitionReassignment(): Set[TopicPartition] = 
    // We need to register the watcher if the path doesn't exist in order to detect future
    // reassignments and we get the `path exists` check for free
    if (isActive && zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) 
      val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
      val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]

      zkClient.getPartitionReassignment().foreach  case (tp, targetReplicas) =>
        maybeBuildReassignment(tp, Some(targetReplicas)) match 
          case Some(context) => partitionsToReassign.put(tp, context)
          case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
        
      

      reassignmentResults ++= maybeTriggerPartitionReassignment(partitionsToReassign)
      val (partitionsReassigned, partitionsFailed) = reassignmentResults.partition(_._2.error == Errors.NONE)
      if (partitionsFailed.nonEmpty) 
        warn(s"Failed reassignment through zk with the following errors: $partitionsFailed")
        maybeRemoveFromZkReassignment((tp, _) => partitionsFailed.contains(tp))
      
      partitionsReassigned.keySet
     else 
      Set.empty
    
  

  1. 判断是否是Controller角色并且是否存在节点/admin/reassign_partitions
  2. maybeTriggerPartitionReassignment 重分配,如果topic已经被标记为删除了,则此topic流程终止;
  3. maybeRemoveFromZkReassignment将执行失败的一些分区信息从zk中删除;(覆盖信息)
onPartitionReassignment

KafkaController.onPartitionReassignment

private def onPartitionReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = 
    // 暂停一些正在删除的Topic操作
   topicDeletionManager.markTopicIneligibleForDeletion(Set(topicPartition.topic), reason = "topic reassignment in progress")
    //更新当前的分配
    updateCurrentReassignment(topicPartition, reassignment)

    val addingReplicas = reassignment.addingReplicas
    val removingReplicas = reassignment.removingReplicas

    if (!isReassignmentComplete(topicPartition, reassignment)) 
      // A1. Send LeaderAndIsr request to every replica in ORS + TRS (with the new RS, AR and RR).
      updateLeaderEpochAndSendRequest(topicPartition, reassignment)
      // A2. replicas in AR -> NewReplica
      startNewReplicasForReassignedPartition(topicPartition, addingReplicas)
     else 
      // B1. replicas in AR -> OnlineReplica
      replicaStateMachine.handleStateChanges(addingReplicas.map(PartitionAndReplica(topicPartition, _)), OnlineReplica)
      // B2. Set RS = TRS, AR = [], RR = [] in memory.
      val completedReassignment = ReplicaAssignment(reassignment.targetReplicas)
      controllerContext.updatePartitionFullReplicaAssignment(topicPartition, completedReassignment)
      // B3. Send LeaderAndIsr request with a potential new leader (if current leader not in TRS) and
      //   a new RS (using TRS) and same isr to every broker in ORS + TRS or TRS
      moveReassignedPartitionLeaderIfRequired(topicPartition, completedReassignment)
      // B4. replicas in RR -> Offline (force those replicas out of isr)
      // B5. replicas in RR -> NonExistentReplica (force those replicas to be deleted)
      stopRemovedReplicasOfReassignedPartition(topicPartition, removingReplicas)
      // B6. Update ZK with RS = TRS, AR = [], RR = [].
      updateReplicaAssignmentForPartition(topicPartition, completedReassignment)
      // B7. Remove the ISR reassign listener and maybe update the /admin/reassign_partitions path in ZK to remove this partition from it.
      removePartitionFromReassigningPartitions(topicPartition, completedReassignment)
      // B8. After electing a leader in B3, the replicas and isr information changes, so resend the update metadata request to every broker
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
      // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
      topicDeletionManager.resumeDeletionForTopics(Set(topicPartition.topic))
    
  

  1. 暂停一些正在删除的Topic操作
  2. 更新 Zk节点brokers/topics/topicName,和内存中的当前分配状态。如果重新分配已经在进行中,那么新的重新分配将取代它并且一些副本将被关闭。
    2.1 更新zk中的topic节点信息brokers/topics/topicName,这里会标记AR哪些副本是新增的,RR哪些副本是要删除的;例如:

    2.2 更新当前内存
    2.3 如果重新分配已经在进行中,那么一些当前新增加的副本有可能被立即删除,在这种情况下,我们需要停止副本。
    2.4 注册一个监听节点/brokers/topics/topicName/partitions/分区号/state变更的处理器PartitionReassignmentIsrChangeHandler
  3. 如果该分区的重新分配还没有完成(根据/brokers/topics/topicName/partitions/分区号/state里面的isr来判断是否已经包含了新增的BrokerId了);则
    以下几个名称说明:
    ORS: OriginReplicas 原先的副本
    TRS: targetReplicas 将要变更成的目标副本
    AR: adding_replicas 正在添加的副本
    RR:removing_replicas 正在移除的副本
    3.1 向 ORS + TRS 中的每个副本发送LeaderAndIsr请求(带有新的 RS、AR 和 RR)。
    3.2 给新增加的AR副本 进行状态变更成NewReplica ; 这个过程有发送LeaderAndIsrRequest详细请看【kafka源码】Controller中的状态机

2.2.4 Controller监听节点brokers/topics/topicName变化,检查是否有新增分区

这一个流程可以不必在意,因为在这里没有做任何事情;

上面的 2.2.3 的第2小段中不是有将新增的和删掉的副本写入到了 zk中吗
例如:


"version":2,"partitions":"2":[0,1],"1":[0,1],"0":[0,1],"adding_replicas":"2":[1],"1":[1],"0":[1],"removing_replicas":

Controller监听到这个节点之后,执行方法processPartitionModifications
KafkaController.processPartitionModifications

  private def processPartitionModifications(topic: String): Unit = 
    def restorePartitionReplicaAssignment(以上是关于kafka源码ReassignPartitionsCommand源码原理分析(附配套教学视频)的主要内容,如果未能解决你的问题,请参考以下文章

如何在windows下查看kafka源码

聊聊 Kafka:编译 Kafka 源码并搭建源码环境

聊聊 Kafka:编译 Kafka 源码并搭建源码环境

kafka源码分析 消费消息

kafka源码解读

kafka源码分析之一server启动分析