Kafka 之Broker工作流程节点服役和退役
Posted 爱上口袋的天空
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 之Broker工作流程节点服役和退役相关的知识,希望对你有一定的参考价值。
一、Kafka Broker 工作流程
1、Zookeeper 存储的 Kafka 信息
(1)启动 Zookeeper 客户端。
cd /home/dhapp/software/apache-zookeeper-3.6.3-bin [dhapp@conch01 apache-zookeeper-3.6.3-bin]$ bin/zkCli.sh
2)通过 ls 命令可以查看 kafka 相关信息。
2、Kafka Broker 总体工作流程
1)模拟 Kafka 上下线,Zookeeper 中数据变化
(1)查看/kafka/brokers/ids 路径上的节点。
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids [0, 1, 2] [zk: localhost:2181(CONNECTED) 3]
2)查看/kafka/controller 路径上的数据。
[zk: localhost:2181(CONNECTED) 3] get /kafka/controller "version":1,"brokerid":2,"timestamp":"1649422082642" [zk: localhost:2181(CONNECTED) 4]
(3)查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据。
[zk: localhost:2181(CONNECTED) 4] get /kafka/brokers/topics/first/partitions/0/state "controller_epoch":3,"leader":2,"version":1,"leader_epoch":2,"isr":[2,1,0] [zk: localhost:2181(CONNECTED) 5]
(4)停止 conch02 上的 kafka。
[dhapp@conch02 kafka_2.12-3.0.0]$ bin/kafka-server-stop.sh
(5)再次查看/kafka/brokers/ids 路径上的节点。
[zk: localhost:2181(CONNECTED) 5] ls /kafka/brokers/ids [0, 2] [zk: localhost:2181(CONNECTED) 6]
6)再次查看/kafka/controller 路径上的数据。
[zk: localhost:2181(CONNECTED) 6] get /kafka/controller "version":1,"brokerid":2,"timestamp":"1649422082642" [zk: localhost:2181(CONNECTED) 7]
(7)再次查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据。
[zk: localhost:2181(CONNECTED) 7] get /kafka/brokers/topics/first/partitions/0/state "controller_epoch":3,"leader":2,"version":1,"leader_epoch":3,"isr":[2,0] [zk: localhost:2181(CONNECTED) 8]
(8)启动 conch2上的 kafka。
[dhapp@conch02 kafka_2.12-3.0.0]$ bin/kafka-server-start.sh -daemon config/server.properties
(9)再次观察(1)、(2)、(3)步骤中的内容。
3、Broker 重要参数
二、节点服役和退役
1、 服役新节点
1)新节点准备
(1)这里准备一台新的节点conch04,ip地址为192.168.56.23,同时在这台机器上安装
一个kafka服务,步骤和之前一样,现在我们启动这台服务[dhapp@conch04 kafka_2.12-3.0.0]$ bin/kafka-server-start.sh -daemon config/server.properties
(2)在conch01服务上执行负载均衡操作
首先查询first主题的详情
[dhapp@conch01 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server conch01:9092 --topic first --describe Topic: first TopicId: YzFaCPvpSQm1XxnNIfJjEA PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824 Topic: first Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: first Partition: 1 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1 Topic: first Partition: 2 Leader: 2 Replicas: 0,1,2 Isr: 2,0,1 [dhapp@conch01 kafka_2.12-3.0.0]$
(1)创建一个要均衡的主题
[dhapp@conch01 kafka_2.12-3.0.0]$ vim topics-to-move.json [dhapp@conch01 kafka_2.12-3.0.0]$ cat topics-to-move.json "topics": [ "topic": "first" ], "version": 1 [dhapp@conch01 kafka_2.12-3.0.0]$
注意:上面我们就拿first主题作为一个均衡的对象
(2)生成一个负载均衡的计划。(只是计划而已)
[dhapp@conch01 kafka_2.12-3.0.0]$ bin/kafka-reassign-partitions.sh --bootstrap-server conch01:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate Current partition replica assignment "version":1,"partitions":["topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"],"topic":"first","partition":1,"replicas":[1,2,0],"log_dirs":["any","any","any"],"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]] Proposed partition reassignment configuration "version":1,"partitions":["topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"],"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"],"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]] [dhapp@conch01 kafka_2.12-3.0.0]$
(3)创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)。
[dhapp@conch01 kafka_2.12-3.0.0]$ vim increase-replication-factor.json
输入如下内容(刚生成的计划):
"version":1,"partitions":["topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"],"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"],"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]]
(4)执行副本存储计划。
bin/kafka-reassign-partitions.sh --bootstrap-server conch01:9092 --reassignment-json-file increase-replication-factor.json --execute
(5)验证副本存储计划。
[dhapp@conch01 kafka_2.12-3.0.0]$ bin/kafka-reassign-partitions.sh --bootstrap-server conch01:9092 --reassignment-json-file increase-replication-factor.json --verify Status of partition reassignment: Reassignment of partition first-0 is complete. Reassignment of partition first-1 is complete. Reassignment of partition first-2 is complete. Clearing broker-level throttles on brokers 0,1,2,3 Clearing topic-level throttles on topic first [dhapp@conch01 kafka_2.12-3.0.0]$
再次查询first主题的详情
2、 退役旧节点
1)执行负载均衡操作
先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。
(1)创建一个要均衡的主题。[dhapp@conch01 kafka_2.12-3.0.0]$ vim topics-to-move.json [dhapp@conch01 kafka_2.12-3.0.0]$ cat topics-to-move.json "topics": [ "topic": "first" ], "version": 1 [dhapp@conch01 kafka_2.12-3.0.0]$
(2)创建执行计划。
[dhapp@conch01 kafka_2.12-3.0.0]$ bin/kafka-reassign-partitions.sh --bootstrap-server conch01:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate Current partition replica assignment "version":1,"partitions":["topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"],"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"],"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]] Proposed partition reassignment configuration "version":1,"partitions":["topic":"first","partition":0,"replicas":[2,1,0],"log_dirs":["any","any","any"],"topic":"first","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"],"topic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]] [dhapp@conch01 kafka_2.12-3.0.0]$
(3)创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)。
[dhapp@conch01 kafka_2.12-3.0.0]$ vim increase-replication-factor.json [dhapp@conch01 kafka_2.12-3.0.0]$ cat increase-replication-factor.json "version":1,"partitions":["topic":"first","partition":0,"replicas":[2,1,0],"log_dirs":["any","any","any"],"topic":"first","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"],"topic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]] [dhapp@conch01 kafka_2.12-3.0.0]$
(4)执行副本存储计划。
[dhapp@conch01 kafka_2.12-3.0.0]$ bin/kafka-reassign-partitions.sh --bootstrap-server conch01:9092 --reassignment-json-file increase-replication-factor.json --execute Current partition replica assignment "version":1,"partitions":["topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"],"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"],"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]] Save this to use as the --reassignment-json-file option during rollback Successfully started partition reassignments for first-0,first-1,first-2 [dhapp@conch01 kafka_2.12-3.0.0]$
(5)验证副本存储计划。
[dhapp@conch01 kafka_2.12-3.0.0]$ bin/kafka-reassign-partitions.sh --bootstrap-server conch01:9092 --reassignment-json-file increase-replication-factor.json --verify Status of partition reassignment: Reassignment of partition first-0 is complete. Reassignment of partition first-1 is complete. Reassignment of partition first-2 is complete. Clearing broker-level throttles on brokers 0,1,2,3 Clearing topic-level throttles on topic first [dhapp@conch01 kafka_2.12-3.0.0]$
2)执行停止命令
在 conch04上执行停止命令即可。[dhapp@conch04 kafka_2.12-3.0.0]$ bin/kafka-server-stop.sh
以上是关于Kafka 之Broker工作流程节点服役和退役的主要内容,如果未能解决你的问题,请参考以下文章
kafka学习记录—Broker(服役退役节点,kafka副本,文件存储)
Kafka消息队列大数据实战教程-第五篇(Broker工作原理以及节点服役)