Kafka source code walkthrough - controller (the topic creation process)

Posted by hudeqi



    I just got an earful from my wife tonight; sad as that is, I still want to keep going and write another Kafka source walkthrough post. Forgive the grumbling at the start...

    The source version is still 0.10.2.1. As we all know, before 0.8 Kafka provided no replication for partitions: once a broker went down, none of the partitions on it could serve requests, and since a partition had no backup data, availability dropped sharply. Starting with 0.8, a Replication mechanism was added to handle broker failover, and the controller is the core of that replication mechanism.

    To implement replication, the controller relies heavily on ZooKeeper (zk below). In short: it registers watchers on zk paths; as soon as a watched path changes, the listener logic implemented in the controller fires, and the controller then pushes the result to the relevant brokers (or to all of them), so every broker learns about the change and its outcome. That may still sound abstract, so let's use topic creation as an example and walk through what happens on the zk side, in the controller, and on the broker (server) side.

1. Common terms:

  • leader: the primary replica of a partition; producers and consumers interact directly with the leader.
  • follower: every replica other than the leader; it keeps fetching data from the leader to stay in sync. If the leader dies, a new leader is elected from the remaining followers, which is what guarantees data reliability.
  • AR: the set of all replicas of a partition. OAR is the original replica set, RAR the reassigned replica set.
  • ISR: followers inevitably lag a little behind the leader while syncing; if the lag stays within the configured threshold, the replica counts as "in sync", so the ISR is the leader plus those caught-up followers.
  • OSR: the opposite of the ISR: replicas whose lag exceeds the threshold ("fallen behind"). Therefore AR = ISR + OSR.
  • LEO: the log end offset, i.e. the position right after the last message appended to the leader; it marks the highest produced position.
  • HW: short for high watermark, the highest position a consumer is allowed to read from the leader. Since producers talk directly to the leader, a message appended to the leader but not yet fetched by the ISR followers could be lost if the leader failed at that moment, so the leader only advances its HW once every replica in the ISR has replicated the message (see the toy sketch after this list).
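To make the AR/ISR/OSR/HW relationships concrete, here is a toy Scala sketch (not Kafka code; the replica ids, LEO values and lag threshold are made up for illustration):

case class Replica(id: Int, leo: Long, lagMs: Long)

val maxLagMs = 10000L                                 // stand-in for the lag threshold config
val ar  = Seq(Replica(1, 105, 0), Replica(2, 103, 2000), Replica(3, 90, 60000)) // all replicas
val isr = ar.filter(_.lagMs <= maxLagMs)              // leader + caught-up followers
val osr = ar.filterNot(isr.contains)                  // lagging replicas, so AR = ISR + OSR
val hw  = isr.map(_.leo).min                          // consumers may read only up to the HW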

    Enough preamble; let's go straight to the code.

2. Code walkthrough:

    The topic creation command should be familiar: ~/software/kafka/bin/kafka-topics.sh --topic xx_log --replication-factor 3 --partitions 240 --create --zookeeper 127.0.0.1:2181. Let's start with how this command is parsed:

def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
    val topic = opts.options.valueOf(opts.topicOpt)
    val configs = parseTopicConfigsToBeAdded(opts)
    val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
    if (Topic.hasCollisionChars(topic))
      println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
    try {
      if (opts.options.has(opts.tagOpt)) {
        val tag = opts.options.valueOf(opts.tagOpt).toString
        if (zkUtils.checkTagExist(tag)) {
          if (opts.options.has(opts.replicaAssignmentOpt)) {  // the command explicitly pins partitions to brokers
            val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
            AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false, tag = tag)
          } else {
            CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt, opts.tagOpt)
            val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
            val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
            // rack-aware replica placement is enabled unless explicitly disabled
            val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
            else RackAwareMode.Enforced
            AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode, tag = tag)
          }
          println("Created topic \"%s\".".format(topic))
        } else {
          println(s"Tag '$tag' not exists.")
        }
      } else {
        if (opts.options.has(opts.replicaAssignmentOpt)) {
          val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
          AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
        } else {
          CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
          val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
          val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
          val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
          else RackAwareMode.Enforced
          AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
        }
        println("Created topic \"%s\".".format(topic))
      }
    } catch {
      case e: TopicExistsException => if (!ifNotExists) throw e
    }
  }

As you can see, the command is parsed first. Normally you don't pin specific partitions to specific brokers at creation time; that is left to the default assignment algorithm (the --replica-assignment option format is sketched below). Rack-aware replica placement is also enabled by default, since spreading replicas across racks is another part of the high-availability story. The core call is AdminUtils.createTopic, so let's see what it does.
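For reference, the --replica-assignment value is a comma-separated list with one entry per partition, each entry being a colon-separated list of broker ids. A rough, simplified sketch of what parseReplicaAssignment does (my own approximation, not the real implementation):

// "1:2:3,2:3:1" means partition 0 -> brokers 1,2,3 and partition 1 -> brokers 2,3,1
def parseReplicaAssignmentSketch(s: String): Map[Int, Seq[Int]] =
  s.split(",").zipWithIndex.map { case (brokers, partitionId) =>
    partitionId -> brokers.split(":").map(_.trim.toInt).toSeq
  }.toMap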

def createTopic(zkUtils: ZkUtils,
                  topic: String,
                  partitions: Int,
                  replicationFactor: Int,
                  topicConfig: Properties = new Properties,
                  rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
                  tag: String = "") {
    // 1. create the default quota
    createDefaultGroupQuota(zkUtils,topic)

    // 2. then create the topic itself
    var currentTag = tag
    var currentrReplicationFactor = replicationFactor
    info("createTopic topic=" + topic + ", tag=" + tag + ", partitions=" + partitions + ", replicationFactor=" + replicationFactor)
    if (Topic.GroupMetadataTopicName.equals(topic)) {
      val brokers = zkUtils.getLiveBrokersInTag(Topic.GroupMetadataTagName)
      if (!brokers.isEmpty) {
        currentTag = Topic.GroupMetadataTagName
        currentrReplicationFactor = Math.min(brokers.size, replicationFactor)
      }
    } else {
      if (currentTag != "") {
        if (! zkUtils.checkTagExist(currentTag)) {
          throw new AdminOperationException("Tag " + currentTag + " not exists.")
        }
      }
    }
    val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode, tag = currentTag)
    // assign partitions to brokers, taking rack information into account when available
    val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, currentrReplicationFactor)
    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig, tag = currentTag)
  }

The core is the last two lines: first compute how the new topic's partitions should be assigned across brokers, then write the assignment to zk. The assignment algorithm is not the focus of this post; its basic idea is modulo round-robin, spreading the partitions across as many different brokers and racks as possible (a simplified sketch follows). The interesting part is the zk write, so let's look at that next.
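For intuition, here is a heavily simplified, rack-unaware sketch of that round-robin idea (the real AdminUtils.assignReplicasToBrokers also shifts replicas between partitions and handles racks; the startIndex parameter is an illustrative assumption):

def roundRobinAssignSketch(brokerIds: Seq[Int], partitions: Int, replicationFactor: Int,
                           startIndex: Int = 0): Map[Int, Seq[Int]] =
  (0 until partitions).map { p =>
    // each partition starts one broker further along; replicas land on consecutive brokers
    p -> (0 until replicationFactor).map(r => brokerIds((startIndex + p + r) % brokerIds.size))
  }.toMap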

def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
                                                     topic: String,
                                                     partitionReplicaAssignment: Map[Int, Seq[Int]],
                                                     config: Properties = new Properties,
                                                     update: Boolean = false,
                                                     tag: String = "") {
    // validate whether this create/update is legal, given the arguments
    validateCreateOrUpdateTopic(zkUtils, topic, partitionReplicaAssignment, config, update)

    // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
    if (!update) {
      // write out the config if there is any, this isn't transactional with the partition assignments
      // zk write: the /config/topics/<topicName> path
      writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), config)
    }

    // create the partition assignment
    // zk write: the hierarchy under the topic znode
    writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update, tag = tag)
  }

As you can see, after validating the create request the zk writes begin: first the topic config is written under /config/topics/<topicName>, then the partition assignment is written under the topic znode (sketched just below; readers can dig into the exact paths themselves). At this point the topic-creation command is done with zk. Because the controller has a watcher registered on the topics path (the watchers are registered during controller election), it is notified of the change. So let's switch to the controller and look at the listener for the topics path:
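Roughly, the two znodes written above look like this (paths per the 0.10.x layout; the JSON values are illustrative, not copied from a live cluster):

//   /config/topics/<topicName>  -> {"version":1,"config":{}}
//   /brokers/topics/<topicName> -> {"version":1,"partitions":{"0":[1,2,3],"1":[2,3,1], ...}}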

/**
   * This is the zookeeper listener that triggers all the state transitions for a partition
   */
  class TopicChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener {

    protected def logName = "TopicChangeListener"

    // fired once the topic znode has been written; this is the controller-side entry point after topic creation
    def doHandleChildChange(parentPath: String, children: Seq[String]) {
      inLock(controllerContext.controllerLock) {
        if (hasStarted.get) {
          try {
            val currentChildren = {
              debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
              children.toSet
            }
            val newTopics = currentChildren -- controllerContext.allTopics
            val deletedTopics = controllerContext.allTopics -- currentChildren
            controllerContext.allTopics = currentChildren

            val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
            // the context must be updated so that the later leader election can work
            controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
              !deletedTopics.contains(p._1.topic))
            controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
            info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
              deletedTopics, addedPartitionReplicaAssignment))
            if (newTopics.nonEmpty)
              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
          } catch {
            case e: Throwable => error("Error while handling new topic", e)
          }
        }
      }
    }
  }

As you can see, this is the listener that watches topic changes in the cluster. It first diffs the zk children against the cached topic set to work out which topics are new and which were deleted; for the topic creation path we only care about the new ones. It then fetches the replica assignment of the new topics' partitions and updates controllerContext.partitionReplicaAssignment; why this update matters will become clear later, so let's call it mark1 for now. Finally it calls onNewTopicCreation.

  def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
    info("New topic creation callback for %s".format(newPartitions.mkString(",")))
    // subscribe to partition changes
    // first register a partition-change listener for each newly created topic
    topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
    onNewPartitionCreation(newPartitions)
  }

Inside, it first registers a partition-change listener for each newly created topic, and then runs onNewPartitionCreation on the new partitions.

/**
   * This callback is invoked by the topic change callback with the list of failed brokers as input.
   * It does the following -
   * 1. Move the newly created partitions to the NewPartition state
   * 2. Move the newly created partitions from NewPartition->OnlinePartition state
   */
  def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
    info("New partition creation callback for %s".format(newPartitions.mkString(",")))
    // move all the partitions to the NewPartition state
    partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
    // move all the replicas to the NewReplica state
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
    // bring all the partitions online
    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
    // bring all the replicas online
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
  }

These are the four big steps for the new topic's partitions. In short, partitions and replicas each have their own state machine: all partitions and replicas are first initialized to the NewPartition/NewReplica state, and then each is brought online. Let's go through the steps one by one, starting with partitionStateMachine.handleStateChanges(newPartitions, NewPartition); its code follows the sketch below.
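Before reading the code, here is a rough map of the two state machines as they appear in this flow (the state names are the real ones; the allowed-previous-state lists are taken from the assertValidPreviousStates calls shown below and are not exhaustive):

// Partition state machine: NonExistentPartition -> NewPartition -> OnlinePartition
//   (OnlinePartition can also be re-entered from OfflinePartition/OnlinePartition via leader re-election)
// Replica state machine:   NonExistentReplica   -> NewReplica   -> OnlineReplica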

/**
   * This API is invoked by the partition change zookeeper listener
   * @param partitions   The list of partitions that need to be transitioned to the target state
   * @param targetState  The state that the partitions should be moved to
   */
  def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
                         leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector,
                         callbacks: Callbacks = (new CallbackBuilder).build) {
    debug("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
    try {
      brokerRequestBatch.newBatch()
      partitions.foreach { topicAndPartition =>
        // changes each partition's state, picks the leader and isr, and writes zk,
        // preparing for the leaderAndIsr and update-metadata requests sent to the server side below
        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
      }
      // the controller sends the leaderAndIsr and update-metadata requests to the brokers
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    }catch {
      case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
      // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
    }
  }

The comments on the two main steps inside this method can be ignored for now: the method is reused for every partition state transition, and those comments describe the partition online step. Initializing each partition's state looks like this:

private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
                                leaderSelector: PartitionLeaderSelector,
                                callbacks: Callbacks) {
    val topicAndPartition = TopicAndPartition(topic, partition)
    if (!hasStarted.get)
      throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
                                            "the partition state machine has not started")
                                              .format(controllerId, controller.epoch, topicAndPartition, targetState))
    // a partition not seen before starts out as NonExistentPartition, for the validation below
    val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
    try {
      targetState match {
        case NewPartition =>
          // pre: partition did not exist before this
          assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
          partitionState.put(topicAndPartition, NewPartition)
          val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
                                            assignedReplicas))

As you can see, this state change is trivial: it just records the new partition's state in partitionState as NewPartition. Stepping back out, let's look at the implementation of brokerRequestBatch.sendRequestsToBrokers:

def sendRequestsToBrokers(controllerEpoch: Int) {
    try {
      leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>
        partitionStateInfos.foreach { case (topicPartition, state) =>
          val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
          stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +
                                   "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
                                                                   state.leaderIsrAndControllerEpoch, broker,
                                                                   topicPartition.topic, topicPartition.partition))
        }
        val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
        val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
          _.getNode(controller.config.interBrokerListenerName)
        }
        val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
          val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
          val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
            leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
            partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)
          topicPartition -> partitionState
        }
        val leaderAndIsrRequest = new LeaderAndIsrRequest.
            Builder(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
        // the leaderAndIsrRequest is only sent to the brokers that are involved
        controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest, null)
      }
      leaderAndIsrRequestMap.clear()

      // the leaderAndIsrRequest must be handled before the updateMetadataRequest
      updateMetadataRequestPartitionInfoMap.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
        "to brokers %s for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
        updateMetadataRequestBrokerSet.toString(), p._1)))
      val partitionStates = updateMetadataRequestPartitionInfoMap.map { case (topicPartition, partitionStateInfo) =>
        val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
        val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
          leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
          partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)
        topicPartition -> partitionState
      }

      val version: Short =
        if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3
        else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2
        else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
        else 0

      val updateMetadataRequest = {
        val liveBrokers = if (version == 0) {
          // Version 0 of UpdateMetadataRequest only supports PLAINTEXT.
          controllerContext.liveOrShuttingDownBrokers.map { broker =>
            val securityProtocol = SecurityProtocol.PLAINTEXT
            val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
            val node = broker.getNode(listenerName)
            val endPoints = Seq(new EndPoint(node.host, node.port, securityProtocol, listenerName))
            new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
          }
        } else {
          controllerContext.liveOrShuttingDownBrokers.map { broker =>
            val endPoints = broker.endPoints.map { endPoint =>
              new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port, endPoint.securityProtocol, endPoint.listenerName)
            }
            new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
          }
        }
        new UpdateMetadataRequest.Builder(
          controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava).
          setVersion(version)
      }

      // send the updateMetadataRequest to every live broker
      updateMetadataRequestBrokerSet.foreach { broker =>
        controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, updateMetadataRequest, null)
      }
      updateMetadataRequestBrokerSet.clear()
      updateMetadataRequestPartitionInfoMap.clear()

      stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>
        val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
        val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
        debug("The stop replica request (delete = true) sent to broker %d is %s"
          .format(broker, stopReplicaWithDelete.mkString(",")))
        debug("The stop replica request (delete = false) sent to broker %d is %s"
          .format(broker, stopReplicaWithoutDelete.mkString(",")))

        val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => !r.deletePartition && r.callback == null)

        // Send one StopReplicaRequest for all partitions that require neither delete nor callback. This potentially
        // changes the order in which the requests are sent for the same partitions, but that's OK.
        val stopReplicaRequest = new StopReplicaRequest.Builder(controllerId, controllerEpoch, false,
          replicasToGroup.map(r => new TopicPartition(r.replica.topic, r.replica.partition)).toSet.asJava)
        controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest)

        replicasToNotGroup.foreach { r =>
          val stopReplicaRequest = new StopReplicaRequest.Builder(
              controllerId, controllerEpoch, r.deletePartition,
              Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)
          controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest, r.callback)
        }
      }
      stopReplicaRequestMap.clear()
    } catch {
      case e: Throwable =>
        if (leaderAndIsrRequestMap.nonEmpty) {
          error("Haven't been able to send leader and isr requests, current state of " +
              s"the map is $leaderAndIsrRequestMap. Exception message: $e")
        }
        if (updateMetadataRequestBrokerSet.nonEmpty) {
          error(s"Haven't been able to send metadata update requests to brokers $updateMetadataRequestBrokerSet, " +
                s"current state of the partition info is $updateMetadataRequestPartitionInfoMap. Exception message: $e")
        }
        if (stopReplicaRequestMap.nonEmpty) {
          error("Haven't been able to send stop replica requests, current state of " +
              s"the map is $stopReplicaRequestMap. Exception message: $e")
        }
        throw new IllegalStateException(e)
    }
  }

This step mainly operates on leaderAndIsrRequestMap and updateMetadataRequestPartitionInfoMap. Looking closely, both collections are cleared once they have been processed, so for partitions that were just initialized the two maps are empty and nothing is sent. That completes moving the partition state machine to NewPartition; next, all replicas are moved to the NewReplica state.

def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
                         callbacks: Callbacks = (new CallbackBuilder).build) {
    if(replicas.nonEmpty) {
      debug("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
      try {
        brokerRequestBatch.newBatch()
        replicas.foreach(r => handleStateChange(r, targetState, callbacks))
        // updateMetadataRequestBrokerSet and leaderAndIsrRequestMap were already cleared after the
        // partition state machine finished its server requests, so this round sends nothing
        brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
      }catch {
        case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
      }
    }
  }

Just like the partition initialization step, the comments here describe the online transition; let's look at how the replica state machine handles initialization:

def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
                        callbacks: Callbacks) {
    val topic = partitionAndReplica.topic
    val partition = partitionAndReplica.partition
    val replicaId = partitionAndReplica.replica
    val topicAndPartition = TopicAndPartition(topic, partition)
    if (!hasStarted.get)
      throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
                                            "to %s failed because replica state machine has not started")
                                              .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
    val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
    try {
      val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
      targetState match {
        case NewReplica =>
          assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
          // start replica as a follower to the current leader for its partition
          val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
          leaderIsrAndControllerEpochOpt match {
            case Some(leaderIsrAndControllerEpoch) =>
              if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
                throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
                  .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
              brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
                                                                  topic, partition, leaderIsrAndControllerEpoch,
                                                                  replicaAssignment)
            case None => // new leader request will be sent to this replica when one gets elected
          }
          replicaState.put(partitionAndReplica, NewReplica)
          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                            targetState))

Compared with the partition case, which only records the new state in partitionState, there is one extra step here: brokerRequestBatch.addLeaderAndIsrRequestForBrokers. But since no leader has been elected yet, that step is skipped, and the replica state machine initialization is complete.

Next the partitions are brought online. This is the crucial step: roughly 80% of the processing happens here, so let's dive in.

def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
                         leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector,
                         callbacks: Callbacks = (new CallbackBuilder).build) {
    debug("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
    try {
      brokerRequestBatch.newBatch()
      partitions.foreach { topicAndPartition =>
        // changes each partition's state, picks the leader and isr, and writes zk,
        // preparing for the leaderAndIsr and update-metadata requests sent to the server side below
        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
      }
      // the controller sends the leaderAndIsr and update-metadata requests to the brokers
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    }catch {
      case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
      // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
    }
  }
private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
                                leaderSelector: PartitionLeaderSelector,
                                callbacks: Callbacks) {
    val topicAndPartition = TopicAndPartition(topic, partition)
    if (!hasStarted.get)
      throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
                                            "the partition state machine has not started")
                                              .format(controllerId, controller.epoch, topicAndPartition, targetState))
    // a partition not seen before starts out as NonExistentPartition, for the validation below
    val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
    try {
      targetState match {
        case NewPartition =>
          // pre: partition did not exist before this
          assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
          partitionState.put(topicAndPartition, NewPartition)
          val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
                                            assignedReplicas))
          // post: partition has been assigned replicas
        case OnlinePartition =>
          assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
          partitionState(topicAndPartition) match {
            case NewPartition =>
              // initialize leader and isr path for new partition
              // besides creating the leader/isr zk path, this also updates leaderAndIsrRequestMap,
              // updateMetadataRequestPartitionInfoMap and partitionLeadershipInfo
              initializeLeaderAndIsrForPartition(topicAndPartition)
            case OfflinePartition =>
              electLeaderForPartition(topic, partition, leaderSelector)
            case OnlinePartition => // invoked when the leader needs to be re-elected
              electLeaderForPartition(topic, partition, leaderSelector)
            case _ => // should never come here since illegal previous states are checked above
          }
          partitionState.put(topicAndPartition, OnlinePartition)
          val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
          stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d"
                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))

Back in this shared method: besides flipping the partition state from NewPartition to OnlinePartition, the most important call is initializeLeaderAndIsrForPartition. Inside it, the leader and ISR are chosen and written to zk, and leaderAndIsrRequestMap, updateMetadataRequestPartitionInfoMap and partitionLeadershipInfo are all updated. The details:

private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
    val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
    val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
    liveAssignedReplicas.size match {
      case 0 =>
        val failMsg = ("encountered error during state change of partition %s from New to Online, assigned replicas are [%s], " +
                       "live brokers are [%s]. No assigned replica is alive.")
                         .format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds)
        stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
        throw new StateChangeFailedException(failMsg)
      case _ =>
        debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
        // make the first replica in the list of assigned replicas, the leader
        val leader = liveAssignedReplicas.head
        val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
          controller.epoch)
        debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch))
        try {
          zkUtils.createPersistentPath(
            getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
            zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
          // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
          // took over and initialized this partition. This can happen if the current controller went into a long
          // GC pause
          controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
          // populate leaderAndIsrRequestMap and updateMetadataRequestPartitionInfoMap, which the server-side
          // leaderAndIsr handling relies on later; the LeaderAndIsrRequest is only added for the brokers hosting this tp
          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
            topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
        } catch {
          case _: ZkNodeExistsException =>
            // read the controller epoch
            val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic,
              topicAndPartition.partition).get
            val failMsg = ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " +
                           "exists with value %s and controller epoch %d")
                             .format(topicAndPartition, leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch)
            stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
            throw new StateChangeFailedException(failMsg)
        }
    }
  }

It first fetches the partition's replica assignment, straight out of controllerContext.partitionReplicaAssignment, which is exactly why mark1 mattered. Leader election at topic-creation time is also very simple: the first live replica in the assignment becomes the leader, and every replica sitting on a live broker goes into the ISR by default. The current controller epoch is recorded as well; this value is quite useful and will come up again later, so call it mark2. The leader and ISR data is then written to zk and cached in controllerContext.partitionLeadershipInfo, and finally a LeaderAndIsrRequest is queued for all brokers hosting replicas of this partition (shown after the sketch below):
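For reference, the zk write above creates the partition's state znode, which looks roughly like this (path per the classic layout; the values are illustrative):

//   /brokers/topics/<topic>/partitions/<partition>/state
//   -> {"controller_epoch":5,"leader":1,"version":1,"leader_epoch":0,"isr":[1,2,3]}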

  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                       leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                                       replicas: Seq[Int], callback: AbstractResponse => Unit = null) {
    val topicPartition = new TopicPartition(topic, partition)

    brokerIds.filter(_ >= 0).foreach { brokerId =>
      val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
      result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
    }

    // also populate updateMetadataRequestPartitionInfoMap for the partitions whose metadata needs refreshing;
    // note that this is added for every live broker
    addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
                                       Set(TopicAndPartition(topic, partition)))
  }

/** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
  def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
                                         partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
                                         callback: AbstractResponse => Unit = null) {
    def updateMetadataRequestPartitionInfo(partition: TopicAndPartition, beingDeleted: Boolean) {
      val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
      leaderIsrAndControllerEpochOpt match {
        case Some(leaderIsrAndControllerEpoch) =>
          val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
          val partitionStateInfo = if (beingDeleted) {
            val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr)
            PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas)
          } else {
            PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
          }
          updateMetadataRequestPartitionInfoMap.put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
        // hit when the replica is moving from non-existent to new and no leader exists yet
        case None =>
          info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
      }
    }

    val filteredPartitions = {
      val givenPartitions = if (partitions.isEmpty)
        controllerContext.partitionLeadershipInfo.keySet
      else
        partitions
      if (controller.deleteTopicManager.partitionsToBeDeleted.isEmpty)
        givenPartitions
      else
        givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted
    }

    // the updateMetadataRequest will be sent to every live broker
    updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
    filteredPartitions.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = false))
    controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
  }

Two kinds of requests are queued here: LeaderAndIsrRequest and UpdateMetadataRequest. In essence both just populate leaderAndIsrRequestMap and updateMetadataRequestPartitionInfoMap, since the requests ultimately sent to the server side are taken from these two maps. Also note that the LeaderAndIsrRequest is only added for the broker ids hosting replicas of the partition, while the UpdateMetadataRequest is added for every live broker; in other words, LeaderAndIsr goes to the brokers involved, and UpdateMetadata goes to all brokers so they can refresh their metadata. Once the request maps have been populated per broker, the requests can be sent to the server side. Back to the partition handleStateChanges:
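A concrete recap of that fan-out (my own example, not from the source):

// Example: partition xx_log-0 with replicas [1,2,3] in a 5-broker cluster (ids 1..5):
//   LeaderAndIsrRequest   -> brokers 1, 2, 3
//   UpdateMetadataRequest -> brokers 1, 2, 3, 4, 5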

  def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
                         leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector,
                         callbacks: Callbacks = (new CallbackBuilder).build) {
    debug("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
    try {
      brokerRequestBatch.newBatch()
      partitions.foreach { topicAndPartition =>
        // changes each partition's state, picks the leader and isr, and writes zk,
        // preparing for the leaderAndIsr and update-metadata requests sent to the server side below
        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
      }
      // the controller sends the leaderAndIsr and update-metadata requests to the brokers
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    }catch {
      case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
      // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
    }
  }

brokerRequestBatch.sendRequestsToBrokers(controller.epoch) is what actually sends these requests to the server side; let's step in:

def sendRequestsToBrokers(controllerEpoch: Int) {
    try {
      leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>
        partitionStateInfos.foreach { case (topicPartition, state) =>
          val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
          stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +
                                   "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
                                                                   state.leaderIsrAndControllerEpoch, broker,
                                                                   topicPartition.topic, topicPartition.partition))
        }
        val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
        val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
          _.getNode(controller.config.interBrokerListenerName)
        }
        val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
          val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
          val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
            leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
            partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)
          topicPartition -> partitionState
        }
        val leaderAndIsrRequest = new LeaderAndIsrRequest.
            Builder(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
        // the leaderAndIsrRequest is only sent to the brokers that are involved
        controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest, null)
      }
      leaderAndIsrRequestMap.clear()

      // the leaderAndIsrRequest must be handled before the updateMetadataRequest
      updateMetadataRequestPartitionInfoMap.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
        "to brokers %s for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
        updateMetadataRequestBrokerSet.toString(), p._1)))
      val partitionStates = updateMetadataRequestPartitionInfoMap.map { case (topicPartition, partitionStateInfo) =>
        val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
        val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
          leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
          partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)
        topicPartition -> partitionState
      }

      val version: Short =
        if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3
        else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2
        else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
        else 0

      val updateMetadataRequest = {
        val liveBrokers = if (version == 0) {
          // Version 0 of UpdateMetadataRequest only supports PLAINTEXT.
          controllerContext.liveOrShuttingDownBrokers.map { broker =>
            val securityProtocol = SecurityProtocol.PLAINTEXT
            val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
            val node = broker.getNode(listenerName)
            val endPoints = Seq(new EndPoint(node.host, node.port, securityProtocol, listenerName))
            new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
          }
        } else {
          controllerContext.liveOrShuttingDownBrokers.map { broker =>
            val endPoints = broker.endPoints.map { endPoint =>
              new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port, endPoint.securityProtocol, endPoint.listenerName)
            }
            new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
          }
        }
        new UpdateMetadataRequest.Builder(
          controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava).
          setVersion(version)
      }

      // send the updateMetadataRequest to every live broker
      updateMetadataRequestBrokerSet.foreach { broker =>
        controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, updateMetadataRequest, null)
      }
      updateMetadataRequestBrokerSet.clear()
      updateMetadataRequestPartitionInfoMap.clear()

      stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>
        val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
        val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
        debug("The stop replica request (delete = true) sent to broker %d is %s"
          .format(broker, stopReplicaWithDelete.mkString(",")))
        debug("The stop replica request (delete = false) sent to broker %d is %s"
          .format(broker, stopReplicaWithoutDelete.mkString(",")))

        val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => !r.deletePartition && r.callback == null)

        // Send one StopReplicaRequest for all partitions that require neither delete nor callback. This potentially
        // changes the order in which the requests are sent for the same partitions, but that's OK.
        val stopReplicaRequest = new StopReplicaRequest.Builder(controllerId, controllerEpoch, false,
          replicasToGroup.map(r => new TopicPartition(r.replica.topic, r.replica.partition)).toSet.asJava)
        controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest)

        replicasToNotGroup.foreach { r =>
          val stopReplicaRequest = new StopReplicaRequest.Builder(
              controllerId, controllerEpoch, r.deletePartition,
              Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)
          controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest, r.callback)
        }
      }
      stopReplicaRequestMap.clear()
    } catch {
      case e: Throwable =>
        if (leaderAndIsrRequestMap.nonEmpty) {
          error("Haven't been able to send leader and isr requests, current state of " +
              s"the map is $leaderAndIsrRequestMap. Exception message: $e")
        }
        if (updateMetadataRequestBrokerSet.nonEmpty) {
          error(s"Haven't been able to send metadata update requests to brokers $updateMetadataRequestBrokerSet, " +
                s"current state of the partition info is $updateMetadataRequestPartitionInfoMap. Exception message: $e")
        }
        if (stopReplicaRequestMap.nonEmpty) {
          error("Haven't been able to send stop replica requests, current state of " +
              s"the map is $stopReplicaRequestMap. Exception message: $e")
        }
        throw new IllegalStateException(e)
    }
  }

The code is long, but the core is simply draining the two maps above, leaderAndIsrRequestMap and updateMetadataRequestPartitionInfoMap, and sending the requests. Now jump to the server side and look at how the LeaderAndIsrRequest is handled first:

def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
    // ensureTopicExists is only for client facing requests
    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
    // stop serving data to clients for the topic being deleted
    val correlationId = request.header.correlationId
    val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]

    try {
      // if __consumer_offsets is involved, consumer-group data migration is needed as well, covering both the log
      // data and the cached group metadata; the log side is already handled inside make-leader/make-follower,
      // so this callback mainly migrates the group metadata cache
      def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
        // for each new leader or follower, call coordinator to handle consumer group migration.
        // this callback is invoked under the replica state change lock to ensure proper order of
        // leadership changes
        updatedLeaders.foreach { partition =>
          LimitTopicsManager.checkAndPutData(partition.topic)
          if (partition.topic == Topic.GroupMetadataTopicName)
            coordinator.handleGroupImmigration(partition.partitionId)
        }
        updatedFollowers.foreach { partition =>
          LimitTopicsManager.checkAndPutData(partition.topic)
          if (partition.topic == Topic.GroupMetadataTopicName)
            coordinator.handleGroupEmigration(partition.partitionId)
        }
      }

      val leaderAndIsrResponse =
        if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
          val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
          new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
        } else {
          val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
          new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
        }

      requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
    } catch {
      case e: KafkaStorageException =>
        fatal("Disk error during leadership change.", e)
        Runtime.getRuntime.halt(1)
    }
  }
def becomeLeaderOrFollower(correlationId: Int, leaderAndISRRequest: LeaderAndIsrRequest,
                             metadataCache: MetadataCache,
                             onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
    info("becomeLeaderOrFollower correlationId=" + correlationId + " leaderAndISRRequest=" + leaderAndISRRequest)
    leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
      stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
                                .format(localBrokerId, stateInfo, correlationId,
                                        leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
    }
    replicaStateChangeLock synchronized {
      val responseMap = new mutable.HashMap[TopicPartition, Short]
      // a controller epoch in the request smaller than the current one means the request is stale
      if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
        stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
          "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
          correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
        BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
      } else {
        val controllerId = leaderAndISRRequest.controllerId
        controllerEpoch = leaderAndISRRequest.controllerEpoch

        // First check partition's leader epoch
        val partitionState = new mutable.HashMap[Partition, PartitionState]()
        leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
          val partition = getOrCreatePartition(topicPartition)
          // the tp leader epoch currently held in memory, initialized to 0 - 1 = -1
          val partitionLeaderEpoch = partition.getLeaderEpoch
          // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
          // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
          // the leader epoch in the request must be greater than the current one; also keep only the tps hosted on this broker
          if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
            if(stateInfo.replicas.contains(localBrokerId))
              partitionState.put(partition, stateInfo)
            else {
              stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
                "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
                .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                  topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(",")))
              responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
            }
          } else {
            // Otherwise record the error code in response
            stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
              "epoch %d for partition [%s,%d] since its associated leader epoch %d is not higher than the current leader epoch %d")
              .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch))
            responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
          }
        }

        val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
          stateInfo.leader == localBrokerId
        }
        val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys

        val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
          makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
        else
          Set.empty[Partition]
        val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
        else
          Set.empty[Partition]

        // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
        // have been completely populated before starting the checkpointing there by avoiding weird race conditions
        if (!hwThreadInitialized) {
          startHighWaterMarksCheckPointThread()
          hwThreadInitialized = true
        }
        // after making leaders and followers, shut down the fetcher threads that are no longer needed
        replicaFetcherManager.shutdownIdleFetcherThreads()

        // handle consumer-group data migration
        onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
        BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
      }
    }
  }

Inside becomeLeaderOrFollower, the controllerEpoch carried in the request is first compared with the current value, which is where mark2 pays off: if the request's epoch is smaller than the current one, the request is discarded, because controllerEpoch is the controller's generation, incremented by one on every controller election, so a smaller value in the request means the sending controller is stale. If the epoch is valid, controllerEpoch is updated, and then for each partition in the request: the partition is added to allPartitions, its leaderEpoch is read (it plays the same role as the controller epoch), and only partitions that have a replica on this broker are kept. Among those, the partitions whose leader id equals this broker's id go through makeLeaders on this broker, and the remaining ones go through makeFollowers. After makeLeaders and makeFollowers finish, idle fetch threads are cleaned up, and if the topic involved is __consumer_offsets, the related data migration is performed as well. Next let's focus on the makeLeaders and makeFollowers paths, starting with makeLeaders (the two epoch checks are sketched first, just below):
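Pulled out as a standalone sketch, the two staleness checks described above look like this (a hypothetical helper of mine, not Kafka code):

def leaderAndIsrEntryUsable(reqControllerEpoch: Int, curControllerEpoch: Int,
                            reqLeaderEpoch: Int, curLeaderEpoch: Int): Boolean =
  reqControllerEpoch >= curControllerEpoch &&  // drop requests from a deposed (stale) controller
  reqLeaderEpoch > curLeaderEpoch              // only act on a strictly newer leadership decision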

private def makeLeaders(controllerId: Int,
                          epoch: Int,
                          partitionState: Map[Partition, PartitionState],
                          correlationId: Int,
                          responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = {
    info("makeLeaders controllerId=" + controllerId + " epoch=" + epoch + " correlationId=" + correlationId)
    partitionState.keys.foreach { partition =>
      stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
        "starting the become-leader transition for partition %s")
        .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
    }

    for (partition <- partitionState.keys)
      responseMap.put(partition.topicPartition, Errors.NONE.code)

    val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()

    try {
      // First stop fetchers for all the partitions
      // to become a leader, first stop the follower fetch threads
      replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
      // Update the partition information to be the leader
      partitionState.foreach{ case (partition, partitionStateInfo) =>
        // the make-leader work itself
        if (partition.makeLeader(controllerId, partitionStateInfo, correlationId))
          partitionsToMakeLeaders += partition
        else
          stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
            "controller %d epoch %d for partition %s since it is already the leader for the partition.")
            .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
      }
      partitionsToMakeLeaders.foreach { partition =>
        stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
          "%d epoch %d with correlation id %d for partition %s")
          .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
      }

      // Init partition newOffsetMetaDataMap for Leader
      val metaData = new mutable.HashMap[TopicPartition, NewOffsetMetaData]
      partitionsToMakeLeaders.map{ partition =>
        val leo: Long = partition.leaderReplicaIfLocal.get.logEndOffset.messageOffset
        val lst: Long = partition.logManager.getLog(partition.topicPartition).get.segments.firstEntry().getValue.log.creationTime()
        val let: Long = partition.logManager.getLog(partition.topicPartition).get.segments.lastEntry().getValue.log.file.lastModified()
        val lso: Long = partition.leaderReplicaIfLocal.get.logStartOffset
        metaData.put(partition.topicPartition, new NewOffsetMetaData(partition.leaderReplicaIdOpt.get, leo, lst, let, lso))
      }

      info("makeLeaders updateNewOffsetMetaData broker=" + localBrokerId + " metaData=" + metaData)
      updateNewOffsetMetaData(localBrokerId, metaData)
    } catch {
      case e: Throwable =>
        partitionState.keys.foreach { partition =>
          val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" +
            " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)
          stateChangeLogger.error(errorMsg, e)
        }
        // Re-throw the exception for it to be caught in KafkaApis
        throw e
    }

    partitionState.keys.foreach { partition =>
      stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
        "for the become-leader transition for partition %s")
        .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
    }

    partitionsToMakeLeaders
  }

There are two steps here: since the replica is about to become leader, its follower fetch threads are stopped first; then the partition state is updated. The per-partition update happens in makeLeader, which implements the work a partition does when it becomes the leader:

 1 /**
 2    * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset
 3    * from the time when this broker was the leader last time) and setting the new leader and ISR.
 4    * If the leader replica id does not change, return false to indicate the replica manager.
 5    */
 6   def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
 7     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
 8       val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
 9       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
10       // to maintain the decision maker controller's epoch in the zookeeper path
11       // keep the latest controllerEpoch so that a stale controller can be detected if two controllers ever coexist
12       controllerEpoch = partitionStateInfo.controllerEpoch
13       // add replicas that are new
14       // when re-making the leader replica, every replica is created and put into assignedReplicas; a replica that is
15       // local to this broker also carries its log, with its starting offset taken from the HW checkpoint
16       allReplicas.foreach(replica => getOrCreateReplica(replica))
17       val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
18       // remove assigned replicas that have been removed by the controller
19       // remove the replicas this broker no longer hosts for the tp: assignedReplicas holds OAR+RAR, allReplicas is RAR, so OAR is removed
20       (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
21       inSyncReplicas = newInSyncReplicas
22       leaderEpoch = partitionStateInfo.leaderEpoch
23       zkVersion = partitionStateInfo.zkVersion
24       val isNewLeader =
25         if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
26           false
27         } else {
28           // either no leader was recorded before, or the leader id changed (a follower replica is promoted to leader)
29           info("makeLeader true, before leaderReplicaIdOpt=" + leaderReplicaIdOpt +
30             " after leaderReplicaIdOpt=" + localBrokerId + " topic="+ topicPartition)
31           leaderReplicaIdOpt = Some(localBrokerId)
32           true
33         }
34       val leaderReplica = getReplica().get
35       val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
36       val curTimeMs = time.milliseconds
37       // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
38       // reset the last caught-up time of the other replicas
39       (assignedReplicas - leaderReplica).foreach { replica =>
40         val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
41         replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
42       }
43       // we may need to increment high watermark since ISR could be down to 1
44       if (isNewLeader) {
45         // construct the high watermark metadata for the new leader replica
46         leaderReplica.convertHWToLocalOffsetMetadata()
47         // reset log end offset for remote replicas
48         // since this is a new leader, reset the offsets tracked for the remote replicas
49         assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
50       }
51       // compare the replicas' LEO values to decide whether the leader's HW can be advanced
52       (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
53     }
54     // some delayed operations may be unblocked after HW changed
55     if (leaderHWIncremented)
56       // after the HW advances, try to complete the delayed fetches and delayed produce responses
57       tryCompleteDelayedRequests()
58     isNewLeader
59   }

The controllerEpoch is used exactly as described above. Next, all replicas of the partition are created: the replica on this broker (the leader here) is created together with its log, its starting offset is initialized from the HW checkpoint (capped at the log end offset), and it is put into assignedReplicas; replicas that are not on this broker, i.e. the followers, are only created as remote replicas. The creation logic:

 1 def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
 2     assignedReplicaMap.getAndMaybePut(replicaId, {
 3       if (isReplicaLocal(replicaId)) {
 4         val config = LogConfig.fromProps(logManager.defaultConfig.originals,
 5                                          AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
 6         val log = logManager.createLog(topicPartition, config)
 7         val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
 8         val offsetMap = checkpoint.read
 9         if (!offsetMap.contains(topicPartition))
10           info(s"No checkpointed highwatermark is found for partition $topicPartition")
11         val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset)
12         new Replica(replicaId, this, time, offset, Some(log))
13       } else new Replica(replicaId, this, time) // a remote replica tracked on this broker, with no local log
14     })
15   }

Back in makeLeader: after all replicas are created, the stale replicas are removed. Concretely, assignedReplicas contains the previous OAR plus the new RAR, while allReplicas is the new RAR, so OAR+RAR-RAR=OAR, i.e. the old replica set is what gets deleted. The tp's ISR and leaderEpoch are then updated; note that leaderEpoch is used just like the controller's epoch. Next it is determined whether the elected leader is actually new: the tp's previous leader may already have been on this broker, in which case nothing changed and it is not a newly elected leader. After that, the last caught-up time of the other replicas is reset (to the current time for ISR members), and for a genuinely new leader the offsets tracked for the remote replicas are reset as well. Finally the replicas' LEO values are compared to decide whether the leader's HW can be advanced; if the HW did advance, the previously delayed fetches and the delayed produce responses to clients may now complete (readers can dig into these two delayed operations on their own; they are beyond the scope of this post). At this point the partition update in makeLeader is complete, and so is makeLeaders.
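The HW rule that last step relies on can be reduced to one line: the leader's HW is the minimum LEO across the in-sync replicas, and it never moves backwards. A simplified sketch of that rule (plain Longs standing in for replica LEOs; this is not Kafka's actual maybeIncrementLeaderHW):

// Sketch: leader HW = min(LEO over the ISR); advancing it may unblock delayed operations.
object HighWatermarkSketch {
  def maybeIncrementHW(currentHw: Long, isrLeos: Set[Long]): (Long, Boolean) = {
    val candidate = isrLeos.min                    // everything below this is replicated to the whole ISR
    if (candidate > currentHw) (candidate, true)   // HW advanced: delayed fetch/produce may now complete
    else (currentHw, false)                        // the HW never goes backwards
  }

  def main(args: Array[String]): Unit = {
    println(maybeIncrementHW(currentHw = 5L, isrLeos = Set(10L, 8L, 9L))) // (8, true)
    println(maybeIncrementHW(currentHw = 8L, isrLeos = Set(10L, 8L, 9L))) // (8, false)
  }
}

With every leader replica on this broker handled, the next step is makeFollowers; its implementation: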

  1 private def makeFollowers(controllerId: Int,
  2                             epoch: Int,
  3                             partitionState: Map[Partition, PartitionState],
  4                             correlationId: Int,
  5                             responseMap: mutable.Map[TopicPartition, Short],
  6                             metadataCache: MetadataCache) : Set[Partition] = {
  7     info("makeFollowers controllerId=" + controllerId + " epoch=" + epoch + " correlationId=" + correlationId)
  8     partitionState.keys.foreach { partition =>
  9       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
 10         "starting the become-follower transition for partition %s")
 11         .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
 12     }
 13 
 14     for (partition <- partitionState.keys)
 15       responseMap.put(partition.topicPartition, Errors.NONE.code)
 16 
 17     val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
 18 
 19     try {
 20 
 21       // TODO: Delete leaders from LeaderAndIsrRequest
 22       partitionState.foreach{ case (partition, partitionStateInfo) =>
 23         val newLeaderBrokerId = partitionStateInfo.leader
 24         metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
 25           // Only change partition state when the leader is available
 26           case Some(_) =>
 27             if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
 28               partitionsToMakeFollower += partition
 29             else
 30               stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
 31                 "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader")
 32                 .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
 33                 partition.topicPartition, newLeaderBrokerId))
 34           case None =>
 35             // The leader broker should always be present in the metadata cache.
 36             // If not, we should record the error message and abort the transition process for this partition
 37             stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
 38               " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.")
 39               .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
 40               partition.topicPartition, newLeaderBrokerId))
 41             // Create the local replica even if the leader is unavailable. This is required to ensure that we include
 42             // the partition‘s high watermark in the checkpoint file (see KAFKA-1647)
 43             partition.getOrCreateReplica()
 44         }
 45       }
 46 
47       // stop the existing fetchers first
 48       replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
 49       partitionsToMakeFollower.foreach { partition =>
 50         stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " +
 51           "%d epoch %d with correlation id %d for partition %s")
 52           .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
 53       }
 54 
55       // truncate the log to the HW position
 56       logManager.truncateTo(partitionsToMakeFollower.map { partition =>
 57         (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset)
 58       }.toMap)
 59 
60       // the replica may have been the leader before; complete the delayed requests it accumulated as leader
 61       partitionsToMakeFollower.foreach { partition =>
 62           val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
 63           tryCompleteDelayedProduce(topicPartitionOperationKey)
 64           tryCompleteDelayedFetch(topicPartitionOperationKey)
 65         }
 66 
 67       partitionsToMakeFollower.foreach { partition =>
 68         stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " +
 69           "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId,
 70           partition.topicPartition, correlationId, controllerId, epoch))
 71       }
 72 
 73       if (isShuttingDown.get()) {
 74         partitionsToMakeFollower.foreach { partition =>
 75           stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
 76             "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId,
 77             controllerId, epoch, partition.topicPartition))
 78         }
 79       } else {
 80         debug("makefollowers config.smartExtendEnable=" + config.smartExtendEnable)
 81         if (config.smartExtendEnable) {
 82           try {
 83             // 1. clear NewOffsetMetaData.
 84             clearNewOffsetMetaData(partitionsToMakeFollower)
 85 
 86             // 2. get offset from leader.
 87             val ResponseOffsetMap = getBestOffset(partitionsToMakeFollower, metadataCache, config.getStartOffsetRetries)
 88 
 89             if (ResponseOffsetMap.nonEmpty) {
 90               val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
 91                 partition.topicPartition -> BrokerAndInitialOffset(
 92                   metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
 93                   ResponseOffsetMap.get(partition.topicPartition).get.baseOffset)).toMap
 94 
 95               // 3. trunc log
 96               val partitionOffsets: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]()
 97               ResponseOffsetMap.map { partition =>
 98                 if (partition._2.error == Errors.OFFSET_HW) {
 99                   partitionOffsets.put(partition._1, partition._2.baseOffset)
100                 }
101               }
102 
103               if (partitionOffsets.nonEmpty) {
104                 info("makefollowers trunc log, partitionOffsets size=" + partitionOffsets.size + " ResponseOffsetMap size=" +
105                     ResponseOffsetMap.size + " partitionOffsets=" + partitionOffsets + " ResponseOffsetMap=" + ResponseOffsetMap)
106                 partitionOffsets.map { partitionInfo =>
107                   // truncate according to the offsets returned in the NewOffsetMetaData response
108                   logManager.truncateFullyAndStartAt(partitionInfo._1, partitionInfo._2)
109                 }
110               }
111               replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
112             } else {
113               // fall back to the old strategy
114               val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
115                 partition.topicPartition -> BrokerAndInitialOffset(
116                   metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
117                   partition.getReplica().get.logEndOffset.messageOffset)).toMap
118               error("makefollowers getStartOffset fail, and use old mode partitionsToMakeFollowerWithLeaderAndOffset=" + partitionsToMakeFollowerWithLeaderAndOffset)
119               replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
120             }
121           } catch {
122             case e: Exception =>
123               val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
124                 partition.topicPartition -> BrokerAndInitialOffset(
125                   metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
126                   partition.getReplica().get.logEndOffset.messageOffset)).toMap
127               error("ReplicaManager makefollowers getStartOffset fail, and use old mode partitionsToMakeFollowerWithLeaderAndOffset=" + partitionsToMakeFollowerWithLeaderAndOffset
128                 + " Exception=" + e.getMessage)
129               replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
130           }
131         } else {
132           //we do not need to check if the leader exists again since this has been done at the beginning of this process
133           val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
134             partition.topicPartition -> BrokerAndInitialOffset(
135               metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
136               partition.getReplica().get.logEndOffset.messageOffset)).toMap
137           replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
138         }
139 
140         partitionsToMakeFollower.foreach { partition =>
141           stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " +
142             "%d epoch %d with correlation id %d for partition %s")
143             .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
144         }
145       }
146     } catch {
147       case e: Throwable =>
148         val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " +
149           "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
150         stateChangeLogger.error(errorMsg, e)
151         // Re-throw the exception for it to be caught in KafkaApis
152         throw e
153     }
154 
155     partitionState.keys.foreach { partition =>
156       stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
157         "for the become-follower transition for partition %s")
158         .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
159     }
160 
161     partitionsToMakeFollower
162   }

Just like makeLeader inside makeLeaders, there is first a makeFollower step that does almost the same work: create all replicas, remove the old ones, and update the tp's ISR and leaderEpoch. Note that if the tp's leader is unchanged from the old leader, the remaining steps are skipped for that replica. Otherwise: the existing fetch threads are stopped, the log is truncated to the HW position, and since the replica may previously have been the leader, the delayed produce responses and delayed fetches it accumulated as leader are completed; finally the follower's initial fetch offset is reset (in the default path to its local log end offset, which after truncation equals its HW; with smartExtendEnable the offset is obtained from the leader) and a fetch thread towards the new leader is added. That concludes the makeFollowers pass as well.
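The net effect of the truncate-then-fetch sequence can be illustrated with a small standalone sketch (made-up numbers, not the real LogManager/ReplicaFetcherManager API): anything beyond the HW was never committed to the full ISR, so the new follower drops it and starts fetching from the truncated position.

// Sketch: a replica becoming follower truncates to its HW and fetches from there.
object BecomeFollowerSketch {
  case class ReplicaLog(hw: Long, leo: Long) {
    def truncateToHw: ReplicaLog = copy(leo = math.min(leo, hw))
  }

  def main(args: Array[String]): Unit = {
    val asOldLeader = ReplicaLog(hw = 100L, leo = 120L) // 20 messages never committed to the full ISR
    val asFollower  = asOldLeader.truncateToHw
    val initialFetchOffset = asFollower.leo             // the fetcher starts at the truncated LEO (== HW)
    println(s"truncated to ${asFollower.leo}, fetcher starts at offset $initialFetchOffset")
  }
}

In the real broker the truncation is done by logManager.truncateTo and the starting offset is handed to replicaFetcherManager.addFetcherForPartitions, as in the listing above.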

Back in becomeLeaderOrFollower, what remains is to clean up the now-idle fetch threads and, finally, to handle the data migration. The code:

 1 // if __consumer_offsets is involved, consumer group data also has to be migrated: the log data and the cached group metadata;
 2       // the log data was already taken care of in make leader / make follower, so this callback mainly migrates the group metadata cache
 3       def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
 4         // for each new leader or follower, call coordinator to handle consumer group migration.
 5         // this callback is invoked under the replica state change lock to ensure proper order of
 6         // leadership changes
 7         updatedLeaders.foreach { partition =>
 8           LimitTopicsManager.checkAndPutData(partition.topic)
 9           if (partition.topic == Topic.GroupMetadataTopicName)
10             coordinator.handleGroupImmigration(partition.partitionId)
11         }
12         updatedFollowers.foreach { partition =>
13           LimitTopicsManager.checkAndPutData(partition.topic)
14           if (partition.topic == Topic.GroupMetadataTopicName)
15             coordinator.handleGroupEmigration(partition.partitionId)
16         }
17       }

In other words, if the topic involved is the built-in topic __consumer_offsets, data migration is also required: partitions that became leader on this broker need the group metadata migrated in, and partitions that became follower need it migrated out. Before stepping into the migration code, it helps to recall how a group is mapped to a __consumer_offsets partition in the first place; a small sketch follows.
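A minimal sketch of that mapping — the real logic lives in GroupMetadataManager.partitionFor and uses the configured offsets.topic.num.partitions, assumed to be 50 here:

// Sketch: which __consumer_offsets partition a consumer group belongs to.
object GroupToOffsetsPartitionSketch {
  val offsetsTopicPartitionCount = 50 // assumed value of offsets.topic.num.partitions

  def partitionFor(groupId: String): Int =
    (groupId.hashCode & 0x7fffffff) % offsetsTopicPartitionCount // non-negative hash mod partition count

  def main(args: Array[String]): Unit = {
    // the broker leading __consumer_offsets-partitionFor(g) is the coordinator for group g,
    // so leadership moves of that partition are exactly what triggers the migration below
    Seq("my-consumer-group", "another-group").foreach { g =>
      println(s"$g -> __consumer_offsets-${partitionFor(g)}")
    }
  }
}

With that mapping in mind, let's see how the migration works, starting with handleGroupImmigration: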

 1 def handleGroupImmigration(offsetTopicPartitionId: Int) {
 2   groupManager.loadGroupsForPartition(offsetTopicPartitionId, onGroupLoaded)
 3 }

1   private def onGroupLoaded(group: GroupMetadata) {
2     group synchronized {
3       info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}")
4       assert(group.is(Stable) || group.is(Empty))
5       group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _))
6     }
7   }
 1 def loadGroupsForPartition(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) {
 2     val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
 3 
 4     def doLoadGroupsAndOffsets() {
 5       info(s"Loading offsets and group metadata from $topicPartition")
 6 
 7       inLock(partitionLock) {
 8         if (loadingPartitions.contains(offsetsPartition)) {
 9           info(s"Offset load from $topicPartition already in progress.")
10           return
11         } else {
12           loadingPartitions.add(offsetsPartition)
13         }
14       }
15 
16       try {
17         loadGroupsAndOffsets(topicPartition, onGroupLoaded)
18       } catch {
19         case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
20       } finally {
21         inLock(partitionLock) {
22           ownedPartitions.add(offsetsPartition)
23           loadingPartitions.remove(offsetsPartition)
24         }
25       }
26     }
27 
28     // run on the scheduler thread that loads and cleans the group metadata cache
29     scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets)
30   }

In this step the corresponding leader partitions on this broker (all of them __consumer_offsets partitions here) are added to loadingPartitions, then loadGroupsAndOffsets runs; once it finishes, the partition is put into ownedPartitions and removed from loadingPartitions, which completes the data migration into the leader partition. The loading/owned bookkeeping behind it is sketched below.
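The loadingPartitions/ownedPartitions bookkeeping is essentially a load-once guard. A simplified standalone sketch (single lock, plain Ints for partitions, not the real GroupMetadataManager):

// Sketch: guard against loading the same __consumer_offsets partition twice, then mark it owned.
object OffsetsLoadGuardSketch {
  private val lock              = new Object
  private val loadingPartitions = scala.collection.mutable.Set[Int]()
  private val ownedPartitions   = scala.collection.mutable.Set[Int]()

  def loadPartition(offsetsPartition: Int)(doLoad: => Unit): Unit = {
    val alreadyLoading = lock.synchronized {
      val inProgress = loadingPartitions.contains(offsetsPartition)
      if (!inProgress) loadingPartitions += offsetsPartition
      inProgress
    }
    if (!alreadyLoading) {
      try doLoad
      finally lock.synchronized {
        ownedPartitions += offsetsPartition   // the coordinator now serves this partition's groups
        loadingPartitions -= offsetsPartition
      }
    }
  }

  def main(args: Array[String]): Unit = {
    loadPartition(7) { println("loading offsets and group metadata for partition 7") }
    println(s"owned=$ownedPartitions loading=$loadingPartitions")
  }
}

The implementation of loadGroupsAndOffsets itself: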

 1 private[coordinator] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit) {
 2     def highWaterMark = replicaManager.getHighWatermark(topicPartition).getOrElse(-1L)
 3 
 4     val startMs = time.milliseconds()
 5     replicaManager.getLog(topicPartition) match {
 6       case None =>
 7         warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
 8 
 9       case Some(log) =>
10         var currOffset = log.logStartOffset
11         val buffer = ByteBuffer.allocate(config.loadBufferSize)
12         // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
13         val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]()
14         val removedOffsets = mutable.Set[GroupTopicPartition]()
15         val loadedGroups = mutable.Map[String, GroupMetadata]()
16         val removedGroups = mutable.Set[String]()
17 
18         while (currOffset < highWaterMark && !shuttingDown.get()) {
19           buffer.clear()
20           val fileRecords = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true)
21             .records.asInstanceOf[FileRecords]
22           val bufferRead = fileRecords.readInto(buffer, 0)
23 
24           MemoryRecords.readableRecords(bufferRead).deepEntries.asScala.foreach { entry =>
25             val record = entry.record
26             require(record.hasKey, "Group metadata/offset entry key should not be null")
27 
28             // handle the two record types separately: offset commits and group metadata
29             GroupMetadataManager.readMessageKey(record.key) match {
30               case offsetKey: OffsetKey =>
31                 // load offset
32                 val key = offsetKey.key
33                 if (record.hasNullValue) {
34                   loadedOffsets.remove(key)
35                   removedOffsets.add(key)
36                 } else {
37                   val value = GroupMetadataManager.readOffsetMessageValue(record.value)
38                   loadedOffsets.put(key, value)
39                   removedOffsets.remove(key)
40                 }
41 
42               case groupMetadataKey: GroupMetadataKey =>
43                 // load group metadata
44                 val groupId = groupMetadataKey.key
45                 val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
46                 if (groupMetadata != null) {
47                   trace(s"Loaded group metadata for group $groupId with generation ${groupMetadata.generationId}")
48                   removedGroups.remove(groupId)
49                   loadedGroups.put(groupId, groupMetadata)
50                 } else {
51                   loadedGroups.remove(groupId)
52                   removedGroups.add(groupId)
53                 }
54 
55               case unknownKey =>
56                 throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
57             }
58 
59             currOffset = entry.nextOffset
60           }
61         }
62 
63         val (groupOffsets, emptyGroupOffsets) = loadedOffsets
64           .groupBy(_._1.group)
65           .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)} )
66           .partition { case (group, _) => loadedGroups.contains(group) }
67 
68         loadedGroups.values.foreach { group =>
69           val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, OffsetAndMetadata])
70           // actually update the cache: the group metadata plus its offsets
71           loadGroup(group, offsets)
72           // once loaded, schedule a heartbeat expiration for every member; the group metadata is readable and the group is stable
73           onGroupLoaded(group)
74         }
75 
76         // load groups which store offsets in kafka, but which have no active members and thus no group
77         // metadata stored in the log
78         emptyGroupOffsets.foreach { case (groupId, offsets) =>
79           val group = new GroupMetadata(groupId)
80           loadGroup(group, offsets)
81           onGroupLoaded(group)
82         }
83 
84         removedGroups.foreach { groupId =>
85           // if the cache already contains a group which should be removed, raise an error. Note that it
86           // is possible (however unlikely) for a consumer group to be removed, and then to be used only for
87           // offset storage (i.e. by "simple" consumers)
88           if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
89             throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
90               s"loading partition $topicPartition")
91         }
92 
93         if (!shuttingDown.get())
94           info("Finished loading offsets from %s in %d milliseconds."
95             .format(topicPartition, time.milliseconds() - startMs))
96     }
97   }

Overall, this reads the partition's log and handles two kinds of records separately, offset commits and group metadata, then calls loadGroup to actually update the group's offsets and metadata. It then calls onGroupLoaded so that every member of the group gets its heartbeat expiration scheduled, which was covered in detail in the previous post on the coordinator. That completes the data migration into the leader. The replay pattern used by the loop is sketched below.
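The loop is the usual "replay a compacted log into a table" idiom: a record with a value is an upsert, a record with a null value is a tombstone that removes the key. A minimal sketch with plain strings standing in for OffsetKey/GroupMetadataKey and their values:

// Sketch: rebuilding an in-memory cache from a compacted key/value stream.
object CompactedReplaySketch {
  def replay(records: Seq[(String, Option[String])]): Map[String, String] =
    records.foldLeft(Map.empty[String, String]) {
      case (cache, (key, Some(value))) => cache + (key -> value) // load / overwrite
      case (cache, (key, None))        => cache - key            // tombstone: drop the entry
    }

  def main(args: Array[String]): Unit = {
    val log = Seq(
      "group1/topicA-0" -> Some("offset=10"),
      "group1/topicA-0" -> Some("offset=42"), // a later record wins
      "group2/topicB-3" -> Some("offset=7"),
      "group2/topicB-3" -> None               // the committed offset was deleted
    )
    println(replay(log)) // Map(group1/topicA-0 -> offset=42)
  }
}

Next is the data migration out of the followers: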

 1 def handleGroupEmigration(offsetTopicPartitionId: Int) {
 2   groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
 3 }

 1 private def onGroupUnloaded(group: GroupMetadata) {
 2     group synchronized {
 3       info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}")
 4       val previousState = group.currentState
 5       group.transitionTo(Dead)
 6       group.metrics.close()
 7 
 8       previousState match {
 9         case Empty | Dead =>
10         case PreparingRebalance =>
11           for (member <- group.allMemberMetadata) {
12             if (member.awaitingJoinCallback != null) {
13               member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code))
14               member.awaitingJoinCallback = null
15             }
16           }
17           joinPurgatory.checkAndComplete(GroupKey(group.groupId))
18 
19         case Stable | AwaitingSync =>
20           for (member <- group.allMemberMetadata) {
21             if (member.awaitingSyncCallback != null) {
22               member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR_FOR_GROUP.code)
23               member.awaitingSyncCallback = null
24             }
25             heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId))
26           }
27       }
28     }
29   }
 1 def removeGroupsForPartition(offsetsPartition: Int,
 2                                onGroupUnloaded: GroupMetadata => Unit) {
 3     val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
 4     scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
 5 
 6     def removeGroupsAndOffsets() {
 7       var numOffsetsRemoved = 0
 8       var numGroupsRemoved = 0
 9 
10       inLock(partitionLock) {
11         // we need to guard the group removal in cache in the loading partition lock
12         // to prevent coordinator's check-and-get-group race condition
13         ownedPartitions.remove(offsetsPartition)
14 
15         for (group <- groupMetadataCache.values) {
16           if (partitionFor(group.groupId) == offsetsPartition) {
17             // transition the group to the Dead state
18             onGroupUnloaded(group)
19             group.metrics.close()
20             groupMetadataCache.remove(group.groupId, group)
21             numGroupsRemoved += 1
22             numOffsetsRemoved += group.numOffsets
23           }
24         }
25       }
26 
27       if (numOffsetsRemoved > 0)
28         info(s"Removed $numOffsetsRemoved cached offsets for $topicPartition on follower transition.")
29 
30       if (numGroupsRemoved > 0)
31         info(s"Removed $numGroupsRemoved cached groups for $topicPartition on follower transition.")
32     }
33   }

As you can see, once a __consumer_offsets partition on this broker becomes a follower, it is removed from ownedPartitions, the groups belonging to that partition are transitioned to the Dead state (done by onGroupUnloaded), and finally their metadata is removed from groupMetadataCache. That completes the data migration out of the follower, and the server side of the leaderAndIsr request is finished. Back on the controller side, the next thing to handle is the updateMetadataRequest which, as mentioned above, is sent to all live brokers. Let's look at how the server handles the updateMetadataRequest:

 1 def handleUpdateMetadataRequest(request: RequestChannel.Request) {
 2     val correlationId = request.header.correlationId
 3     val updateMetadataRequest = request.body.asInstanceOf[UpdateMetadataRequest]
 4 
 5     val updateMetadataResponse =
 6       if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
 7         // this step updates the broker's metadata cache
 8         val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest, metadataCache)
 9         if (deletedPartitions.nonEmpty)
10           // deleted partitions also need to be removed from the group metadata
11           coordinator.handleDeletedPartitions(deletedPartitions)
12 
13         // the metadata is now up to date, so delayed topic operations can be completed
14         if (adminManager.hasDelayedTopicOperations) {
15           updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
16             adminManager.tryCompleteDelayedTopicOperations(topic)
17           }
18         }
19         new UpdateMetadataResponse(Errors.NONE.code)
20       } else {
21         new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code)
22       }
23 
24     if (config.useHdfsFunction) {
25       replicaManager.taskManager.addDiffTask()
26     }
27 
28     requestChannel.sendResponse(new Response(request, updateMetadataResponse))
29   }
 1 def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) : Seq[TopicPartition] =  {
 2     replicaStateChangeLock synchronized {
 3       if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
 4         val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
 5           "old controller %d with epoch %d. Latest known controller epoch is %d").format(localBrokerId,
 6           correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
 7           controllerEpoch)
 8         stateChangeLogger.warn(stateControllerEpochErrorMessage)
 9         throw new ControllerMovedException(stateControllerEpochErrorMessage)
10       } else {
11         // update the metadata cache
12         val deletedPartitions = metadataCache.updateCache(correlationId, updateMetadataRequest)
13         controllerEpoch = updateMetadataRequest.controllerEpoch
14         metadataCache.setControllerEpoch(controllerEpoch)
15         deletedPartitions
16       }
17     }
18   }
 1 def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
 2     inWriteLock(partitionMetadataLock) {
 3       controllerId = updateMetadataRequest.controllerId match {
 4           case id if id < 0 => None
 5           case id => Some(id)
 6         }
 7       aliveNodes.clear()
 8       aliveBrokers.clear()
 9       updateMetadataRequest.liveBrokers.asScala.foreach { broker =>
10         // `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which
11         // is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could
12         // move to `AnyRefMap`, which has comparable performance.
13         val nodes = new java.util.HashMap[ListenerName, Node]
14         val endPoints = new mutable.ArrayBuffer[EndPoint]
15         broker.endPoints.asScala.foreach { ep =>
16           endPoints += EndPoint(ep.host, ep.port, ep.listenerName, ep.securityProtocol)
17           nodes.put(ep.listenerName, new Node(broker.id, ep.host, ep.port))
18         }
19         aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))
20         aliveNodes(broker.id) = nodes.asScala
21       }
22 
23       val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
24       updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
25         val controllerId = updateMetadataRequest.controllerId
26         val controllerEpoch = updateMetadataRequest.controllerEpoch
27         if (info.leader == LeaderAndIsr.LeaderDuringDelete) {
28           removePartitionInfo(tp.topic, tp.partition)
29           stateChangeLogger.trace(s"Broker $brokerId deleted partition $tp from metadata cache in response to UpdateMetadata " +
30             s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
31           deletedPartitions += tp
32         } else {
33           val partitionInfo = partitionStateToPartitionStateInfo(info)
34           // the actual metadata update
35           addOrUpdatePartitionInfo(tp.topic, tp.partition, partitionInfo)
36           stateChangeLogger.trace(s"Broker $brokerId cached leader info $partitionInfo for partition $tp in response to " +
37             s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
38         }
39       }
40       deletedPartitions
41     }
42   }

The actual metadata update is done in addOrUpdatePartitionInfo. Along the way, if a partition's leader is in the being-deleted state, its metadata is removed as well and it is subsequently dropped from the group metadata. That is everything the updateMetadataRequest does, which is much simpler than the leaderAndIsrRequest. Back on the controller side, after updateMetadata comes the stopReplicaRequestMap; topic creation does not touch it, so this post does not go into it. With that, transitioning all tps to the online state is complete, and the last step is transitioning all replicas to the online state:

 1 def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
 2                          callbacks: Callbacks = (new CallbackBuilder).build) {
 3     if(replicas.nonEmpty) {
 4       debug("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
 5       try {
 6         brokerRequestBatch.newBatch()
 7         replicas.foreach(r => handleStateChange(r, targetState, callbacks))
 8         // updateMetadataRequestBrokerSet and leaderAndIsrRequestMap were already cleared after the partition state
 9         // machine brought the tps online and the server requests completed, so this round sends nothing
10         brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
11       }catch {
12         case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
13       }
14     }
15   }

The flow is the same as before: handleStateChange first, then sendRequestsToBrokers. Because updateMetadataRequestBrokerSet and leaderAndIsrRequestMap were already cleared after the partition state machine finished its server requests, sendRequestsToBrokers does nothing when bringing replicas online. Let's see what handleStateChange does:

  1 def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
  2                         callbacks: Callbacks) {
  3     val topic = partitionAndReplica.topic
  4     val partition = partitionAndReplica.partition
  5     val replicaId = partitionAndReplica.replica
  6     val topicAndPartition = TopicAndPartition(topic, partition)
  7     if (!hasStarted.get)
  8       throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
  9                                             "to %s failed because replica state machine has not started")
 10                                               .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
 11     val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
 12     try {
 13       val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
 14       targetState match {
 15         case NewReplica =>
 16           assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
 17           // start replica as a follower to the current leader for its partition
 18           val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
 19           leaderIsrAndControllerEpochOpt match {
 20             case Some(leaderIsrAndControllerEpoch) =>
 21               if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
 22                 throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
 23                   .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
 24               brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
 25                                                                   topic, partition, leaderIsrAndControllerEpoch,
 26                                                                   replicaAssignment)
 27             case None => // new leader request will be sent to this replica when one gets elected
 28           }
 29           replicaState.put(partitionAndReplica, NewReplica)
 30           stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
 31                                     .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
 32                                             targetState))
 33         case ReplicaDeletionStarted =>
 34           assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
 35           replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
 36           // send stop replica command
 37           brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
 38             callbacks.stopReplicaResponseCallback)
 39           stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
 40             .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
 41         case ReplicaDeletionIneligible =>
 42           assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
 43           replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
 44           stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
 45             .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
 46         case ReplicaDeletionSuccessful =>
 47           assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
 48           replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
 49           stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
 50             .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
 51         case NonExistentReplica =>
 52           assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
 53           // remove this replica from the assigned replicas list for its partition
 54           val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
 55           controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
 56           replicaState.remove(partitionAndReplica)
 57           stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
 58             .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
 59         case OnlineReplica =>
 60           assertValidPreviousStates(partitionAndReplica,
 61             List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
 62           replicaState(partitionAndReplica) match {
 63             case NewReplica =>
 64               // add this replica to the assigned replicas list for its partition
 65               val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
 66               if(!currentAssignedReplicas.contains(replicaId))
 67                 controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
 68               stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
 69                                         .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
 70                                                 targetState))
 71             case _ =>
 72               // check if the leader for this partition ever existed
 73               controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
 74                 case Some(leaderIsrAndControllerEpoch) =>
 75                   brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
 76                     replicaAssignment)
 77                   replicaState.put(partitionAndReplica, OnlineReplica)
 78                   stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
 79                     .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
 80                 case None => // that means the partition was never in OnlinePartition state, this means the broker never
 81                   // started a log for that partition and does not have a high watermark value for this partition
 82               }
 83           }
 84           replicaState.put(partitionAndReplica, OnlineReplica)
 85         case OfflineReplica =>
 86           assertValidPreviousStates(partitionAndReplica,
 87             List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
 88           // send stop replica command to the replica so that it stops fetching from the leader
 89           brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
 90           // As an optimization, the controller removes dead replicas from the ISR
 91           val leaderAndIsrIsEmpty: Boolean =
 92             controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
 93               case Some(_) =>
 94                 controller.removeReplicaFromIsr(topic, partition, replicaId) match {
 95                   case Some(updatedLeaderIsrAndControllerEpoch) =>
 96                     // send the shrunk ISR state change request to all the remaining alive replicas of the partition.
 97                     val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
 98                     if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
 99                       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
100                         topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
101                     }
102                     replicaState.put(partitionAndReplica, OfflineReplica)
103                     stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
104                       .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
105                     false
106                   case None =>
107                     true
108                 }
109               case None =>
110                 true
111             }
112           if (leaderAndIsrIsEmpty && !controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition))
113             throw new StateChangeFailedException(
114               "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
115               .format(replicaId, topicAndPartition))
116       }
117     }
118     catch {
119       case t: Throwable =>
120         stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
121                                   .format(controllerId, controller.epoch, replicaId, topic, partition, currState, targetState), t)
122     }
123   }

As you can see, it merely updates controllerContext.partitionReplicaAssignment and replicaState; in other words, the replica state simply moves from NewReplica to OnlineReplica.
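The assertValidPreviousStates checks scattered through handleStateChange amount to a small transition table. Here is a sketch covering only the transitions topic creation exercises (state names borrowed from the source; the table itself is simplified and leaves out the deletion-related states):

// Sketch: the replica state transitions driven by topic creation.
object ReplicaStateMachineSketch {
  sealed trait ReplicaState
  case object NonExistentReplica extends ReplicaState
  case object NewReplica         extends ReplicaState
  case object OnlineReplica      extends ReplicaState

  // target state -> states it may legally be entered from (subset relevant to topic creation)
  val validPreviousStates: Map[ReplicaState, Set[ReplicaState]] = Map(
    NewReplica    -> Set(NonExistentReplica),
    OnlineReplica -> Set(NewReplica, OnlineReplica)
  )

  def transition(current: ReplicaState, target: ReplicaState): ReplicaState = {
    require(validPreviousStates(target).contains(current), s"illegal transition $current -> $target")
    target
  }

  def main(args: Array[String]): Unit = {
    val s1 = transition(NonExistentReplica, NewReplica) // replicas created along with the new partition
    val s2 = transition(s1, OnlineReplica)              // the step shown in the listing above
    println(s"final state: $s2")
  }
}

Topic creation only ever drives a replica along NonExistentReplica -> NewReplica -> OnlineReplica; the deletion-related branches in the real handleStateChange are never hit.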

With that, the partition and replica state machine transitions are done, and the whole topic-creation flow is complete. Looking back, the bulk of the work is the logic that takes a partition from NewPartition to OnlinePartition.

 
