Kafka Source Code: ReassignPartitionsCommand Principle Analysis (with Companion Video Tutorials)
Posted by 石臻臻的杂货铺
(Follow-up videos will be published, full set first on the WeChat official account, and also on CSDN, Bilibili, and other platforms under the same name: 石臻臻的杂货铺.)
[Kafka source] Partition replica reassignment source analysis (Part 1)
[Kafka source] Partition replica reassignment source analysis (Part 2)
Contents
1. Using the script
2. Source code analysis
If reading the source feels too dry, you can jump straight to the source summary and Q&A section.
2.1 --generate: generating an assignment plan
Launch the command class with: --zookeeper xxxx:2181 --topics-to-move-json-file config/move-json-file.json --broker-list "0,1,2,3" --generate
Set up the move-json-file.json file:
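The topics-to-move file just lists the topics whose partitions should be redistributed. A minimal example (the topic name test_create_topic1 is illustrative):

{"topics": [{"topic": "test_create_topic1"}], "version": 1}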
Launch it and debug:
ReassignPartitionsCommand.generateAssignment
- Read the input parameters
- Validate that the broker IDs passed via --broker-list contain no duplicates; duplicates raise an error
- Start the assignment
ReassignPartitionsCommand.generateAssignment
def generateAssignment(zkClient: KafkaZkClient, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicPartition, Seq[Int]], Map[TopicPartition, Seq[Int]]) = {
  // Parse which topics are to be reassigned
  val topicsToReassign = parseTopicsData(topicsToMoveJsonString)
  // Check for duplicate topics
  val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
  if (duplicateTopicsToReassign.nonEmpty)
    throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
  // Fetch the topics' current replica assignment from /brokers/topics/topicName
  val currentAssignment = zkClient.getReplicaAssignmentForTopics(topicsToReassign.toSet)
  val groupedByTopic = currentAssignment.groupBy { case (tp, _) => tp.topic }
  // Rack-awareness mode
  val rackAwareMode = if (disableRackAware) RackAwareMode.Disabled else RackAwareMode.Enforced
  val adminZkClient = new AdminZkClient(zkClient)
  val brokerMetadatas = adminZkClient.getBrokerMetadatas(rackAwareMode, Some(brokerListToReassign))
  val partitionsToBeReassigned = mutable.Map[TopicPartition, Seq[Int]]()
  groupedByTopic.foreach { case (topic, assignment) =>
    val (_, replicas) = assignment.head
    val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
    partitionsToBeReassigned ++= assignedReplicas.map { case (partition, replicas) =>
      new TopicPartition(topic, partition) -> replicas
    }
  }
  (partitionsToBeReassigned, currentAssignment)
}
- Check for duplicate topics; if any topic appears more than once, an exception is thrown.
- Fetch the topics' current replica assignment from the ZK node /brokers/topics/topicName.
- Fetch all live brokers from the ZK node /brokers/ids and intersect them with the brokers passed via --broker-list.
- Fetch the brokers' metadata. If the rack-awareness mode is RackAwareMode.Enforced (the default) and the brokers in that intersection do not either all have rack information or all lack it, an exception is thrown: rack-aware assignment only works when every broker has rack information or none does. If you run into this, set the mode to RackAwareMode.Disabled; just add the --disable-rack-aware flag.
- Call AdminUtils.assignReplicasToBrokers(metadata of the brokers to assign to, partition count, replica count) to compute the assignment. We already analyzed this in 【kafka源码】创建Topic的时候是如何分区和副本的分配规则, so it is not repeated here.
Note that the replica count is obtained via assignment.head.replicas.size, i.e. the replica count of the first partition. Normally all partitions of a topic have the same number of replicas, but that is not guaranteed; they may have been configured differently.
Given this, couldn't we call this interface directly to implement other features, such as growing or shrinking the number of replicas? A sketch follows below.
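For instance, a minimal sketch of generating a "raise the replication factor" plan by reusing the same interface. Only the AdminUtils.assignReplicasToBrokers call is from the Kafka code; the surrounding variable names and the target factor are illustrative assumptions:

// Hypothetical sketch: brokerMetadatas and partitionCount would be obtained
// exactly as generateAssignment does above; the target factor is our own choice.
val targetReplicationFactor = 3
// Returns Map[partitionId -> assigned broker ids]
val proposed: Map[Int, Seq[Int]] =
  AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitionCount, targetReplicationFactor)
// Shape it into the structure the --execute phase consumes
val plan: Map[TopicPartition, Seq[Int]] = proposed.map { case (partition, replicas) =>
  new TopicPartition("test_create_topic1", partition) -> replicas
}

Feeding such a plan through the --execute flow described next is essentially how replica expansion or shrinkage could be built on top of this command.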
2.2 --execute: the execute phase
Run the script with:
--zookeeper xxx --reassignment-json-file config/reassignment-json-file.json --execute --throttle 10000
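For reference, a minimal reassignment-json-file.json might look like this (the topic name and broker IDs are illustrative; the same format appears again below when the plan is written into ZooKeeper):

{"version":1,"partitions":[{"topic":"test_create_topic1","partition":0,"replicas":[0,1,2,3]}]}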
ReassignPartitionsCommand.executeAssignment
def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[Admin], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L): Unit = {
  // Validate and parse the JSON file
  val (partitionAssignment, replicaAssignment) = parseAndValidate(zkClient, reassignmentJsonString)
  val adminZkClient = new AdminZkClient(zkClient)
  val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, adminClientOpt, partitionAssignment.toMap, replicaAssignment, adminZkClient)
  // If a replica reassignment is already in progress, only try to update the throttle
  if (zkClient.reassignPartitionsInProgress()) {
    reassignPartitionsCommand.maybeLimit(throttle)
  } else {
    // Print the current assignment so a rollback is possible
    printCurrentAssignment(zkClient, partitionAssignment.map(_._1.topic))
    if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0)
      println(String.format("Warning: You must run Verify periodically, until the reassignment completes, to ensure the throttle is removed. You can also alter the throttle by rerunning the Execute command passing a new value."))
    // Kick off the reassignment
    if (reassignPartitionsCommand.reassignPartitions(throttle, timeoutMs))
      println("Successfully started reassignment of partitions.")
    else
      println("Failed to reassign partitions %s".format(partitionAssignment))
  }
}
- Parse the JSON file and run some validations (partitions and replicas must be non-empty; partitions must not be duplicated).
- Validate that none of the listed partitions are nonexistent (to add partitions, use kafka-topic instead).
- Check that all broker IDs in the configuration exist.
- If a replica reassignment is already in progress (the ZK node /admin/reassign_partitions exists), only check whether the throttle needs changing: if --throttle or --replica-alter-log-dirs-throttle was passed, the throttle settings are written; the next step is then not executed.
- If no reassignment task is currently running (the node /admin/reassign_partitions does not exist), start the replica reassignment task.
2.2.1 A task already exists: try to throttle
If the ZK node /admin/reassign_partitions exists, a task is already in progress and the current operation goes no further. If the parameters
--throttle:
--replica-alter-log-dirs-throttle:
were passed, the throttle is applied.
This throttles the replicas currently being moved. Note that the command can be used to change an existing throttle, but if some brokers have already completed rebalancing, it may not change all the limits that were originally set. The throttle therefore has to be removed afterwards, via --verify.
maybeLimit
def maybeLimit(throttle: Throttle): Unit = {
  if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0) {
    // Brokers in the existing assignment
    val existingBrokers = existingAssignment().values.flatten.toSeq
    // Brokers in the proposed assignment
    val proposedBrokers = proposedPartitionAssignment.values.flatten.toSeq ++ proposedReplicaAssignment.keys.toSeq.map(_.brokerId())
    // Union of the two, de-duplicated
    val brokers = (existingBrokers ++ proposedBrokers).distinct
    // For every broker involved, write the throttle config to the ZK node /config/brokers/brokerId
    for (id <- brokers) {
      // Fetch the broker's current config from /config/brokers/brokerId
      val configs = adminZkClient.fetchEntityConfig(ConfigType.Broker, id.toString)
      if (throttle.interBrokerLimit >= 0) {
        configs.put(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.interBrokerLimit.toString)
        configs.put(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, throttle.interBrokerLimit.toString)
      }
      if (throttle.replicaAlterLogDirsLimit >= 0)
        configs.put(DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp, throttle.replicaAlterLogDirsLimit.toString)
      adminZkClient.changeBrokerConfig(Seq(id), configs)
    }
  }
}
The /config/brokers/brokerId node holds broker-side dynamic configuration; it takes effect immediately, without restarting the broker.
- If the parameter --throttle was passed, the brokers' configuration is fetched from the ZK node /config/brokers/BrokerId, the following two settings are added, and the result is written back to the same node:
leader.replication.throttled.rate: caps the rate at which the leader side serves FETCH requests
follower.replication.throttled.rate: caps the rate at which the follower side sends FETCH requests
- If the parameter --replica-alter-log-dirs-throttle was passed, the following setting is written as well:
replica.alter.log.dirs.io.max.bytes.per.second: limits data migration between log directories inside a broker, i.e. the bandwidth cap for copying data from one directory to another
For example, the node content after the write:
{"version":1,"config":{"leader.replication.throttled.rate":"1","follower.replication.throttled.rate":"1"}}
Note: these throttle settings are written to every broker involved in the reassignment.
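To check what was written (and, later, to confirm the throttle has been removed), the dynamic config can be read back with the stock kafka-configs tool; a sketch, with the ZooKeeper address and broker ID as placeholders:

bin/kafka-configs.sh --zookeeper xxxx:2181 --describe --entity-type brokers --entity-name 0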
2.2.2 No task currently running: start the replica reassignment task
ReassignPartitionsCommand.reassignPartitions
def reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 10000L): Boolean = {
  // Write the throttle configuration
  maybeThrottle(throttle)
  try {
    // Verify that the partitions exist
    val validPartitions = proposedPartitionAssignment.groupBy(_._1.topic())
      .flatMap { case (topic, topicPartitionReplicas) =>
        validatePartition(zkClient, topic, topicPartitionReplicas)
      }
    if (validPartitions.isEmpty) false
    else {
      if (proposedReplicaAssignment.nonEmpty && adminClientOpt.isEmpty)
        throw new AdminCommandFailedException("bootstrap-server needs to be provided in order to reassign replica to the specified log directory")
      val startTimeMs = System.currentTimeMillis()
      // Send AlterReplicaLogDirsRequest to allow broker to create replica in the right log dir later if the replica has not been created yet.
      if (proposedReplicaAssignment.nonEmpty)
        alterReplicaLogDirsIgnoreReplicaNotAvailable(proposedReplicaAssignment, adminClientOpt.get, timeoutMs)
      // Create reassignment znode so that controller will send LeaderAndIsrRequest to create replica in the broker
      zkClient.createPartitionReassignment(validPartitions.map { case (key, value) => (new TopicPartition(key.topic, key.partition), value) }.toMap)
      // Send AlterReplicaLogDirsRequest again to make sure broker will start to move replica to the specified log directory.
      // It may take some time for controller to create replica in the broker. Retry if the replica has not been created.
      var remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis()
      val replicasAssignedToFutureDir = mutable.Set.empty[TopicPartitionReplica]
      while (remainingTimeMs > 0 && replicasAssignedToFutureDir.size < proposedReplicaAssignment.size) {
        replicasAssignedToFutureDir ++= alterReplicaLogDirsIgnoreReplicaNotAvailable(
          proposedReplicaAssignment.filter { case (replica, _) => !replicasAssignedToFutureDir.contains(replica) },
          adminClientOpt.get, remainingTimeMs)
        Thread.sleep(100)
        remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis()
      }
      replicasAssignedToFutureDir.size == proposedReplicaAssignment.size
    }
  } catch {
    case _: NodeExistsException =>
      val partitionsBeingReassigned = zkClient.getPartitionReassignment()
      throw new AdminCommandFailedException("Partition reassignment currently in " +
        "progress for %s. Aborting operation".format(partitionsBeingReassigned))
  }
}
- maybeThrottle(throttle) sets the throttle configuration for the replica movement; this method is only used when the task is first initialized:

private def maybeThrottle(throttle: Throttle): Unit = {
  if (throttle.interBrokerLimit >= 0)
    assignThrottledReplicas(existingAssignment(), proposedPartitionAssignment, adminZkClient)
  maybeLimit(throttle)
  if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0)
    throttle.postUpdateAction()
  if (throttle.interBrokerLimit >= 0)
    println(s"The inter-broker throttle limit was set to ${throttle.interBrokerLimit} B/s")
  if (throttle.replicaAlterLogDirsLimit >= 0)
    println(s"The replica-alter-dir throttle limit was set to ${throttle.replicaAlterLogDirsLimit} B/s")
}

1.1 Write the per-topic throttle configuration to the node /config/topics/topicName.
The computed leader and follower values are written to /config/topics/topicName:
leader: the partitions that gain new replicas; the value has the form partitionId:replicaId,partitionId:replicaId
follower: for each proposed TopicPartition, the replicas are the proposed replicas minus the existing ones; the value has the form partitionId:replicaId,partitionId:replicaId
leader.replication.throttled.replicas: the leader value
follower.replication.throttled.replicas: the follower value
A worked example follows after this list.
1.2 Run the flow of 2.2.1 (a task already exists: try to throttle).
- Fetch the /brokers/topics/topicName data from ZK to verify that the given partitions exist; the configuration of a nonexistent partition is ignored and the flow continues.
- If the JSON file specifies log directories, an AlterReplicaLogDirsRequest is sent so that the broker can later create the replica in the correct log directory. For example, with log_dirs specifying the target directory:
{"version":1,"partitions":[{"topic":"Topic1","partition":2,"replicas":[1],"log_dirs":["/Users/shirenchuang/work/IdeaPj/didi_source/kafka/k0"]}]}
the AlterReplicaLogDirsRequest will first create the corresponding replica. For the details, see the article 跨目录数据迁移 (cross-directory data migration).
- Write the reassignment data to the ZK node /admin/reassign_partitions; the content looks like:
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":0,"replicas":[0,1,2,3]},{"topic":"test_create_topic1","partition":1,"replicas":[1,2,0,3]},{"topic":"test_create_topic1","partition":2,"replicas":[2,1,0,3]}]}
- Send the alterReplicaLogDirs request to the brokers again to make sure they start moving replicas to the specified log directories. It may take some time for the controller to create a replica on a broker, so the request is retried for replicas that have not been created yet.
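A worked example for the throttled-replica lists written in step 1.1 (the numbers are illustrative): suppose partition 0 of a topic currently lives on brokers [0,1] and the proposed assignment is [0,1,2]. The leader list then covers the existing replicas, leader.replication.throttled.replicas = "0:0,0:1", while the follower list covers only the replica that still has to catch up, follower.replication.throttled.replicas = "0:2".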
2.2.3 The Controller watches the /admin/reassign_partitions node for changes
KafkaController.processZkPartitionReassignment
private def processZkPartitionReassignment(): Set[TopicPartition] = {
  // We need to register the watcher if the path doesn't exist in order to detect future
  // reassignments and we get the `path exists` check for free
  if (isActive && zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {
    val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError]
    val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment]
    zkClient.getPartitionReassignment().foreach { case (tp, targetReplicas) =>
      maybeBuildReassignment(tp, Some(targetReplicas)) match {
        case Some(context) => partitionsToReassign.put(tp, context)
        case None => reassignmentResults.put(tp, new ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
      }
    }
    reassignmentResults ++= maybeTriggerPartitionReassignment(partitionsToReassign)
    val (partitionsReassigned, partitionsFailed) = reassignmentResults.partition(_._2.error == Errors.NONE)
    if (partitionsFailed.nonEmpty) {
      warn(s"Failed reassignment through zk with the following errors: $partitionsFailed")
      maybeRemoveFromZkReassignment((tp, _) => partitionsFailed.contains(tp))
    }
    partitionsReassigned.keySet
  } else {
    Set.empty
  }
}
- Check that this broker is the active Controller and that the node /admin/reassign_partitions exists.
- maybeTriggerPartitionReassignment triggers the reassignment; if a topic has already been marked for deletion, the flow for that topic is terminated.
- maybeRemoveFromZkReassignment removes the partitions that failed from ZK (by overwriting the node's data).
For each partition to be moved, the Controller then runs onPartitionReassignment:
KafkaController.onPartitionReassignment
private def onPartitionReassignment(topicPartition: TopicPartition, reassignment: ReplicaAssignment): Unit = {
  // Pause deletion for topics that are being deleted
  topicDeletionManager.markTopicIneligibleForDeletion(Set(topicPartition.topic), reason = "topic reassignment in progress")
  // Update the current assignment
  updateCurrentReassignment(topicPartition, reassignment)
  val addingReplicas = reassignment.addingReplicas
  val removingReplicas = reassignment.removingReplicas
  if (!isReassignmentComplete(topicPartition, reassignment)) {
    // A1. Send LeaderAndIsr request to every replica in ORS + TRS (with the new RS, AR and RR).
    updateLeaderEpochAndSendRequest(topicPartition, reassignment)
    // A2. replicas in AR -> NewReplica
    startNewReplicasForReassignedPartition(topicPartition, addingReplicas)
  } else {
    // B1. replicas in AR -> OnlineReplica
    replicaStateMachine.handleStateChanges(addingReplicas.map(PartitionAndReplica(topicPartition, _)), OnlineReplica)
    // B2. Set RS = TRS, AR = [], RR = [] in memory.
    val completedReassignment = ReplicaAssignment(reassignment.targetReplicas)
    controllerContext.updatePartitionFullReplicaAssignment(topicPartition, completedReassignment)
    // B3. Send LeaderAndIsr request with a potential new leader (if current leader not in TRS) and
    //     a new RS (using TRS) and same isr to every broker in ORS + TRS or TRS
    moveReassignedPartitionLeaderIfRequired(topicPartition, completedReassignment)
    // B4. replicas in RR -> Offline (force those replicas out of isr)
    // B5. replicas in RR -> NonExistentReplica (force those replicas to be deleted)
    stopRemovedReplicasOfReassignedPartition(topicPartition, removingReplicas)
    // B6. Update ZK with RS = TRS, AR = [], RR = [].
    updateReplicaAssignmentForPartition(topicPartition, completedReassignment)
    // B7. Remove the ISR reassign listener and maybe update the /admin/reassign_partitions path in ZK to remove this partition from it.
    removePartitionFromReassigningPartitions(topicPartition, completedReassignment)
    // B8. After electing a leader in B3, the replicas and isr information changes, so resend the update metadata request to every broker
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
    // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
    topicDeletionManager.resumeDeletionForTopics(Set(topicPartition.topic))
  }
}
- Pause deletion for topics that are currently being deleted.
- Update the ZK node brokers/topics/topicName and the current assignment state in memory. If a reassignment is already in progress, the new reassignment replaces it and some replicas will be shut down.
2.1 Update the topic node brokers/topics/topicName in ZK; this marks in AR which replicas are being added and in RR which replicas are to be removed.
2.2 Update the in-memory state.
2.3 If a reassignment was already in progress, some of the replicas that were just added may need to be deleted right away; in that case those replicas have to be stopped.
2.4 Register a handler, PartitionReassignmentIsrChangeHandler, watching the node /brokers/topics/topicName/partitions/<partition>/state for changes.
- If the partition's reassignment is not yet complete (judged by whether the ISR in /brokers/topics/topicName/partitions/<partition>/state already contains the newly added broker IDs), then:
A few terms used below (a worked example follows this list):
ORS: OriginReplicas, the original replicas
TRS: targetReplicas, the target replicas the partition is being changed to
AR: adding_replicas, the replicas being added
RR: removing_replicas, the replicas being removed
3.1 Send a LeaderAndIsr request (with the new RS, AR, and RR) to every replica in ORS + TRS.
3.2 Transition the newly added AR replicas to the NewReplica state; this also sends a LeaderAndIsrRequest. For details, see 【kafka源码】Controller中的状态机.
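To make the bookkeeping concrete, an illustrative example: say a partition's ORS = [0,1] and the plan moves it to TRS = [2,3]. While the move is in flight, RS = ORS + TRS = [0,1,2,3], AR = [2,3], and RR = [0,1]; once the new replicas have joined the ISR, the branch above sets RS = TRS = [2,3], empties AR and RR, and stops and deletes the replicas on brokers 0 and 1.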
2.2.4 The Controller watches the node brokers/topics/topicName for changes, checking for newly added partitions
This flow can safely be ignored, because nothing is actually done here.
Recall that step 2 of 2.2.3 above wrote the added and removed replicas into ZK, for example:
{"version":2,"partitions":{"2":[0,1],"1":[0,1],"0":[0,1]},"adding_replicas":{"2":[1],"1":[1],"0":[1]},"removing_replicas":{}}
When the Controller observes a change on this node, it executes processPartitionModifications:
KafkaController.processPartitionModifications
private def processPartitionModifications(topic: String): Unit = {
  def restorePartitionReplicaAssignment(