kafka源码走读-controller (创建topic过程)
Posted hudeqi
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka源码走读-controller (创建topic过程)相关的知识,希望对你有一定的参考价值。
晚上刚刚被媳妇骂,难过之余,还是要坚持继续写一篇kafka源码走读的博客,心情难过,原谅我开头发下牢骚。。。
源码版本依然是0.10.2.1,我们都知道,kafka在0.8版本前没有提供Partition的Replication机制,一旦Broker宕机,其上的所有Partition就都无法提供服务,而Partition又没有备份数据,数据的可用性就大大降低了,所以0.8后提供了Replication机制来保证Broker的failover,而controller则是实现副本机制的核心。
controller要实现副本机制,它是极度依赖于zookeeper(以下简称zk)服务的,简单来说就是:利用zk来监听zk目录,一旦的监听到发生变化,在controller里实现的监听器的处理逻辑就会被触发,controller再将处理结果分发给相关的或所有的brokers,这样一来所有的brokers就会都知道发生的改变以及处理结果。可能还是有些抽象,接下来就以创建topic的过程为例,分析一下zk、controller、broker的server端发生的处理逻辑。
1. 常用名词解释:
- leader:副本中的主副本,producer和consumer都是直接与leader进行交互的。
- follower:所有副本中除了leader的其它副本,它不断地从leader来fetch数据以保持数据同步,一旦leader挂掉,会从剩下的follower中选出一个所为新的leader,数据可靠性的保证。
- AR:是一个partition的所有副本的集合。OAR是之前的副本集合,RAR是重新分配的副本集合。
- ISR:follower们向leader进行数据同步时会有一定的延迟,如果延迟在设定的延迟阈值里,那么该副本就属于isr(俗称进队),所以isr是leader和那些进队的follower。
- OSR:与ISR相反,如果延迟超过阈值,那么就属于osr(俗称掉队)。因此,AR=ISR+OSR。
- LEO:消息生产的最高位置,也就是leader最后一条消息的offset值。
- HW:高水位的简称,表达的含义就是consumer从leader最多能消费的位置,因为producer是直接和leader进行交互的,当一条消息被append到leader上,但isr里的follower还没有将该条消息fetch过去,那么此时如果leader挂掉,可能面临着数据丢失消费不到的问题,只有当isr里的所有副本都同步完该条消息,才会将leader的hw值更新。
还是不多说废话,我们直接上代码吧。
2. 代码走读:
创建topic的命令一定不会陌生: ~/software/kafka/bin/kafka-topics.sh --topic xx_log --replication-factor 3 --partitions 240 --create --zookeeper 127.0.0.1:2181,首先从解析这条命令开始:
1 def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) { 2 val topic = opts.options.valueOf(opts.topicOpt) 3 val configs = parseTopicConfigsToBeAdded(opts) 4 val ifNotExists = opts.options.has(opts.ifNotExistsOpt) 5 if (Topic.hasCollisionChars(topic)) 6 println("WARNING: Due to limitations in metric names, topics with a period (‘.‘) or underscore (‘_‘) could collide. To avoid issues it is best to use either, but not both.") 7 try { 8 if (opts.options.has(opts.tagOpt)) { 9 val tag = opts.options.valueOf(opts.tagOpt).toString 10 if (zkUtils.checkTagExist(tag)) { 11 if (opts.options.has(opts.replicaAssignmentOpt)) { // 创建时指定tp在哪个broker上 12 val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt)) 13 AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false, tag = tag) 14 } else { 15 CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt, opts.tagOpt) 16 val partitions = opts.options.valueOf(opts.partitionsOpt).intValue 17 val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue 18 // 默认不会关闭机架感知副本策略 19 val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled 20 else RackAwareMode.Enforced 21 AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode, tag = tag) 22 } 23 println("Created topic "%s".".format(topic)) 24 } else { 25 println(s"Tag ‘$tag‘ not exists.") 26 } 27 } else { 28 if (opts.options.has(opts.replicaAssignmentOpt)) { 29 val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt)) 30 AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false) 31 } else { 32 CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt) 33 val partitions = opts.options.valueOf(opts.partitionsOpt).intValue 34 val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue 35 val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled 36 else RackAwareMode.Enforced 37 AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode) 38 } 39 println("Created topic "%s".".format(topic)) 40 } 41 } catch { 42 case e: TopicExistsException => if (!ifNotExists) throw e 43 } 44 }
可以看到首先会解析这条创建topic的命令,一般不会在创建时就指定哪些tp在哪个broker上,这个过程一般默认靠特定的分配算法。也默认不会关闭机架感知副本策略,因为通过将副本分布在不同的机架位上,也是处于高可用的考虑上。核心是AdminUtils.createTopic,进去看下干了啥。
1 def createTopic(zkUtils: ZkUtils, 2 topic: String, 3 partitions: Int, 4 replicationFactor: Int, 5 topicConfig: Properties = new Properties, 6 rackAwareMode: RackAwareMode = RackAwareMode.Enforced, 7 tag: String = "") { 8 //1、创建默认quota 9 createDefaultGroupQuota(zkUtils,topic) 10 11 //2、然后再去创建topic 12 var currentTag = tag 13 var currentrReplicationFactor = replicationFactor 14 info("createTopic topic=" + topic + ", tag=" + tag + ", partitions=" + partitions + ", replicationFactor=" + replicationFactor) 15 if (Topic.GroupMetadataTopicName.equals(topic)) { 16 val brokers = zkUtils.getLiveBrokersInTag(Topic.GroupMetadataTagName) 17 if (!brokers.isEmpty) { 18 currentTag = Topic.GroupMetadataTagName 19 currentrReplicationFactor = Math.min(brokers.size, replicationFactor) 20 } 21 } else { 22 if (currentTag != "") { 23 if (! zkUtils.checkTagExist(currentTag)) { 24 throw new AdminOperationException("Tag " + currentTag + " not exists.") 25 } 26 } 27 } 28 val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode, tag = currentTag) 29 // 根据是否有机架感知信息,来对tp进行broker上的分配 30 val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, currentrReplicationFactor) 31 AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig, tag = currentTag) 32 }
核心是最后两行,首先计算要创建的topic的tp如何分配在哪些brokers上,然后将分配结果写入zk。tp分配的算法不是本文的重点,它的基本思想就是取模轮询计算,使得所有tp尽可能分布在不同的broker和机架位上。重点是写zk部分,进去看一下。
1 def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils, 2 topic: String, 3 partitionReplicaAssignment: Map[Int, Seq[Int]], 4 config: Properties = new Properties, 5 update: Boolean = false, 6 tag: String = "") { 7 // 根据参数判断这个create是否合法 8 validateCreateOrUpdateTopic(zkUtils, topic, partitionReplicaAssignment, config, update) 9 10 // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported 11 if (!update) { 12 // write out the config if there is any, this isn‘t transactional(有关的) with the partition assignments 13 // 写zk过程-/config/topic/topicName目录 14 writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), config) 15 } 16 17 // create the partition assignment 18 // 写zk过程-topic下的层级 19 writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update, tag = tag) 20 }
可以看到,在判断这个create topic是否合法之后,就开始写zk了,首先写/config/topic/topicName目录,然后写写zk过程-topic下的层级,具体目录路径读者可继续深入。到这里,通过创建topic的command对zk的操作就完成了,由于监听zk的监听器存在,就会触发感知controller,具体开始注册zk监听的过程是在选举controller的过程中完成的,首先目光转到controller层,先看下针对于topic目录的监听器的实现:
1 /** 2 * This is the zookeeper listener that triggers all the state transitions for a partition 3 */ 4 class TopicChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener { 5 6 protected def logName = "TopicChangeListener" 7 8 // zk写完topic的目录后会被感知,是topic创建后controller反应的入口 9 def doHandleChildChange(parentPath: String, children: Seq[String]) { 10 inLock(controllerContext.controllerLock) { 11 if (hasStarted.get) { 12 try { 13 val currentChildren = { 14 debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(","))) 15 children.toSet 16 } 17 val newTopics = currentChildren -- controllerContext.allTopics 18 val deletedTopics = controllerContext.allTopics -- currentChildren 19 controllerContext.allTopics = currentChildren 20 21 val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq) 22 // 需要更新上下文信息,才能完成之后的主副本选举过程 23 controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p => 24 !deletedTopics.contains(p._1.topic)) 25 controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment) 26 info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics, 27 deletedTopics, addedPartitionReplicaAssignment)) 28 if (newTopics.nonEmpty) 29 controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet) 30 } catch { 31 case e: Throwable => error("Error while handling new topic", e) 32 } 33 } 34 } 35 } 36 }
可以看到,这是个监听集群topic变化的监听器,它首先会通过zk目录的变化计算出新创建的topic以及删除的topic有哪些,在分析创建topic的时候,我们可以先只关注新创建的topic。然后获取新创建topic的tp的assignment,并更新controllerContext.partitionReplicaAssignment,至于为什么要更新这个值,后面会介绍,暂且记作mark1吧。然后就是调用onNewTopicCreation。
1 def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) { 2 info("New topic creation callback for %s".format(newPartitions.mkString(","))) 3 // subscribe to partition changes 4 // 先给新创建的topic添加tp change的listener 5 topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic)) 6 onNewPartitionCreation(newPartitions) 7 }
可以看到里面首先给新创建的topic添加tp change的listener,然后对partition进行onNewPartitionCreation。
1 /** 2 * This callback is invoked by the topic change callback with the list of failed brokers as input. 3 * It does the following - 4 * 1. Move the newly created partitions to the NewPartition state 5 * 2. Move the newly created partitions from NewPartition->OnlinePartition state 6 */ 7 def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) { 8 info("New partition creation callback for %s".format(newPartitions.mkString(","))) 9 // 将所有tp的state转为NewPartition 10 partitionStateMachine.handleStateChanges(newPartitions, NewPartition) 11 // 将所有replica的state转为NewReplica 12 replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica) 13 // 将所有tp的state转为online上线 14 partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector) 15 // 将所有replica的state转为online上线 16 replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica) 17 }
这四步就是创建topic的tp的四大步,简要概括就是针对于partition和replica有各自对应的状态机,先将所有partition和replica的state初始化为NewPartition,然后再分别online上线。我们再针对每一步去做细致的分析,首先是partitionStateMachine.handleStateChanges(newPartitions, NewPartition):
1 /** 2 * This API is invoked by the partition change zookeeper listener 3 * @param partitions The list of partitions that need to be transitioned to the target state 4 * @param targetState The state that the partitions should be moved to 5 */ 6 def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState, 7 leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector, 8 callbacks: Callbacks = (new CallbackBuilder).build) { 9 debug("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(","))) 10 try { 11 brokerRequestBatch.newBatch() 12 partitions.foreach { topicAndPartition => 13 // 给每个tp改了状态,分好了leader和isr,还写了zk,为下面的给server端发leaderAndIsr请求和update metadata请求做铺垫 14 handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks) 15 } 16 // controller给brokers发leaderAndIsr请求和update metadata请求 17 brokerRequestBatch.sendRequestsToBrokers(controller.epoch) 18 }catch { 19 case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e) 20 // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions 21 } 22 }
其中的两大步的中文注释在这一步先不用看,因为对于partition和replica的状态机变化的方法是复用的,注释是针对partition上线,对每个partition的state进行初始化的过程如下:
1 private def handleStateChange(topic: String, partition: Int, targetState: PartitionState, 2 leaderSelector: PartitionLeaderSelector, 3 callbacks: Callbacks) { 4 val topicAndPartition = TopicAndPartition(topic, partition) 5 if (!hasStarted.get) 6 throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " + 7 "the partition state machine has not started") 8 .format(controllerId, controller.epoch, topicAndPartition, targetState)) 9 // 初始给每个tp都置为NonExistentPartition状态,为了下面进行validation 10 val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition) 11 try { 12 targetState match { 13 case NewPartition => 14 // pre: partition did not exist before this 15 assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition) 16 partitionState.put(topicAndPartition, NewPartition) 17 val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",") 18 stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s" 19 .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, 20 assignedReplicas))
可以看到这一步状态更改很简单,只是单纯的在partitionState里添加新partition的状态,为NewPartition,然后出来,看看brokerRequestBatch.sendRequestsToBrokers的实现:
1 def sendRequestsToBrokers(controllerEpoch: Int) { 2 try { 3 leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) => 4 partitionStateInfos.foreach { case (topicPartition, state) => 5 val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" 6 stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " + 7 "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, 8 state.leaderIsrAndControllerEpoch, broker, 9 topicPartition.topic, topicPartition.partition)) 10 } 11 val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet 12 val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map { 13 _.getNode(controller.config.interBrokerListenerName) 14 } 15 val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) => 16 val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch 17 val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader, 18 leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion, 19 partitionStateInfo.allReplicas.map(Integer.valueOf).asJava) 20 topicPartition -> partitionState 21 } 22 val leaderAndIsrRequest = new LeaderAndIsrRequest. 23 Builder(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava) 24 // leaderAndIsrRequest只会给涉及到的brokers发送 25 controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest, null) 26 } 27 leaderAndIsrRequestMap.clear() 28 29 // 必须先处理leaderAndIsrRequest,再处理updateMetadataRequest 30 updateMetadataRequestPartitionInfoMap.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " + 31 "to brokers %s for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch, 32 updateMetadataRequestBrokerSet.toString(), p._1))) 33 val partitionStates = updateMetadataRequestPartitionInfoMap.map { case (topicPartition, partitionStateInfo) => 34 val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch 35 val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader, 36 leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion, 37 partitionStateInfo.allReplicas.map(Integer.valueOf).asJava) 38 topicPartition -> partitionState 39 } 40 41 val version: Short = 42 if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3 43 else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2 44 else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1 45 else 0 46 47 val updateMetadataRequest = { 48 val liveBrokers = if (version == 0) { 49 // Version 0 of UpdateMetadataRequest only supports PLAINTEXT. 50 controllerContext.liveOrShuttingDownBrokers.map { broker => 51 val securityProtocol = SecurityProtocol.PLAINTEXT 52 val listenerName = ListenerName.forSecurityProtocol(securityProtocol) 53 val node = broker.getNode(listenerName) 54 val endPoints = Seq(new EndPoint(node.host, node.port, securityProtocol, listenerName)) 55 new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull) 56 } 57 } else { 58 controllerContext.liveOrShuttingDownBrokers.map { broker => 59 val endPoints = broker.endPoints.map { endPoint => 60 new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port, endPoint.securityProtocol, endPoint.listenerName) 61 } 62 new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull) 63 } 64 } 65 new UpdateMetadataRequest.Builder( 66 controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava). 67 setVersion(version) 68 } 69 70 // 给所有存活的brokers发updateMetadataRequest 71 updateMetadataRequestBrokerSet.foreach { broker => 72 controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, updateMetadataRequest, null) 73 } 74 updateMetadataRequestBrokerSet.clear() 75 updateMetadataRequestPartitionInfoMap.clear() 76 77 stopReplicaRequestMap.foreach { case (broker, replicaInfoList) => 78 val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet 79 val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet 80 debug("The stop replica request (delete = true) sent to broker %d is %s" 81 .format(broker, stopReplicaWithDelete.mkString(","))) 82 debug("The stop replica request (delete = false) sent to broker %d is %s" 83 .format(broker, stopReplicaWithoutDelete.mkString(","))) 84 85 val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => !r.deletePartition && r.callback == null) 86 87 // Send one StopReplicaRequest for all partitions that require neither delete nor callback. This potentially 88 // changes the order in which the requests are sent for the same partitions, but that‘s OK. 89 val stopReplicaRequest = new StopReplicaRequest.Builder(controllerId, controllerEpoch, false, 90 replicasToGroup.map(r => new TopicPartition(r.replica.topic, r.replica.partition)).toSet.asJava) 91 controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest) 92 93 replicasToNotGroup.foreach { r => 94 val stopReplicaRequest = new StopReplicaRequest.Builder( 95 controllerId, controllerEpoch, r.deletePartition, 96 Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava) 97 controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest, r.callback) 98 } 99 } 100 stopReplicaRequestMap.clear() 101 } catch { 102 case e: Throwable => 103 if (leaderAndIsrRequestMap.nonEmpty) { 104 error("Haven‘t been able to send leader and isr requests, current state of " + 105 s"the map is $leaderAndIsrRequestMap. Exception message: $e") 106 } 107 if (updateMetadataRequestBrokerSet.nonEmpty) { 108 error(s"Haven‘t been able to send metadata update requests to brokers $updateMetadataRequestBrokerSet, " + 109 s"current state of the partition info is $updateMetadataRequestPartitionInfoMap. Exception message: $e") 110 } 111 if (stopReplicaRequestMap.nonEmpty) { 112 error("Haven‘t been able to send stop replica requests, current state of " + 113 s"the map is $stopReplicaRequestMap. Exception message: $e") 114 } 115 throw new IllegalStateException(e) 116 } 117 }
可以看到这步的操作主要是针对leaderAndIsrRequestMap和leaderAndIsrRequestMap进行操作的,细看这两个集合的操作完成之后都会clear,所以对于刚初始化的partition这两个集合是空的,什么也不做。到此partition的状态机转为newPartition就做完了,然后就是将所有replica的state转为NewReplica。
1 def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState, 2 callbacks: Callbacks = (new CallbackBuilder).build) { 3 if(replicas.nonEmpty) { 4 debug("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(","))) 5 try { 6 brokerRequestBatch.newBatch() 7 replicas.foreach(r => handleStateChange(r, targetState, callbacks)) 8 // 已经将updateMetadataRequestBrokerSet和leaderAndIsrRequestMap在tp状态机上线完成server请求后clear 9 // 此轮什么也不做 10 brokerRequestBatch.sendRequestsToBrokers(controller.epoch) 11 }catch { 12 case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e) 13 } 14 } 15 }
同partition的初始化状态一样,这步的中文注释也是online上线,进去看下对于replica状态机的初始化的处理:
1 def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState, 2 callbacks: Callbacks) { 3 val topic = partitionAndReplica.topic 4 val partition = partitionAndReplica.partition 5 val replicaId = partitionAndReplica.replica 6 val topicAndPartition = TopicAndPartition(topic, partition) 7 if (!hasStarted.get) 8 throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " + 9 "to %s failed because replica state machine has not started") 10 .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState)) 11 val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica) 12 try { 13 val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition) 14 targetState match { 15 case NewReplica => 16 assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState) 17 // start replica as a follower to the current leader for its partition 18 val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition) 19 leaderIsrAndControllerEpochOpt match { 20 case Some(leaderIsrAndControllerEpoch) => 21 if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) 22 throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica" 23 .format(replicaId, topicAndPartition) + "state as it is being requested to become leader") 24 brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), 25 topic, partition, leaderIsrAndControllerEpoch, 26 replicaAssignment) 27 case None => // new leader request will be sent to this replica when one gets elected 28 } 29 replicaState.put(partitionAndReplica, NewReplica) 30 stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" 31 .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, 32 targetState))
相比partition的单纯在partitionState里添加新partition的状态,它还多了一步brokerRequestBatch.addLeaderAndIsrRequestForBrokers,但由于此时还没有选出leader,所以这一步将被跳过,这就完成了replica状态机的初始化。
然后就该对partition的状态进行online上线了,这一步非常重要,80%的处理都在这部分,一起进去看看吧。
1 def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState, 2 leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector, 3 callbacks: Callbacks = (new CallbackBuilder).build) { 4 debug("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(","))) 5 try { 6 brokerRequestBatch.newBatch() 7 partitions.foreach { topicAndPartition => 8 // 给每个tp改了状态,分好了leader和isr,还写了zk,为下面的给server端发leaderAndIsr请求和update metadata请求做铺垫 9 handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks) 10 } 11 // controller给brokers发leaderAndIsr请求和update metadata请求 12 brokerRequestBatch.sendRequestsToBrokers(controller.epoch) 13 }catch { 14 case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e) 15 // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions 16 } 17 }
1 private def handleStateChange(topic: String, partition: Int, targetState: PartitionState, 2 leaderSelector: PartitionLeaderSelector, 3 callbacks: Callbacks) { 4 val topicAndPartition = TopicAndPartition(topic, partition) 5 if (!hasStarted.get) 6 throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " + 7 "the partition state machine has not started") 8 .format(controllerId, controller.epoch, topicAndPartition, targetState)) 9 // 初始给每个tp都置为NonExistentPartition状态,为了下面进行validation 10 val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition) 11 try { 12 targetState match { 13 case NewPartition => 14 // pre: partition did not exist before this 15 assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition) 16 partitionState.put(topicAndPartition, NewPartition) 17 val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",") 18 stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s" 19 .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, 20 assignedReplicas)) 21 // post: partition has been assigned replicas 22 case OnlinePartition => 23 assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition) 24 partitionState(topicAndPartition) match { 25 case NewPartition => 26 // initialize leader and isr path for new partition 27 // 不仅建leader和isr的zk path,还更新leaderAndIsrRequestMap和updateMetadataRequestPartitionInfoMap, 28 // 还更新了partitionLeadershipInfo 29 initializeLeaderAndIsrForPartition(topicAndPartition) 30 case OfflinePartition => 31 electLeaderForPartition(topic, partition, leaderSelector) 32 case OnlinePartition => // invoked when the leader needs to be re-elected 33 electLeaderForPartition(topic, partition, leaderSelector) 34 case _ => // should never come here since illegal previous states are checked above 35 } 36 partitionState.put(topicAndPartition, OnlinePartition) 37 val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader 38 stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d" 39 .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader)) 40 } 41 } 42 {
回到这个复用的方法里,可以看到除了将partition的状态从newPartition改为onlinePartition之外,最重要的就是initializeLeaderAndIsrForPartition,在里面,选了leader和isr,并且为其写了zk路径,还更新leaderAndIsrRequestMap和updateMetadataRequestPartitionInfoMap,以及partitionLeadershipInfo。细节需要进去看:
1 private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) { 2 val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition) 3 val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r)) 4 liveAssignedReplicas.size match { 5 case 0 => 6 val failMsg = ("encountered error during state change of partition %s from New to Online, assigned replicas are [%s], " + 7 "live brokers are [%s]. No assigned replica is alive.") 8 .format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds) 9 stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg) 10 throw new StateChangeFailedException(failMsg) 11 case _ => 12 debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas)) 13 // make the first replica in the list of assigned replicas, the leader 14 val leader = liveAssignedReplicas.head 15 val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList), 16 controller.epoch) 17 debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch)) 18 try { 19 zkUtils.createPersistentPath( 20 getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), 21 zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch)) 22 // NOTE: the above write can fail only if the current controller lost its zk session and the new controller 23 // took over and initialized this partition. This can happen if the current controller went into a long 24 // GC pause 25 controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch) 26 // 更新leaderAndIsrRequestMap和更新updateMetadataRequestPartitionInfoMap,这两个 27 // 是用来后面的server端处理leaderAndIsr请求用的,只给tp的相关brokers添加LeaderAndIsrRequest 28 brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic, 29 topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment) 30 } catch { 31 case _: ZkNodeExistsException => 32 // read the controller epoch 33 val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic, 34 topicAndPartition.partition).get 35 val failMsg = ("encountered error while changing partition %s‘s state from New to Online since LeaderAndIsr path already " + 36 "exists with value %s and controller epoch %d") 37 .format(topicAndPartition, leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch) 38 stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg) 39 throw new StateChangeFailedException(failMsg) 40 } 41 } 42 }
首先是获取了该partition的replica分配,这正是从controllerContext.partitionReplicaAssignment里得到,与mark1呼应。在创建topic时replica的选举leader也很简单,就是对一个partition的replica中选第一个作为leader,所有在存活brokers上的relica都默认进isr队列,并且记录当前controller的epoch,这个值大有用处,在后面会有用到,暂且记作mark2。然后就是在zk上leader和isr的相关数据,并且更新到controllerContext.partitionLeadershipInfo,最后就是为该partition的所有replica相关的brokers添加LeaderAndIsrRequest:
1 def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, 2 leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, 3 replicas: Seq[Int], callback: AbstractResponse => Unit = null) { 4 val topicPartition = new TopicPartition(topic, partition) 5 6 brokerIds.filter(_ >= 0).foreach { brokerId => 7 val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty) 8 result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet)) 9 } 10 11 // 更新updateMetadataRequestPartitionInfoMap,需要更新meta data的那些tp的request,可以看到给所有存活的brokers都添加 12 addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, 13 Set(TopicAndPartition(topic, partition))) 14 }
1 /** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */ 2 def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int], 3 partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition], 4 callback: AbstractResponse => Unit = null) { 5 def updateMetadataRequestPartitionInfo(partition: TopicAndPartition, beingDeleted: Boolean) { 6 val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition) 7 leaderIsrAndControllerEpochOpt match { 8 case Some(leaderIsrAndControllerEpoch) => 9 val replicas = controllerContext.partitionReplicaAssignment(partition).toSet 10 val partitionStateInfo = if (beingDeleted) { 11 val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr) 12 PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas) 13 } else { 14 PartitionStateInfo(leaderIsrAndControllerEpoch, replicas) 15 } 16 updateMetadataRequestPartitionInfoMap.put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo) 17 // replica由不存在转为new时 18 case None => 19 info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition)) 20 } 21 } 22 23 val filteredPartitions = { 24 val givenPartitions = if (partitions.isEmpty) 25 controllerContext.partitionLeadershipInfo.keySet 26 else 27 partitions 28 if (controller.deleteTopicManager.partitionsToBeDeleted.isEmpty) 29 givenPartitions 30 else 31 givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted 32 } 33 34 // updateMetadataRequest会给所有存活的broker发送 35 updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0) 36 filteredPartitions.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = false)) 37 controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true)) 38 }
可以看到添加的请求有两种:LeaderAndIsrRequest和updateMetadataRequest,它们实质上都更新的是leaderAndIsrRequestMap和updateMetadataRequestPartitionInfoMap,因为最后给server端发送的request都是从这两个集合里取的,还有一点就是,LeaderAndIsrRequest只会给该partition的相应replica的brokerids添加,而updateMetadataRequest会给所有的存活的brokers添加,换句话说就是LeaderAndIsr的请求会发给相关的broker处理,updateMetadata的请求会发给所有broker的更新。当相应的request的map包装完添加到相应的broker上之后就可以发送请求给server端处理了,目光回到partition的handleStateChanges:
1 def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState, 2 leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector, 3 callbacks: Callbacks = (new CallbackBuilder).build) { 4 debug("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(","))) 5 try { 6 brokerRequestBatch.newBatch() 7 partitions.foreach { topicAndPartition => 8 // 给每个tp改了状态,分好了leader和isr,还写了zk,为下面的给server端发leaderAndIsr请求和update metadata请求做铺垫 9 handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks) 10 } 11 // controller给brokers发leaderAndIsr请求和update metadata请求 12 brokerRequestBatch.sendRequestsToBrokers(controller.epoch) 13 }catch { 14 case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e) 15 // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions 16 } 17 }
brokerRequestBatch.sendRequestsToBrokers(controller.epoch)完成的就是发送给server端处理的过程,进去看一下:
1 def sendRequestsToBrokers(controllerEpoch: Int) { 2 try { 3 leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) => 4 partitionStateInfos.foreach { case (topicPartition, state) => 5 val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" 6 stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " + 7 "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest, 8 state.leaderIsrAndControllerEpoch, broker, 9 topicPartition.topic, topicPartition.partition)) 10 } 11 val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet 12 val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map { 13 _.getNode(controller.config.interBrokerListenerName) 14 } 15 val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) => 16 val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch 17 val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader, 18 leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion, 19 partitionStateInfo.allReplicas.map(Integer.valueOf).asJava) 20 topicPartition -> partitionState 21 } 22 val leaderAndIsrRequest = new LeaderAndIsrRequest. 23 Builder(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava) 24 // leaderAndIsrRequest只会给涉及到的brokers发送 25 controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest, null) 26 } 27 leaderAndIsrRequestMap.clear() 28 29 // 必须先处理leaderAndIsrRequest,再处理updateMetadataRequest 30 updateMetadataRequestPartitionInfoMap.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " + 31 "to brokers %s for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch, 32 updateMetadataRequestBrokerSet.toString(), p._1))) 33 val partitionStates = updateMetadataRequestPartitionInfoMap.map { case (topicPartition, partitionStateInfo) => 34 val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch 35 val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader, 36 leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion, 37 partitionStateInfo.allReplicas.map(Integer.valueOf).asJava) 38 topicPartition -> partitionState 39 } 40 41 val version: Short = 42 if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3 43 else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2 44 else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1 45 else 0 46 47 val updateMetadataRequest = { 48 val liveBrokers = if (version == 0) { 49 // Version 0 of UpdateMetadataRequest only supports PLAINTEXT. 50 controllerContext.liveOrShuttingDownBrokers.map { broker => 51 val securityProtocol = SecurityProtocol.PLAINTEXT 52 val listenerName = ListenerName.forSecurityProtocol(securityProtocol) 53 val node = broker.getNode(listenerName) 54 val endPoints = Seq(new EndPoint(node.host, node.port, securityProtocol, listenerName)) 55 new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull) 56 } 57 } else { 58 controllerContext.liveOrShuttingDownBrokers.map { broker => 59 val endPoints = broker.endPoints.map { endPoint => 60 new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port, endPoint.securityProtocol, endPoint.listenerName) 61 } 62 new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull) 63 } 64 } 65 new UpdateMetadataRequest.Builder( 66 controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava). 67 setVersion(version) 68 } 69 70 // 给所有存活的brokers发updateMetadataRequest 71 updateMetadataRequestBrokerSet.foreach { broker => 72 controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, updateMetadataRequest, null) 73 } 74 updateMetadataRequestBrokerSet.clear() 75 updateMetadataRequestPartitionInfoMap.clear() 76 77 stopReplicaRequestMap.foreach { case (broker, replicaInfoList) => 78 val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet 79 val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet 80 debug("The stop replica request (delete = true) sent to broker %d is %s" 81 .format(broker, stopReplicaWithDelete.mkString(","))) 82 debug("The stop replica request (delete = false) sent to broker %d is %s" 83 .format(broker, stopReplicaWithoutDelete.mkString(","))) 84 85 val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => !r.deletePartition && r.callback == null) 86 87 // Send one StopReplicaRequest for all partitions that require neither delete nor callback. This potentially 88 // changes the order in which the requests are sent for the same partitions, but that‘s OK. 89 val stopReplicaRequest = new StopReplicaRequest.Builder(controllerId, controllerEpoch, false, 90 replicasToGroup.map(r => new TopicPartition(r.replica.topic, r.replica.partition)).toSet.asJava) 91 controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest) 92 93 replicasToNotGroup.foreach { r => 94 val stopReplicaRequest = new StopReplicaRequest.Builder( 95 controllerId, controllerEpoch, r.deletePartition, 96 Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava) 97 controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest, r.callback) 98 } 99 } 100 stopReplicaRequestMap.clear() 101 } catch { 102 case e: Throwable => 103 if (leaderAndIsrRequestMap.nonEmpty) { 104 error("Haven‘t been able to send leader and isr requests, current state of " + 105 s"the map is $leaderAndIsrRequestMap. Exception message: $e") 106 } 107 if (updateMetadataRequestBrokerSet.nonEmpty) { 108 error(s"Haven‘t been able to send metadata update requests to brokers $updateMetadataRequestBrokerSet, " + 109 s"current state of the partition info is $updateMetadataRequestPartitionInfoMap. Exception message: $e") 110 } 111 if (stopReplicaRequestMap.nonEmpty) { 112 error("Haven‘t been able to send stop replica requests, current state of " + 113 s"the map is $stopReplicaRequestMap. Exception message: $e") 114 } 115 throw new IllegalStateException(e) 116 } 117 }
代码虽多,但核心就是对上面的两个集合leaderAndIsrRequestMap和updateMetadataRequestPartitionInfoMap进行取元素发请求,然后跳到server端先看下leaderAndIsrRequest是如何处理的:
1 def handleLeaderAndIsrRequest(request: RequestChannel.Request) { 2 // ensureTopicExists is only for client facing requests 3 // We can‘t have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they 4 // stop serving data to clients for the topic being deleted 5 val correlationId = request.header.correlationId 6 val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest] 7 8 try { 9 // 如果是__consumer_offsets相关,还需要处理consumer group的数据迁移,包括log数据的迁移和group metadata的缓存迁移 10 // log数据迁移已经在make leader和make follower中ready,所以这部分主要做group metadata的缓存迁移 11 def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) { 12 // for each new leader or follower, call coordinator to handle consumer group migration. 13 // this callback is invoked under the replica state change lock to ensure proper order of 14 // leadership changes 15 updatedLeaders.foreach { partition => 16 LimitTopicsManager.checkAndPutData(partition.topic) 17 if (partition.topic == Topic.GroupMetadataTopicName) 18 coordinator.handleGroupImmigration(partition.partitionId) 19 } 20 updatedFollowers.foreach { partition => 21 LimitTopicsManager.checkAndPutData(partition.topic) 22 if (partition.topic == Topic.GroupMetadataTopicName) 23 coordinator.handleGroupEmigration(partition.partitionId) 24 } 25 } 26 27 val leaderAndIsrResponse = 28 if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { 29 val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange) 30 new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava) 31 } else { 32 val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap 33 new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava) 34 } 35 36 requestChannel.sendResponse(new Response(request, leaderAndIsrResponse)) 37 } catch { 38 case e: KafkaStorageException => 39 fatal("Disk error during leadership change.", e) 40 Runtime.getRuntime.halt(1) 41 } 42 }
1 def becomeLeaderOrFollower(correlationId: Int,leaderAndISRRequest: LeaderAndIsrRequest, 2 metadataCache: MetadataCache, 3 onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = { 4 info("becomeLeaderOrFollower correlationId=" + correlationId + " leaderAndISRRequest=" + leaderAndISRRequest) 5 leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) => 6 stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]" 7 .format(localBrokerId, stateInfo, correlationId, 8 leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition)) 9 } 10 replicaStateChangeLock synchronized { 11 val responseMap = new mutable.HashMap[TopicPartition, Short] 12 // 请求里的controller leader epoch小于当前的值,说明是无效的 13 if (leaderAndISRRequest.controllerEpoch < controllerEpoch) { 14 stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " + 15 "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId, 16 correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch)) 17 BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code) 18 } else { 19 val controllerId = leaderAndISRRequest.controllerId 20 controllerEpoch = leaderAndISRRequest.controllerEpoch 21 22 // First check partition‘s leader epoch 23 val partitionState = new mutable.HashMap[Partition, PartitionState]() 24 leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) => 25 val partition = getOrCreatePartition(topicPartition) 26 // 当前内存里获取到的tp leader的epoch值,初始是0-1=-1 27 val partitionLeaderEpoch = partition.getLeaderEpoch 28 // If the leader epoch is valid record the epoch of the controller that made the leadership decision. 29 // This is useful while updating the isr to maintain the decision maker controller‘s epoch in the zookeeper path 30 // 请求里的tp leader的epoch值要大于当前值才有效,并且只留下本broker的tp 31 if (partitionLeaderEpoch < stateInfo.leaderEpoch) { 32 if(stateInfo.replicas.contains(localBrokerId)) 33 partitionState.put(partition, stateInfo) 34 else { 35 stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " + 36 "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s") 37 .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, 38 topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(","))) 39 responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code) 40 } 41 } else { 42 // Otherwise record the error code in response 43 stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " + 44 "epoch %d for partition [%s,%d] since its associated leader epoch %d is not higher than the current leader epoch %d") 45 .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch, 46 topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch)) 47 responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code) 48 } 49 } 50 51 val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) => 52 stateInfo.leader == localBrokerId 53 } 54 val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys 55 56 val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty) 57 makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap) 58 else 59 Set.empty[Partition] 60 val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty) 61 makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache) 62 else 63 Set.empty[Partition] 64 65 // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions 66 // have been completely populated before starting the checkpointing there by avoiding weird race conditions 67 if (!hwThreadInitialized) { 68 startHighWaterMarksCheckPointThread() 69 hwThreadInitialized = true 70 } 71 // make完leaders和followers后,清理掉无用的fetch threads 72 replicaFetcherManager.shutdownIdleFetcherThreads() 73 74 // 处理数据迁移 75 onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower) 76 BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code) 77 } 78 } 79 }
进入到becomeLeaderOrFollower方法中,首先判断了请求中的controllerEpoch的值与当前值的大小关系,这个用处与mark2呼应,如果请求的值小于当前值,那么说明这个controller发来的请求作废,因为controllerEpoch代表的是controller的版本,每当controller选主都会更新该值加1,如果请求里的该标记值小于内存中的当前值,则说明发送请求的controller是过期的。epoch值有效的话就直接更新controllerEpoch,然后再对所有partition做如下操作:将该tp加到allPartitions中,获取该tp的leaderEpoch值,作用同controller的epoch值一样,其次过滤出有副本在本broker上的所有tp,然后在这些tp中,找出tp的leaderid等于该brokerid的所有tp,表示这些tp将在该broker上完成makeLeader的过程,同理剩余tp将完成makeFollower的过程。当makeLeader和makeFollower完成之后,就需要清理下无用的fetch线程,如果涉及到的topic是__consumer_offsets,那么还需要完成相关的数据迁移。接下来重点讨论下makeLeaders和makeFollowers的过程,首先是makeLeaders:
1 private def makeLeaders(controllerId: Int, 2 epoch: Int, 3 partitionState: Map[Partition, PartitionState], 4 correlationId: Int, 5 responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = { 6 info("makeLeaders controllerId=" + controllerId + " epoch=" + epoch + " correlationId=" + correlationId) 7 partitionState.keys.foreach { partition => 8 stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + 9 "starting the become-leader transition for partition %s") 10 .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)) 11 } 12 13 for (partition <- partitionState.keys) 14 responseMap.put(partition.topicPartition, Errors.NONE.code) 15 16 val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set() 17 18 try { 19 // First stop fetchers for all the partitions 20 // 要成为leader,首先停掉follower的fetch线程 21 replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition)) 22 // Update the partition information to be the leader 23 partitionState.foreach{ case (partition, partitionStateInfo) => 24 // make leader的行为 25 if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) 26 partitionsToMakeLeaders += partition 27 else 28 stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " + 29 "controller %d epoch %d for partition %s since it is already the leader for the partition.") 30 .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)) 31 } 32 partitionsToMakeLeaders.foreach { partition => 33 stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " + 34 "%d epoch %d with correlation id %d for partition %s") 35 .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition)) 36 } 37 38 // Init partition newOffsetMetaDataMap for Leader 39 val metaData = new mutable.HashMap[TopicPartition, NewOffsetMetaData] 40 partitionsToMakeLeaders.map{ partition => 41 val leo: Long = partition.leaderReplicaIfLocal.get.logEndOffset.messageOffset 42 val lst: Long = partition.logManager.getLog(partition.topicPartition).get.segments.firstEntry().getValue.log.creationTime() 43 val let: Long = partition.logManager.getLog(partition.topicPartition).get.segments.lastEntry().getValue.log.file.lastModified() 44 val lso: Long = partition.leaderReplicaIfLocal.get.logStartOffset 45 metaData.put(partition.topicPartition, new NewOffsetMetaData(partition.leaderReplicaIdOpt.get, leo, lst, let, lso)) 46 } 47 48 info("makeLeaders updateNewOffsetMetaData broker=" + localBrokerId + " metaData=" + metaData) 49 updateNewOffsetMetaData(localBrokerId, metaData) 50 } catch { 51 case e: Throwable => 52 partitionState.keys.foreach { partition => 53 val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" + 54 " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition) 55 stateChangeLogger.error(errorMsg, e) 56 } 57 // Re-throw the exception for it to be caught in KafkaApis 58 throw e 59 } 60 61 partitionState.keys.foreach { partition => 62 stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + 63 "for the become-leader transition for partition %s") 64 .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)) 65 } 66 67 partitionsToMakeLeaders 68 }
这里一共有两步:既然要成为leader,首先停掉follower的fetch线程,其次就是更新partition信息,而更新partition信息的具体过程就在makeLeader中,这里实现的是一个partition成为leader时要做的工作,具体如下:
1 /** 2 * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset 3 * from the time when this broker was the leader last time) and setting the new leader and ISR. 4 * If the leader replica id does not change, return false to indicate the replica manager. 5 */ 6 def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = { 7 val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) { 8 val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt) 9 // record the epoch of the controller that made the leadership decision. This is useful while updating the isr 10 // to maintain the decision maker controller‘s epoch in the zookeeper path 11 // 维护最新的controllerEpoch,方便在特殊情况下出现多leader controller时做决策 12 controllerEpoch = partitionStateInfo.controllerEpoch 13 // add replicas that are new 14 // 重新make leader replica时,更新offset到hw值(截断至hw),放到assignedReplicas中,如果是local broker的 15 // replica,还会附带log数据 16 allReplicas.foreach(replica => getOrCreateReplica(replica)) 17 val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet 18 // remove assigned replicas that have been removed by the controller 19 // 此tp之前在本broker上的的follower replica都要移除,oar+rar-rar,要删除oar 20 (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica) 21 inSyncReplicas = newInSyncReplicas 22 leaderEpoch = partitionStateInfo.leaderEpoch 23 zkVersion = partitionStateInfo.zkVersion 24 val isNewLeader = 25 if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) { 26 false 27 } else { 28 // 主副本编号不存在或现主副本编号与原主副本编号不一致(备份副本转为主副本) 29 info("makeLeader true, before leaderReplicaIdOpt=" + leaderReplicaIdOpt + 30 " after leaderReplicaIdOpt=" + localBrokerId + " topic="+ topicPartition) 31 leaderReplicaIdOpt = Some(localBrokerId) 32 true 33 } 34 val leaderReplica = getReplica().get 35 val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset 36 val curTimeMs = time.milliseconds 37 // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset. 38 // 对其余replica reset最近的进队时间 39 (assignedReplicas - leaderReplica).foreach { replica => 40 val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L 41 replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs) 42 } 43 // we may need to increment high watermark since ISR could be down to 1 44 if (isNewLeader) { 45 // construct the high watermark metadata for the new leader replica 46 leaderReplica.convertHWToLocalOffsetMetadata() 47 // reset log end offset for remote replicas 48 // 由于是新leader,需要重置下远程replica的几个offset值 49 assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult)) 50 } 51 // 比较现有replica的leo值,看是否更新leader的hw值 52 (maybeIncrementLeaderHW(leaderReplica), isNewLeader) 53 } 54 // some delayed operations may be unblocked after HW changed 55 if (leaderHWIncremented) 56 // hw更新后,需要尝试完成fetch和完成同步response 57 tryCompleteDelayedRequests() 58 isNewLeader 59 }
具体controllerEpoch的使用同上面所说,接下来就是对该partition的所有replica进行创建,对于本broker的replica(也就是leader),需要更新offset到hw值(截断至hw),放到assignedReplicas中,还会附带log数据,而不在本broker的replica,也就是follower,只会建立一个远程副本。具体创建的实现如下:
1 def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = { 2 assignedReplicaMap.getAndMaybePut(replicaId, { 3 if (isReplicaLocal(replicaId)) { 4 val config = LogConfig.fromProps(logManager.defaultConfig.originals, 5 AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)) 6 val log = logManager.createLog(topicPartition, config) 7 val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath) 8 val offsetMap = checkpoint.read 9 if (!offsetMap.contains(topicPartition)) 10 info(s"No checkpointed highwatermark is found for partition $topicPartition") 11 val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset) 12 new Replica(replicaId, this, time, offset, Some(log)) 13 } else new Replica(replicaId, this, time) // 这是在本broker中建远程副本 14 }) 15 }
回到makeLeader中来,所有的replica创建完之后,然后将之前旧的replica移出,具体就是assignedReplicas包含的是之前的oar加上了新的rar,allReplicas就是新的rar,所以oar+rar-rar=oar,也就是就的replica集合要被删除。然后更新该tp的isr和leaderEpoch信息,值得注意的是,这里的leaderEpoch的使用同controller的LeaderEpoch。然后判断选出来的leader是不是新leader,因为可能存在之前该tp的leader就在该broker上,也就是前后一致,这样就不是新选出的leader。之后就是对isr里的其它replica重置进队时间(当前时间),如果是新leader的情况,还需要重置下远程replica的几个offset值。然后比较现有replica的leo值,看是否更新leader的hw值,如果leader更新了leo值,那么之前的delay fetch和完成客户端对发送消息的response,这两个delay operation的具体实现读者可以自行深入研究,由于篇幅有限,不做展开。到这里makeLeader中更新partition信息就完成了,makeLeaders也就完成了,本broker上的所有leaders已经处理完成,接下来就是处理makeFollowers了,进去看下具体实现:
1 private def makeFollowers(controllerId: Int, 2 epoch: Int, 3 partitionState: Map[Partition, PartitionState], 4 correlationId: Int, 5 responseMap: mutable.Map[TopicPartition, Short], 6 metadataCache: MetadataCache) : Set[Partition] = { 7 info("makeFollowers controllerId=" + controllerId + " epoch=" + epoch + " correlationId=" + correlationId) 8 partitionState.keys.foreach { partition => 9 stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + 10 "starting the become-follower transition for partition %s") 11 .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)) 12 } 13 14 for (partition <- partitionState.keys) 15 responseMap.put(partition.topicPartition, Errors.NONE.code) 16 17 val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set() 18 19 try { 20 21 // TODO: Delete leaders from LeaderAndIsrRequest 22 partitionState.foreach{ case (partition, partitionStateInfo) => 23 val newLeaderBrokerId = partitionStateInfo.leader 24 metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match { 25 // Only change partition state when the leader is available 26 case Some(_) => 27 if (partition.makeFollower(controllerId, partitionStateInfo, correlationId)) 28 partitionsToMakeFollower += partition 29 else 30 stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " + 31 "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader") 32 .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch, 33 partition.topicPartition, newLeaderBrokerId)) 34 case None => 35 // The leader broker should always be present in the metadata cache. 36 // If not, we should record the error message and abort the transition process for this partition 37 stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" + 38 " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.") 39 .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch, 40 partition.topicPartition, newLeaderBrokerId)) 41 // Create the local replica even if the leader is unavailable. This is required to ensure that we include 42 // the partition‘s high watermark in the checkpoint file (see KAFKA-1647) 43 partition.getOrCreateReplica() 44 } 45 } 46 47 // 停掉现有的fetch 48 replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition)) 49 partitionsToMakeFollower.foreach { partition => 50 stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " + 51 "%d epoch %d with correlation id %d for partition %s") 52 .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition)) 53 } 54 55 // 将log截断至hw位置 56 logManager.truncateTo(partitionsToMakeFollower.map { partition => 57 (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset) 58 }.toMap) 59 60 // 之前可能是leader,需要完成之前作为leader时的一些延时请求 61 partitionsToMakeFollower.foreach { partition => 62 val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition) 63 tryCompleteDelayedProduce(topicPartitionOperationKey) 64 tryCompleteDelayedFetch(topicPartitionOperationKey) 65 } 66 67 partitionsToMakeFollower.foreach { partition => 68 stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " + 69 "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId, 70 partition.topicPartition, correlationId, controllerId, epoch)) 71 } 72 73 if (isShuttingDown.get()) { 74 partitionsToMakeFollower.foreach { partition => 75 stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " + 76 "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId, 77 controllerId, epoch, partition.topicPartition)) 78 } 79 } else { 80 debug("makefollowers config.smartExtendEnable=" + config.smartExtendEnable) 81 if (config.smartExtendEnable) { 82 try { 83 // 1. clear NewOffsetMetaData. 84 clearNewOffsetMetaData(partitionsToMakeFollower) 85 86 // 2. get offset from leader. 87 val ResponseOffsetMap = getBestOffset(partitionsToMakeFollower, metadataCache, config.getStartOffsetRetries) 88 89 if (ResponseOffsetMap.nonEmpty) { 90 val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => 91 partition.topicPartition -> BrokerAndInitialOffset( 92 metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName), 93 ResponseOffsetMap.get(partition.topicPartition).get.baseOffset)).toMap 94 95 // 3. trunc log 96 val partitionOffsets: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]() 97 ResponseOffsetMap.map { partition => 98 if (partition._2.error == Errors.OFFSET_HW) { 99 partitionOffsets.put(partition._1, partition._2.baseOffset) 100 } 101 } 102 103 if (partitionOffsets.nonEmpty) { 104 info("makefollowers trunc log, partitionOffsets size=" + partitionOffsets.size + " ResponseOffsetMap size=" + 105 ResponseOffsetMap.size + " partitionOffsets=" + partitionOffsets + " ResponseOffsetMap=" + ResponseOffsetMap) 106 partitionOffsets.map { partitionInfo => 107 // 按照new metadata返回的offset信息截断 108 logManager.truncateFullyAndStartAt(partitionInfo._1, partitionInfo._2) 109 } 110 } 111 replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) 112 } else { 113 // 走老策略兜底 114 val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => 115 partition.topicPartition -> BrokerAndInitialOffset( 116 metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName), 117 partition.getReplica().get.logEndOffset.messageOffset)).toMap 118 error("makefollowers getStartOffset fail, and use old mode partitionsToMakeFollowerWithLeaderAndOffset=" + partitionsToMakeFollowerWithLeaderAndOffset) 119 replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) 120 } 121 } catch { 122 case e: Exception => 123 val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => 124 partition.topicPartition -> BrokerAndInitialOffset( 125 metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName), 126 partition.getReplica().get.logEndOffset.messageOffset)).toMap 127 error("ReplicaManager makefollowers getStartOffset fail, and use old mode partitionsToMakeFollowerWithLeaderAndOffset=" + partitionsToMakeFollowerWithLeaderAndOffset 128 + " Exception=" + e.getMessage) 129 replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) 130 } 131 } else { 132 //we do not need to check if the leader exists again since this has been done at the beginning of this process 133 val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => 134 partition.topicPartition -> BrokerAndInitialOffset( 135 metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName), 136 partition.getReplica().get.logEndOffset.messageOffset)).toMap 137 replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) 138 } 139 140 partitionsToMakeFollower.foreach { partition => 141 stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " + 142 "%d epoch %d with correlation id %d for partition %s") 143 .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition)) 144 } 145 } 146 } catch { 147 case e: Throwable => 148 val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " + 149 "epoch %d").format(localBrokerId, correlationId, controllerId, epoch) 150 stateChangeLogger.error(errorMsg, e) 151 // Re-throw the exception for it to be caught in KafkaApis 152 throw e 153 } 154 155 partitionState.keys.foreach { partition => 156 stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " + 157 "for the become-follower transition for partition %s") 158 .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)) 159 } 160 161 partitionsToMakeFollower 162 }
和makeLeaders的makeLeader一样,首先也会有一个makeFollower的操作,它的功能几乎一样,也是创建所有replica,将旧replica删除,更新该tp的isr和leaderEpoch信息,值得注意的是,如果该tp的leader还和老的tp leader一致,就不会对该broker上的replica做接下来的操作:先停掉现有的fetch线程,然后将log截断至hw的位置,可能之前是leader,所以需要该replica作为leader完成之前delay的客户端对发送消息的response和对它的fetch,最后就是用新leader的leo值重置下该replica作为新follower的初始offset值,并添加fetch线程。至此,makeFollowers的过程也结束了。
然后,回到becomeLeaderOrFollower,接下来就是清理掉无用的fetch threads,最后就是处理数据迁移。代码如下:
1 // 如果是__consumer_offsets相关,还需要处理consumer group的数据迁移,包括log数据的迁移和group metadata的缓存迁移 2 // log数据迁移已经在make leader和make follower中ready,所以这部分主要做group metadata的缓存迁移 3 def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) { 4 // for each new leader or follower, call coordinator to handle consumer group migration. 5 // this callback is invoked under the replica state change lock to ensure proper order of 6 // leadership changes 7 updatedLeaders.foreach { partition => 8 LimitTopicsManager.checkAndPutData(partition.topic) 9 if (partition.topic == Topic.GroupMetadataTopicName) 10 coordinator.handleGroupImmigration(partition.partitionId) 11 } 12 updatedFollowers.foreach { partition => 13 LimitTopicsManager.checkAndPutData(partition.topic) 14 if (partition.topic == Topic.GroupMetadataTopicName) 15 coordinator.handleGroupEmigration(partition.partitionId) 16 } 17 }
也就是说如果涉及到的topic是内置topic __consumer_offsets的话,还需要处理数据迁移,对于在本broker上成为leader的partition,需要『迁移进来group metadata』,在本broker上成为follower的partition,需要『迁出去metadata』,具体怎么迁移的,进去看下吧,首先是handleGroupImmigration:
1 def handleGroupImmigration(offsetTopicPartitionId: Int) { 2 groupManager.loadGroupsForPartition(offsetTopicPartitionId, onGroupLoaded) 3 }
1 private def onGroupLoaded(group: GroupMetadata) { 2 group synchronized { 3 info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}") 4 assert(group.is(Stable) || group.is(Empty)) 5 group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _)) 6 } 7 }
1 def loadGroupsForPartition(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) { 2 val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition) 3 4 def doLoadGroupsAndOffsets() { 5 info(s"Loading offsets and group metadata from $topicPartition") 6 7 inLock(partitionLock) { 8 if (loadingPartitions.contains(offsetsPartition)) { 9 info(s"Offset load from $topicPartition already in progress.") 10 return 11 } else { 12 loadingPartitions.add(offsetsPartition) 13 } 14 } 15 16 try { 17 loadGroupsAndOffsets(topicPartition, onGroupLoaded) 18 } catch { 19 case t: Throwable => error(s"Error loading offsets from $topicPartition", t) 20 } finally { 21 inLock(partitionLock) { 22 ownedPartitions.add(offsetsPartition) 23 loadingPartitions.remove(offsetsPartition) 24 } 25 } 26 } 27 28 // 加载和清理meta data cache的一个线程 29 scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets) 30 }
可以看到在这一步,将在本broker的相应的leader partition(这里操作的都是__consumer_offsets)加载到loadingPartitions中,然后执行loadGroupsAndOffsets的过程,执行完毕,将该partition放入ownedPartitions,再将其从loadingPartitions移出,这样就完成了leader partition的数据迁入过程。其中loadGroupsAndOffsets的具体实现如下:
1 private[coordinator] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit) { 2 def highWaterMark = replicaManager.getHighWatermark(topicPartition).getOrElse(-1L) 3 4 val startMs = time.milliseconds() 5 replicaManager.getLog(topicPartition) match { 6 case None => 7 warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log") 8 9 case Some(log) => 10 var currOffset = log.logStartOffset 11 val buffer = ByteBuffer.allocate(config.loadBufferSize) 12 // loop breaks if leader changes at any time during the load, since getHighWatermark is -1 13 val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]() 14 val removedOffsets = mutable.Set[GroupTopicPartition]() 15 val loadedGroups = mutable.Map[String, GroupMetadata]() 16 val removedGroups = mutable.Set[String]() 17 18 while (currOffset < highWaterMark && !shuttingDown.get()) { 19 buffer.clear() 20 val fileRecords = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true) 21 .records.asInstanceOf[FileRecords] 22 val bufferRead = fileRecords.readInto(buffer, 0) 23 24 MemoryRecords.readableRecords(bufferRead).deepEntries.asScala.foreach { entry => 25 val record = entry.record 26 require(record.hasKey, "Group metadata/offset entry key should not be null") 27 28 // 两类数据分别处理:offset数据和group元数据 29 GroupMetadataManager.readMessageKey(record.key) match { 30 case offsetKey: OffsetKey => 31 // load offset 32 val key = offsetKey.key 33 if (record.hasNullValue) { 34 loadedOffsets.remove(key) 35 removedOffsets.add(key) 36 } else { 37 val value = GroupMetadataManager.readOffsetMessageValue(record.value) 38 loadedOffsets.put(key, value) 39 removedOffsets.remove(key) 40 } 41 42 case groupMetadataKey: GroupMetadataKey => 43 // load group metadata 44 val groupId = groupMetadataKey.key 45 val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value) 46 if (groupMetadata != null) { 47 trace(s"Loaded group metadata for group $groupId with generation ${groupMetadata.generationId}") 48 removedGroups.remove(groupId) 49 loadedGroups.put(groupId, groupMetadata) 50 } else { 51 loadedGroups.remove(groupId) 52 removedGroups.add(groupId) 53 } 54 55 case unknownKey => 56 throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata") 57 } 58 59 currOffset = entry.nextOffset 60 } 61 } 62 63 val (groupOffsets, emptyGroupOffsets) = loadedOffsets 64 .groupBy(_._1.group) 65 .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)} ) 66 .partition { case (group, _) => loadedGroups.contains(group) } 67 68 loadedGroups.values.foreach { group => 69 val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, OffsetAndMetadata]) 70 // 真正更新metadata,包括group metadata和offset 71 loadGroup(group, offsets) 72 // 更新完所有members都心跳一下,表示能读到group metadata,group处于stable状态 73 onGroupLoaded(group) 74 } 75 76 // load groups which store offsets in kafka, but which have no active members and thus no group 77 // metadata stored in the log 78 emptyGroupOffsets.foreach { case (groupId, offsets) => 79 val group = new GroupMetadata(groupId) 80 loadGroup(group, offsets) 81 onGroupLoaded(group) 82 } 83 84 removedGroups.foreach { groupId => 85 // if the cache already contains a group which should be removed, raise an error. Note that it 86 // is possible (however unlikely) for a consumer group to be removed, and then to be used only for 87 // offset storage (i.e. by "simple" consumers) 88 if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId)) 89 throw new IllegalStateException(s"Unexpected unload of active group $groupId while " + 90 s"loading partition $topicPartition") 91 } 92 93 if (!shuttingDown.get()) 94 info("Finished loading offsets from %s in %d milliseconds." 95 .format(topicPartition, time.milliseconds() - startMs)) 96 } 97 }
整体来说,就是读取这个partition的log数据,分为两类数据分别处理:offset数据和group元数据,然后调用loadGroup来真正更新group的offset数据和metadata。其次调用onGroupLoaded使得该group的所有members都心跳一次,这个在上一篇博客coordinator中细讲过。这样leader的数据迁入就完成了。接下来是follower的数据迁出:
1 def handleGroupEmigration(offsetTopicPartitionId: Int) { 2 groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded) 3 }
1 private def onGroupUnloaded(group: GroupMetadata) { 2 group synchronized { 3 info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}") 4 val previousState = group.currentState 5 group.transitionTo(Dead) 6 group.metrics.close() 7 8 previousState match { 9 case Empty | Dead => 10 case PreparingRebalance => 11 for (member <- group.allMemberMetadata) { 12 if (member.awaitingJoinCallback != null) { 13 member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code)) 14 member.awaitingJoinCallback = null 15 } 16 } 17 joinPurgatory.checkAndComplete(GroupKey(group.groupId)) 18 19 case Stable | AwaitingSync => 20 for (member <- group.allMemberMetadata) { 21 if (member.awaitingSyncCallback != null) { 22 member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR_FOR_GROUP.code) 23 member.awaitingSyncCallback = null 24 } 25 heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId)) 26 } 27 } 28 } 29 }
1 def removeGroupsForPartition(offsetsPartition: Int, 2 onGroupUnloaded: GroupMetadata => Unit) { 3 val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition) 4 scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets) 5 6 def removeGroupsAndOffsets() { 7 var numOffsetsRemoved = 0 8 var numGroupsRemoved = 0 9 10 inLock(partitionLock) { 11 // we need to guard the group removal in cache in the loading partition lock 12 // to prevent coordinator‘s check-and-get-group race condition 13 ownedPartitions.remove(offsetsPartition) 14 15 for (group <- groupMetadataCache.values) { 16 if (partitionFor(group.groupId) == offsetsPartition) { 17 // 将group转为dead状态 18 onGroupUnloaded(group) 19 group.metrics.close() 20 groupMetadataCache.remove(group.groupId, group) 21 numGroupsRemoved += 1 22 numOffsetsRemoved += group.numOffsets 23 } 24 } 25 } 26 27 if (numOffsetsRemoved > 0) 28 info(s"Removed $numOffsetsRemoved cached offsets for $topicPartition on follower transition.") 29 30 if (numGroupsRemoved > 0) 31 info(s"Removed $numGroupsRemoved cached groups for $topicPartition on follower transition.") 32 } 33 }
可以看到在本broker的__consumer_offsets partition成为follower之后,需要从ownedPartitions移除,然后将属于该partition的group转为dead状态(这个通过onGroupUnloaded完成),最后将该group的metadata从groupMetadataCache中删除,这样follower的数据迁出也完成了。至此,server端的leaderAndIsr请求就完成了,回到controller那端,接下来就该处理updateMetadataRequest,上面提到过,updateMetadataRequest是发给所有存活的broker的,然后再去server端看下是怎么处理updateMetadataRequest的:
1 def handleUpdateMetadataRequest(request: RequestChannel.Request) { 2 val correlationId = request.header.correlationId 3 val updateMetadataRequest = request.body.asInstanceOf[UpdateMetadataRequest] 4 5 val updateMetadataResponse = 6 if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { 7 // 这一步更新了metadata cache 8 val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest, metadataCache) 9 if (deletedPartitions.nonEmpty) 10 // 需要从group metadata中删除 11 coordinator.handleDeletedPartitions(deletedPartitions) 12 13 // 已经更新完meta data,可以完成delayed topic operations了 14 if (adminManager.hasDelayedTopicOperations) { 15 updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic => 16 adminManager.tryCompleteDelayedTopicOperations(topic) 17 } 18 } 19 new UpdateMetadataResponse(Errors.NONE.code) 20 } else { 21 new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code) 22 } 23 24 if (config.useHdfsFunction) { 25 replicaManager.taskManager.addDiffTask() 26 } 27 28 requestChannel.sendResponse(new Response(request, updateMetadataResponse)) 29 }
1 def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) : Seq[TopicPartition] = { 2 replicaStateChangeLock synchronized { 3 if(updateMetadataRequest.controllerEpoch < controllerEpoch) { 4 val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " + 5 "old controller %d with epoch %d. Latest known controller epoch is %d").format(localBrokerId, 6 correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, 7 controllerEpoch) 8 stateChangeLogger.warn(stateControllerEpochErrorMessage) 9 throw new ControllerMovedException(stateControllerEpochErrorMessage) 10 } else { 11 // 更新metadata 12 val deletedPartitions = metadataCache.updateCache(correlationId, updateMetadataRequest) 13 controllerEpoch = updateMetadataRequest.controllerEpoch 14 metadataCache.setControllerEpoch(controllerEpoch) 15 deletedPartitions 16 } 17 } 18 }
1 def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = { 2 inWriteLock(partitionMetadataLock) { 3 controllerId = updateMetadataRequest.controllerId match { 4 case id if id < 0 => None 5 case id => Some(id) 6 } 7 aliveNodes.clear() 8 aliveBrokers.clear() 9 updateMetadataRequest.liveBrokers.asScala.foreach { broker => 10 // `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which 11 // is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could 12 // move to `AnyRefMap`, which has comparable performance. 13 val nodes = new java.util.HashMap[ListenerName, Node] 14 val endPoints = new mutable.ArrayBuffer[EndPoint] 15 broker.endPoints.asScala.foreach { ep => 16 endPoints += EndPoint(ep.host, ep.port, ep.listenerName, ep.securityProtocol) 17 nodes.put(ep.listenerName, new Node(broker.id, ep.host, ep.port)) 18 } 19 aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack)) 20 aliveNodes(broker.id) = nodes.asScala 21 } 22 23 val deletedPartitions = new mutable.ArrayBuffer[TopicPartition] 24 updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) => 25 val controllerId = updateMetadataRequest.controllerId 26 val controllerEpoch = updateMetadataRequest.controllerEpoch 27 if (info.leader == LeaderAndIsr.LeaderDuringDelete) { 28 removePartitionInfo(tp.topic, tp.partition) 29 stateChangeLogger.trace(s"Broker $brokerId deleted partition $tp from metadata cache in response to UpdateMetadata " + 30 s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId") 31 deletedPartitions += tp 32 } else { 33 val partitionInfo = partitionStateToPartitionStateInfo(info) 34 // 实际更新metadata动作 35 addOrUpdatePartitionInfo(tp.topic, tp.partition, partitionInfo) 36 stateChangeLogger.trace(s"Broker $brokerId cached leader info $partitionInfo for partition $tp in response to " + 37 s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId") 38 } 39 } 40 deletedPartitions 41 } 42 }
可以看到实际的更新metadata动作在addOrUpdatePartitionInfo中完成,在这几个过程中,发现如果有partition的leader处于被删除状态,还会顺便删除它的元信息,进而把它从group的metadata中删除,这就是updateMetadataRequest完成的事情,相比leaderAndIsrRequest简单很多。再回到controller端,完成了updateMetadata之后,就是stopReplicaRequestMap,由于创建topic过程不会涉及到这块,所以本文暂不深入进去。这样一来,将所有tp的state转为online上线的过程就完成了,最后就是将所有replica的state转为online上线:
1 def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState, 2 callbacks: Callbacks = (new CallbackBuilder).build) { 3 if(replicas.nonEmpty) { 4 debug("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(","))) 5 try { 6 brokerRequestBatch.newBatch() 7 replicas.foreach(r => handleStateChange(r, targetState, callbacks)) 8 // 已经将updateMetadataRequestBrokerSet和leaderAndIsrRequestMap在tp状态机上线完成server请求后clear 9 // 此轮什么也不做 10 brokerRequestBatch.sendRequestsToBrokers(controller.epoch) 11 }catch { 12 case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e) 13 } 14 } 15 }
还是一样的流程,先handleStateChange,再sendRequestsToBrokers,由于已经将updateMetadataRequestBrokerSet和leaderAndIsrRequestMap在tp状态机上线完成server请求后clear,所以在replica上线时sendRequestsToBrokers什么也不做,进去handleStateChange看看做了啥:
1 def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState, 2 callbacks: Callbacks) { 3 val topic = partitionAndReplica.topic 4 val partition = partitionAndReplica.partition 5 val replicaId = partitionAndReplica.replica 6 val topicAndPartition = TopicAndPartition(topic, partition) 7 if (!hasStarted.get) 8 throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " + 9 "to %s failed because replica state machine has not started") 10 .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState)) 11 val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica) 12 try { 13 val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition) 14 targetState match { 15 case NewReplica => 16 assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState) 17 // start replica as a follower to the current leader for its partition 18 val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition) 19 leaderIsrAndControllerEpochOpt match { 20 case Some(leaderIsrAndControllerEpoch) => 21 if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) 22 throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica" 23 .format(replicaId, topicAndPartition) + "state as it is being requested to become leader") 24 brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), 25 topic, partition, leaderIsrAndControllerEpoch, 26 replicaAssignment) 27 case None => // new leader request will be sent to this replica when one gets elected 28 } 29 replicaState.put(partitionAndReplica, NewReplica) 30 stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" 31 .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, 32 targetState)) 33 case ReplicaDeletionStarted => 34 assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState) 35 replicaState.put(partitionAndReplica, ReplicaDeletionStarted) 36 // send stop replica command 37 brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true, 38 callbacks.stopReplicaResponseCallback) 39 stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" 40 .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) 41 case ReplicaDeletionIneligible => 42 assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState) 43 replicaState.put(partitionAndReplica, ReplicaDeletionIneligible) 44 stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" 45 .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) 46 case ReplicaDeletionSuccessful => 47 assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState) 48 replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful) 49 stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" 50 .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) 51 case NonExistentReplica => 52 assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState) 53 // remove this replica from the assigned replicas list for its partition 54 val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) 55 controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId)) 56 replicaState.remove(partitionAndReplica) 57 stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" 58 .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) 59 case OnlineReplica => 60 assertValidPreviousStates(partitionAndReplica, 61 List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState) 62 replicaState(partitionAndReplica) match { 63 case NewReplica => 64 // add this replica to the assigned replicas list for its partition 65 val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) 66 if(!currentAssignedReplicas.contains(replicaId)) 67 controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId) 68 stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" 69 .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, 70 targetState)) 71 case _ => 72 // check if the leader for this partition ever existed 73 controllerContext.partitionLeadershipInfo.get(topicAndPartition) match { 74 case Some(leaderIsrAndControllerEpoch) => 75 brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch, 76 replicaAssignment) 77 replicaState.put(partitionAndReplica, OnlineReplica) 78 stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" 79 .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) 80 case None => // that means the partition was never in OnlinePartition state, this means the broker never 81 // started a log for that partition and does not have a high watermark value for this partition 82 } 83 } 84 replicaState.put(partitionAndReplica, OnlineReplica) 85 case OfflineReplica => 86 assertValidPreviousStates(partitionAndReplica, 87 List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState) 88 // send stop replica command to the replica so that it stops fetching from the leader 89 brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false) 90 // As an optimization, the controller removes dead replicas from the ISR 91 val leaderAndIsrIsEmpty: Boolean = 92 controllerContext.partitionLeadershipInfo.get(topicAndPartition) match { 93 case Some(_) => 94 controller.removeReplicaFromIsr(topic, partition, replicaId) match { 95 case Some(updatedLeaderIsrAndControllerEpoch) => 96 // send the shrunk ISR state change request to all the remaining alive replicas of the partition. 97 val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) 98 if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) { 99 brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId), 100 topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment) 101 } 102 replicaState.put(partitionAndReplica, OfflineReplica) 103 stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s" 104 .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState)) 105 false 106 case None => 107 true 108 } 109 case None => 110 true 111 } 112 if (leaderAndIsrIsEmpty && !controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) 113 throw new StateChangeFailedException( 114 "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty" 115 .format(replicaId, topicAndPartition)) 116 } 117 } 118 catch { 119 case t: Throwable => 120 stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed" 121 .format(controllerId, controller.epoch, replicaId, topic, partition, currState, targetState), t) 122 } 123 }
可以看到仅仅时将controllerContext.partitionReplicaAssignment和replicaState更新了一下,也就是可以理解为仅仅将replica的状态由NewReplica改为OnlineReplica。
这样一来,partition和replica的状态机转变就完成了,整个topic创建的过程也就完成了。回头看其实最主要的还是在partition从new到online上线的那一段逻辑。
!--?xml>
以上是关于kafka源码走读-controller (创建topic过程)的主要内容,如果未能解决你的问题,请参考以下文章