关于删除CDH 中 Kafka 的Topic 实战
Posted 全栈思想小栈
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了关于删除CDH 中 Kafka 的Topic 实战相关的知识,希望对你有一定的参考价值。
Topic 删不掉?为什么?怎么玩?
基于项目需求,需要动态的创建topic,还要动态的删除topic。你有可能会说这不很easy嘛,加个配置不就行了吗?的却,开始我也这么认为的,然后就悲剧了。
我们项目使用的Zookeeper和Kafka 是CDH版本的,有优势也有劣势,优势嘛很明显,图形化的页面管理各个集群的服务,
劣势也很痛苦,往往很多个配置要找半天。
好了,回归正题,自动创建删除的配置如下图,
允许Admin删除,
描述很到位,只有勾选了,才让你用 admin 工具删除。
关于admin 工具,你可以使用 kafka java 的Api ,也可以使用命令的形式。
到这里,你以为全部搞定了,但是接下来。。。
Java 异常捕获到TopicAlreadyMarkedForDeletionException,根据字面意思,topic 已经被标记为删除了。同样,用命令的形式
查找了相关的资料,整理的删除的流程图
1. Kafka 控制器在启动的时候会注册对于zookeeper 节点/admin/delete_topics的子节点变更的监听器,delete命令实际上就是要在该节点下创建一个临时节点,名字是待删除topic名,标记该topic是待删除的
2. Kafka 控制器在启动时创建一个单独的 线程,执行topic 删除的操作(由DeleteTopicsThread类实现)
3. 线程启动时查看是否有需要进行删除的topic
4. 一旦发现待删除topic集合是空,topic删除线程会被挂起
5. 这时,我们执行delete操作,删除topic: topic,delete命令在/admin/delete_topics下创建子节点topic
6. 监听器捕获到该变更,立刻出发删除逻辑:
a.查询topic是否存在,不存在的话直接删除/admin/delete_topics/ topic节点——随便删除一个不存在的topic,删除命令也只是创建/admin/delete_topics/[topicName]的节点,它不负责做存在性校验
b.查询一下topic是不是当前正在执行Preferred副本选举或分区重分配,如果是的话,肯定是不适合进行删除掉的。Controller 本地会缓存当前无法进行删除的topic集合,待分区重分配完成或preferred副本选举后单独处理该集合中的topic
c.如果两者都不是的话说明现在可以进行删除操作,那么就恢复挂起的删除线程执行删除操作
删除线程执行删除操作的真正逻辑是:
1. 它首先会给当前所有的broker发送更新元数据信息的请求,告诉broker说这个topic准备要删除了, 你们可以把它的信息从缓存中删除掉
2. 开始删除这个topic的所有分区
a. 给所有broker发请求,告诉它们这些分区要被删除了。Broker收到后,便不再接受任何在这些分区上的请求了
b. 把每个分区下的所有副本都设置为offlineReplica状态,这样ISR就不断缩小,当leader副本最后也被置于OfflineReplica状态时leader信息将被更新为-1
c. 将所有副本置于ReplicaDeletionStarted状态
d. 关闭所有空闲的Fetcher线程
3. 删除zookeeper下/brokers/topics/ topic节点
4. 删除zookeeper下/config/topics/ topic节点
5. 删除zookeeper下/admin/delete_topics/ topic节点
6. 更新各种缓存,把test-topic相关信息移除出去
好了,啰里啰唆这么多,删除的逻辑基本清楚了。被标记为删除的,一定时间内时被回收的,但是,如果它不被回收呢?
比如这个:
这个时候,我们可以基于上述的原理,手动的删除/brokers/topics/topic,删除注册在ZK上的信息,那么这个topic 也就被变相的删除了。
吐槽一下,在CDH 上半天也没找到 brokers的位置,还好有Zookeeper的插件
点击一下,在broker上的topic 就被彻底的删除了。
以上是关于关于删除CDH 中 Kafka 的Topic 实战的主要内容,如果未能解决你的问题,请参考以下文章
Flink 实战系列Flink 消费多个 Topic 数据利用侧流输出完成分流功能