Kafka 常用操作

Posted 进击的大杂烩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 常用操作相关的知识,希望对你有一定的参考价值。

一、常用命令

环境:
zookeeper集群:192.168.100.101:2181/192.168.100.102:2181/192.168.100.103:2181
kafka集群:192.168.100.101:9092/192.168.100.102:9092/192.168.100.103:9092
kafka安装目录:export KAFKA_HOME = /data/kafka/kafka_2.11-1.0.0
kafka集群在zookeeper中的注册目录为:/kafka
Kafka支持的基本命令位于${KAFKA_HOME}/bin目录。主要的脚本和作用如下表:

脚本 用途
kafka-server-start.sh kafka 服务启动脚本
kafka-server-stop.sh kafka 服务停止脚本
kafka-topics.sh topic 查看/创建/删除/修改工具
kafka-console-consumer.sh kafka 终端消费者脚本
kafka-console-producer.sh kafka 终端生产者脚本
kafka-consumer-perf-test.sh kafka 消费者压测脚本
kafka-producer-perf-test.sh kafka 生产者压测脚本
kafka-preferred-replica-election.sh 帮助恢复topic的Leader分配的平衡
kafka-reassign-partitions.sh 调整topic的partition和replication
kafka-log-dirs.sh 查看topic在broker中的日志位置和大小及partition

二、常用操作

1. topic 查看/创建/删除/修改 工具

  • 查看

# 查看当前kafka集群中的topic列表
cd ${KAFKA_HOME}
./bin/kafka-topics.sh --zookeeper 192.168.100.101:2181/kafka --list
__consumer_offsets
xsjop01
# 详细查看某个topic的信息
./bin/kafka-topics.sh --zookeeper 192.168.100.101:2181/kafka --describe --topic xsjop01
Topic:xsjop01   PartitionCount:4        ReplicationFactor:2     Configs:
       Topic: xsjop01  Partition: 0    Leader: 3       Replicas: 3,2   Isr: 2,3
       Topic: xsjop01  Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2,3
       Topic: xsjop01  Partition: 2    Leader: 2       Replicas: 2,3   Isr: 2,3
       Topic: xsjop01  Partition: 3    Leader: 3       Replicas: 3,2   Isr: 2,3
  • 创建topic

./bin/kafka-topics.sh --zookeeper 192.168.100.102:2181/kafka --create --topic test01 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 3 --replication-factor 2
Created topic "test01".
# 参数解析
# --create: 指定创建topic动作
# --topic:指定新建topic的名字
# --zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一致
# --config:指定当前topic上有效的参数值,参数列表参考文档为:http://kafka.apache.org/documentation/#topicconfigs
# --partitions:指定当前创建的kafka分区数量,默认该值与server.properties文件中的配置项{num.partitions}一致
# --replication-factor:指定每个分区的复制因子个数,默认该值与server.properties文件中的配置项{default.replication.factor}一致
# 创建完成后查看刚刚创建的topic test01
./bin/kafka-topics.sh --zookeeper 192.168.100.101:2181/kafka --describe --topic test01
Topic:test01    PartitionCount:3        ReplicationFactor:2     Configs:max.message.bytes=12800000,flush.messages=1
       Topic: test01   Partition: 0    Leader: 3       Replicas: 3,2   Isr: 3,2
       Topic: test01   Partition: 1    Leader: 1       Replicas: 1,3   Isr: 1,3
       Topic: test01   Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1
  • 修改topic,其中修改partitions数量时只能增加不允许减少

bin/kafka-topics.sh --zookeeper 192.168.100.101:2181/kafka --alter --topic test01 --config max.message.bytes=128000
bin/kafka-topics.sh --zookeeper 192.168.100.101:2181/kafka --alter --topic test01 --delete-config max.message.bytes
bin/kafka-topics.sh --zookeeper 192.168.100.101:2181/kafka --alter --topic test01 --partitions 10
bin/kafka-topics.sh --zookeeper 192.168.100.101:2181/kafka --alter --topic test01 --partitions 3
  • 删除topic

bin/kafka-topics.sh --delete --topic test01 --zookeeper 192.168.100.101:2181/kafka
Topic test01 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
# 是否能够通过命令行删除topic与server.properties文件中的配置项{delete.topic.enable}有关,如果设置为false,则需要通过如下方式删除topic
# 通过delete命令删除后,手动将本地磁盘以及zk上的相关topic的信息删除即可
# 删除zk信息
cd /data/kafka/zookeeper-3.4.11/
./bin/zkCli.sh
# 查看所有注册的topics
ls /kafka/brokers/topics
[xsjop01, test01, __consumer_offsets]
# 删除test01
rmr /kafka/brokers/topics/test01
quit
# 删除各个broker上log目录,topic删除完毕。

2. 恢复topic的Leader分配的平衡

有了Replication机制后,每个Partition可能有多个备份。某个Partition的Replica列表叫作AR(Assigned Replicas),AR中的第一个Replica即为“Preferred Replica”。创建一个新的Topic或者给已有Topic增加Partition时,Kafka保证Preferred Replica被均匀分布到集群中的所有Broker上。理想情况下,Preferred Replica会被选为Leader。以上两点保证了所有Partition的Leader被均匀分布到了集群当中,这一点非常重要,因为所有的读写操作都由Leader完成,若Leader分布过于集中,会造成集群负载不均衡。但是,随着集群的运行,该平衡可能会因为Broker的宕机而被打破,该工具就是用来帮助恢复Leader分配的平衡。
Kafka还提供了自动平衡Leader分配的功能,该功能可通过将server.properties文件中配置项{auto.leader.rebalance.enable}设置为true开启,它将周期性检查Leader分配是否平衡,若不平衡度超过一定阈值则自动由Controller尝试将各Partition的Leader设置为其Preferred Replica。检查周期由配置项{leader.imbalance.check.interval.seconds}指定,不平衡度阈值由配置项{leader.imbalance.per.broker.percentage}指定。

  • 手动平衡命令

./bin/kafka-preferred-replica-election.sh --zookeeper 192.168.100.101:2181/kafka

3. 调整topic的partition和replication

该工具的设计目标与Preferred Replica Leader Election Tool有些类似,都旨在促进Kafka集群的负载均衡。不同的是,Preferred Replica Leader Election只能在Partition的AR范围内调整其Leader,使Leader分布均匀,而该工具还可以调整Partition的AR。
   Follower需要从Leader Fetch数据以保持与Leader同步,所以仅仅保持Leader分布的平衡对整个集群的负载均衡来说是不够的。另外,生产环境下,随着负载的增大,可能需要给Kafka集群扩容。向Kafka集群中增加Broker非常简单方便,但是对于已有的Topic,并不会自动将其Partition迁移到新加入的Broker上,此时可用该工具达到此目的。某些场景下,实际负载可能远小于最初预期负载,此时可用该工具将分布在整个集群上的Partition重装分配到某些机器上,然后可以停止不需要的Broker从而实现节约资源的目的。
需要说明的是,该工具不仅可以调整Partition的AR位置,还可调整其AR数量,即改变该Topic的replication factor。

  • 用法

该工具有三种使用模式
  (1)、generate模式,给定需要重新分配的Topic,自动生成reassign plan(并不执行)
  (2)、execute模式,根据指定的reassign plan重新分配Partition
  (3)、verify模式,验证重新分配Partition是否成功

  • 试验
    创建一个测试topic:test01,3个partition,每个partion 3个replication。

./bin/kafka-topics.sh --zookeeper 192.168.100.102:2181/kafka --create --topic test01 --partitions 3 --replication-factor 3
./bin/kafka-topics.sh --zookeeper 192.168.100.101:2181/kafka --describe --topic test01
Topic:test01    PartitionCount:3        ReplicationFactor:3     Configs:
       Topic: test01   Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 1,2,3
       Topic: test01   Partition: 1    Leader: 3       Replicas: 3,1,2 Isr: 1,2,3
       Topic: test01   Partition: 2    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3

现在需要将topic test01 的partions 全部调整到broker 2,3上。操作过程如下:

1.使用generate模式,生成reassign plan
指定需要重新分配的Topic ({"topics":[{"topic":"test01"}],"version":1}),并存入/tmp/topics-to-move.json文件中,然后执行如下命令

echo '{"topics":[{"topic":"test01"}],"version":1}' > /tmp/topic-to-move.json
./bin/kafka-reassign-partitions.sh --zookeeper 192.168.100.101:2181/kafka --topics-to-move-json-file /tmp/topics-to-move.json --broker-list "2,3" --generate

会出现如下错误
Partitions reassignment failed due to Replication factor: 3 larger than available brokers: 2.
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.

2.因为kakfa规定replication 的数量不能大于broker的数量,使用工具调整 replication 的数量。并同时将partition 调整到broker 2,3
创建/tmp/replication.json文件内容如下

{
   "version": 1,
   "partitions": [
       {
           "topic": "test01",
           "partition": 0,
           "replicas": [
               3,
               2
           ]
       },
       {
           "topic": "test01",
           "partition": 1,
           "replicas": [
               2,
               3
           ]
       },
       {
           "topic": "test01",
           "partition": 2,
           "replicas": [
               2,
               3
           ]
       }
   ]
}

运行如下命令进行调整

./bin/kafka-reassign-partitions.sh --zookeeper 192.168.100.101:2181/kafka --reassignment-json-file /tmp/replication.json --execute

如果正常运行会有如下返回
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test01","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"test01","partition":0,"replicas":[2,3,1],"log_dirs":["any","any","any"]},{"topic":"test01","partition":2,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

验证

./bin/kafka-topics.sh --zookeeper 192.168.100.101:2181/kafka --describe --topic test01
Topic:test01    PartitionCount:3        ReplicationFactor:2     Configs:
       Topic: test01   Partition: 0    Leader: 2       Replicas: 3,2   Isr: 2,3
       Topic: test01   Partition: 1    Leader: 3       Replicas: 2,3   Isr: 2,3
       Topic: test01   Partition: 2    Leader: 2       Replicas: 2,3   Isr: 2,3

可见通过上面的调整,不但调整replication的数量,同时也将partition调整到了broker 2,3 上。此时partition0 和partition2 leader 都在broker2 上,kafka 写入和读出都是通过 partition 的leader 来处理的,此时负载并不均衡,因为broker1还空闲,调整partition0 leader到broker 1.

3.使用generate模式,生成reassign plan
指定需要重新分配的Topic ({"topics":[{"topic":"test01"}],"version":1}),并存入/tmp/topics-to-move.json文件中,然后执行如下命令

./bin/kafka-reassign-partitions.sh --zookeeper 192.168.100.101:2181/kafka --topics-to-move-json-file /tmp/topics-to-move.json --broker-list "1,2,3" --generate

运行正常会有如下输出
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test01","partition":1,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"test01","partition":0,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"test01","partition":2,"replicas":[2,3],"log_dirs":["any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test01","partition":1,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"test01","partition":0,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"test01","partition":2,"replicas":[1,3],"log_dirs":["any","any"]}]}

4.使用execute模式,执行reassign plan
将上一步生成的reassignment plan存入/tmp/reassign-plan.json文件中,并执行

./bin/kafka-reassign-partitions.sh --zookeeper 192.168.100.101:2181/kafka --reassignment-json-file /tmp/reassign-plan.json --execute

运行成功后有如下输出
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test01","partition":1,"replicas":[2,3],"log_dirs":["any","any"]},{"topic":"test01","partition":0,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"test01","partition":2,"replicas":[2,3],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

5.使用verify模式,验证reassign是否完成

./bin/kafka-reassign-partitions.sh --zookeeper 192.168.100.101:2181/kafka --reassignment-json-file /tmp/reassign-plan.json --verify

运行成功后会有如下输出
Status of partition reassignment:
Reassignment of partition test01-1 completed successfully
Reassignment of partition test01-0 completed successfully
Reassignment of partition test01-2 completed successfully

6.通过topic工具验证结果

./bin/kafka-topics.sh --zookeeper 192.168.100.101:2181/kafka --describe --topic test01
Topic:test01    PartitionCount:3        ReplicationFactor:2     Configs:
       Topic: test01   Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
       Topic: test01   Partition: 1    Leader: 3       Replicas: 3,2   Isr: 2,3
       Topic: test01   Partition: 2    Leader: 1       Replicas: 1,3   Isr: 3,1


以上是关于Kafka 常用操作的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 常用操作

提效小技巧——记录那些不常用的代码片段

Kafka常用操作

Kafka常用操作

Kafka常用操作

Kafka 常用命令行操作