Kafka 源码解析之 Topic 的新建/扩容/删除
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 源码解析之 Topic 的新建/扩容/删除相关的知识,希望对你有一定的参考价值。
参考技术A [TOC]
本篇接着讲述 Controller 的功能方面的内容,在 Kafka 中,一个 Topic 的新建、扩容或者删除都是由 Controller 来操作的,本篇文章也是主要聚焦在 Topic 的操作处理上(新建、扩容、删除),实际上 Topic 的创建在 Kafka 源码解析之 topic 创建过程(三) 中已经讲述过了,本篇与前面不同的是,本篇主要是从 Controller 角度来讲述,而且是把新建、扩容、删除这三个 Topic 级别的操作放在一起做一个总结。
这里把 Topic 新建与扩容放在一起讲解,主要是因为无论 Topic 是新建还是扩容,在 Kafka 内部其实都是 Partition 的新建,底层的实现机制是一样的,Topic 的新建与扩容的整体流程如下图所示:
Topic 新建与扩容触发条件的不同如下所示:
下面开始详细讲述这两种情况。
Topic 扩容
Kafka 提供了 Topic 扩容工具,假设一个 Topic(topic_test)只有一个 partition,这时候我们想把它扩容到两个 Partition,可以通过下面两个命令来实现:
这两种方法的区别是:第二种方法直接指定了要扩容的 Partition 2 的副本需要分配到哪台机器上,这样的话我们可以精确控制到哪些 Topic 放下哪些机器上。
无论是使用哪种方案,上面两条命令产生的结果只有一个,将 Topic 各个 Partition 的副本写入到 ZK 对应的节点上,这样的话 /brokers/topics/topic_test 节点的内容就会发生变化,PartitionModificationsListener 监听器就会被触发 ,该监听器的处理流程如下:
其 doHandleDataChange() 方法的处理流程如下:
下面我们看下 onNewPartitionCreation() 方法,其实现如下:
关于 Partition 的新建,总共分了以下四步:
经过上面几个阶段,一个 Partition 算是真正创建出来,可以正常进行读写工作了,当然上面只是讲述了 Controller 端做的内容,Partition 副本所在节点对 LeaderAndIsr 请求会做更多的工作,这部分会在后面关于 LeaderAndIsr 请求的处理中只能够详细讲述。
Topic 新建
Kafka 也提供了 Topic 创建的工具,假设我们要创建一个名叫 topic_test,Partition 数为2的 Topic,创建的命令如下:
跟前面的类似,方法二是可以精确控制新建 Topic 每个 Partition 副本所在位置,Topic 创建的本质上是在 /brokers/topics 下新建一个节点信息,并将 Topic 的分区详情写入进去,当 /brokers/topics 有了新增的 Topic 节点后,会触发 TopicChangeListener 监听器,其实现如下:
只要 /brokers/topics 下子节点信息有变化(topic 新增或者删除),TopicChangeListener 都会被触发,其 doHandleChildChange() 方法的处理流程如下:
接着看下 onNewTopicCreation() 方法实现
上述方法主要做了两件事:
onNewPartitionCreation() 的实现在前面 Topic 扩容部分已经讲述过,这里不再重复,最好参考前面流程图来梳理 Topic 扩容和新建的整个过程。
Kafka Topic 删除这部分的逻辑是一个单独线程去做的,这个线程是在 Controller 启动时初始化和启动的。
TopicDeletionManager 初始化
TopicDeletionManager 启动实现如下所示:
TopicDeletionManager 启动时只是初始化了一个 DeleteTopicsThread 线程,并启动该线程。TopicDeletionManager 这个类从名字上去看,它是 Topic 删除的管理器,它是如何实现 Topic 删除管理呢,这里先看下该类的几个重要的成员变量:
前面一小节,简单介绍了 TopicDeletionManager、DeleteTopicsThread 的启动以及它们之间的关系,这里我们看下一个 Topic 被设置删除后,其处理的整理流程,简单做了一个小图,如下所示:
这里先简单讲述上面的流程,当一个 Topic 设置为删除后:
先看下 DeleteTopicsListener 的实现,如下:
其 doHandleChildChange() 的实现逻辑如下:
接下来,看下 Topic 删除线程 DeleteTopicsThread 的实现,如下所示:
doWork() 方法处理逻辑如下:
先看下 onTopicDeletion() 方法,这是 Topic 最开始删除时的实现,如下所示:
Topic 的删除的真正实现方法还是在 startReplicaDeletion() 方法中,Topic 删除时,会先调用 onPartitionDeletion() 方法删除所有的 Partition,然后在 Partition 删除时,执行 startReplicaDeletion() 方法删除该 Partition 的副本,该方法的实现如下:
该方法的执行逻辑如下:
在将副本状态从 OfflineReplica 转移成 ReplicaDeletionStarted 时,会设置一个回调方法 deleteTopicStopReplicaCallback(),该方法会将删除成功的 Replica 设置为 ReplicaDeletionSuccessful 状态,删除失败的 Replica 设置为 ReplicaDeletionIneligible 状态(需要根据 StopReplica 请求处理的过程,看下哪些情况下 Replica 会删除失败,这个会在后面讲解)。
下面看下这个方法 completeDeleteTopic(),当一个 Topic 的所有 Replica 都删除成功时,即其状态都在 ReplicaDeletionSuccessful 时,会调用这个方法,如下所示:
当一个 Topic 所有副本都删除后,会进行如下处理:
至此,一个 Topic 算是真正删除完成。
以上是关于Kafka 源码解析之 Topic 的新建/扩容/删除的主要内容,如果未能解决你的问题,请参考以下文章