Kafka 之Broker工作流程节点服役和退役

Posted 爱上口袋的天空

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 之Broker工作流程节点服役和退役相关的知识,希望对你有一定的参考价值。

一、Kafka Broker 工作流程

1、Zookeeper 存储的 Kafka 信息

        (1)启动 Zookeeper 客户端。

cd /home/dhapp/software/apache-zookeeper-3.6.3-bin
[dhapp@conch01 apache-zookeeper-3.6.3-bin]$ bin/zkCli.sh

        2)通过 ls 命令可以查看 kafka 相关信息。

 

2、Kafka Broker 总体工作流程

 

1)模拟 Kafka 上下线,Zookeeper 中数据变化

(1)查看/kafka/brokers/ids 路径上的节点。 

[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 3]

2)查看/kafka/controller 路径上的数据。

[zk: localhost:2181(CONNECTED) 3] get /kafka/controller
"version":1,"brokerid":2,"timestamp":"1649422082642"
[zk: localhost:2181(CONNECTED) 4]

(3)查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据。

[zk: localhost:2181(CONNECTED) 4] get /kafka/brokers/topics/first/partitions/0/state
"controller_epoch":3,"leader":2,"version":1,"leader_epoch":2,"isr":[2,1,0]
[zk: localhost:2181(CONNECTED) 5]

(4)停止 conch02 上的 kafka。

[dhapp@conch02 kafka_2.12-3.0.0]$ bin/kafka-server-stop.sh

(5)再次查看/kafka/brokers/ids 路径上的节点。

[zk: localhost:2181(CONNECTED) 5] ls /kafka/brokers/ids
[0, 2]
[zk: localhost:2181(CONNECTED) 6]

6)再次查看/kafka/controller 路径上的数据。

[zk: localhost:2181(CONNECTED) 6] get /kafka/controller
"version":1,"brokerid":2,"timestamp":"1649422082642"
[zk: localhost:2181(CONNECTED) 7]

(7)再次查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据。

[zk: localhost:2181(CONNECTED) 7] get /kafka/brokers/topics/first/partitions/0/state
"controller_epoch":3,"leader":2,"version":1,"leader_epoch":3,"isr":[2,0]
[zk: localhost:2181(CONNECTED) 8]

(8)启动 conch2上的 kafka。

[dhapp@conch02 kafka_2.12-3.0.0]$ bin/kafka-server-start.sh -daemon config/server.properties

(9)再次观察(1)、(2)、(3)步骤中的内容。

3、Broker 重要参数


二、节点服役和退役

1、 服役新节点

1)新节点准备
(1)这里准备一台新的节点conch04,ip地址为192.168.56.23,同时在这台机器上安装
         一个kafka服务,步骤和之前一样,现在我们启动这台服务

[dhapp@conch04 kafka_2.12-3.0.0]$ bin/kafka-server-start.sh -daemon config/server.properties

(2)在conch01服务上执行负载均衡操作

        首先查询first主题的详情

[dhapp@conch01 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server conch01:9092 --topic first --describe
Topic: first    TopicId: YzFaCPvpSQm1XxnNIfJjEA PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: first    Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: first    Partition: 1    Leader: 2       Replicas: 1,2,0 Isr: 2,0,1
        Topic: first    Partition: 2    Leader: 2       Replicas: 0,1,2 Isr: 2,0,1
[dhapp@conch01 kafka_2.12-3.0.0]$

(1)创建一个要均衡的主题

[dhapp@conch01 kafka_2.12-3.0.0]$ vim topics-to-move.json
[dhapp@conch01 kafka_2.12-3.0.0]$ cat topics-to-move.json

 "topics": [
 "topic": "first"
 ],
 "version": 1

[dhapp@conch01 kafka_2.12-3.0.0]$

注意:上面我们就拿first主题作为一个均衡的对象

(2)生成一个负载均衡的计划。(只是计划而已)

[dhapp@conch01 kafka_2.12-3.0.0]$ bin/kafka-reassign-partitions.sh --bootstrap-server conch01:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
Current partition replica assignment
"version":1,"partitions":["topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"],"topic":"first","partition":1,"replicas":[1,2,0],"log_dirs":["any","any","any"],"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]]

Proposed partition reassignment configuration
"version":1,"partitions":["topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"],"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"],"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]]
[dhapp@conch01 kafka_2.12-3.0.0]$

(3)创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)。

[dhapp@conch01 kafka_2.12-3.0.0]$ vim increase-replication-factor.json

输入如下内容(刚生成的计划):

"version":1,"partitions":["topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"],"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"],"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]]

(4)执行副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server conch01:9092 --reassignment-json-file increase-replication-factor.json --execute

(5)验证副本存储计划。

[dhapp@conch01 kafka_2.12-3.0.0]$ bin/kafka-reassign-partitions.sh --bootstrap-server conch01:9092 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.

Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first
[dhapp@conch01 kafka_2.12-3.0.0]$

 再次查询first主题的详情

 


2、 退役旧节点

1)执行负载均衡操作
先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。
(1)创建一个要均衡的主题。

[dhapp@conch01 kafka_2.12-3.0.0]$ vim topics-to-move.json
[dhapp@conch01 kafka_2.12-3.0.0]$ cat topics-to-move.json

 "topics": [
 "topic": "first"
 ],
 "version": 1

[dhapp@conch01 kafka_2.12-3.0.0]$

(2)创建执行计划。

[dhapp@conch01 kafka_2.12-3.0.0]$ bin/kafka-reassign-partitions.sh --bootstrap-server conch01:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
Current partition replica assignment
"version":1,"partitions":["topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"],"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"],"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]]

Proposed partition reassignment configuration
"version":1,"partitions":["topic":"first","partition":0,"replicas":[2,1,0],"log_dirs":["any","any","any"],"topic":"first","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"],"topic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]]
[dhapp@conch01 kafka_2.12-3.0.0]$

(3)创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)。

[dhapp@conch01 kafka_2.12-3.0.0]$ vim increase-replication-factor.json
[dhapp@conch01 kafka_2.12-3.0.0]$ cat increase-replication-factor.json
"version":1,"partitions":["topic":"first","partition":0,"replicas":[2,1,0],"log_dirs":["any","any","any"],"topic":"first","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"],"topic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]]
[dhapp@conch01 kafka_2.12-3.0.0]$

(4)执行副本存储计划。

[dhapp@conch01 kafka_2.12-3.0.0]$ bin/kafka-reassign-partitions.sh --bootstrap-server conch01:9092 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment

"version":1,"partitions":["topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"],"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"],"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]]

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for first-0,first-1,first-2
[dhapp@conch01 kafka_2.12-3.0.0]$

(5)验证副本存储计划。

[dhapp@conch01 kafka_2.12-3.0.0]$ bin/kafka-reassign-partitions.sh --bootstrap-server conch01:9092 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.

Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first
[dhapp@conch01 kafka_2.12-3.0.0]$

2)执行停止命令
在 conch04上执行停止命令即可。

[dhapp@conch04 kafka_2.12-3.0.0]$ bin/kafka-server-stop.sh

以上是关于Kafka 之Broker工作流程节点服役和退役的主要内容,如果未能解决你的问题,请参考以下文章

kafka学习记录—Broker(服役退役节点,kafka副本,文件存储)

Kafka消息队列大数据实战教程-第五篇(Broker工作原理以及节点服役)

Kafka消息队列大数据实战教程-第五篇(Broker工作原理以及节点服役)

Hbase节点的管理|服役和退役节点

HDFS服役新数据节点和退役旧节点步骤

大数据之kafka Broker的工作流程