kafka集群的部署及kafka监控工具

Posted zhy1372431588

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka集群的部署及kafka监控工具相关的知识,希望对你有一定的参考价值。

1.kafka介绍

参考

2. kafka学习记录(术语)

参考

3. kafka集群部署(直接部署在了一台机器上)

3.1 安装多节点zookeeper集群

zookeeper下载地址

[root@kafka ~]# ll zookeeper-3.4.9.tar.gz 
-rwxr-xr-x 1 root root 22724574 Jul 21 22:23 zookeeper-3.4.9.tar.gz
[root@kafka ~]# tar zxf zookeeper-3.4.9.tar.gz 
[root@kafka ~]# cd zookeeper-3.4.9/conf/
[root@kafka conf]# ls   ##创建zoo配置文件
configuration.xsl  log4j.properties  zoo_sample.cfg 
[root@kafka conf]# cp zoo_sample.cfg zoo1.cfg
[root@kafka conf]# cp zoo_sample.cfg zoo2.cfg
[root@kafka conf]# cp zoo_sample.cfg zoo3.cfg

##一台机器上配置三个zookeeper,使用不同端口
#1.zookeeper1
[root@kafka conf]# pwd
/root/zookeeper/conf
[root@kafka conf]# vim zoo1.cfg 
[root@kafka conf]# cat zoo1.cfg | grep server
server.1=172.25.13.3:2888:3888
server.2=172.25.13.3:2889:3889
server.3=172.25.13.3:2890:3890
[root@kafka conf]# cat zoo1.cfg | grep clientPort
clientPort=2181
[root@kafka conf]# cat zoo1.cfg | grep dataDir
dataDir=/tmp/zookeeper/zk1

#2.zookeeper2
[root@kafka conf]# vim zoo2.cfg 
[root@kafka conf]# cat zoo2.cfg | grep server 
server.1=172.25.13.3:2888:3888
server.2=172.25.13.3:2889:3889
server.3=172.25.13.3:2890:3890
[root@kafka conf]# cat zoo2.cfg | grep clientPort
clientPort=2182
[root@kafka conf]# cat zoo2.cfg | grep dataDir=
dataDir=/tmp/zookeeper/zk2
#3.zookeeper3
[root@kafka conf]# vim zoo3.cfg 
[root@kafka conf]# cat zoo3.cfg | grep server 
server.1=172.25.13.3:2888:3888
server.2=172.25.13.3:2889:3889
server.3=172.25.13.3:2890:3890
[root@kafka conf]# cat zoo3.cfg | grep clientPort
clientPort=2183
[root@kafka conf]# cat zoo3.cfg | grep dataDir=
dataDir=/tmp/zookeeper/zk3

##创建myid
#1.创建/tmp/zookeeper目录
[root@kafka zookeeper-3.4.9]# mkdir /tmp/zookeeper/zk1 /tmp/zookeeper/zk2 /tmp/zookeeper/zk3 -p
[root@kafka zookeeper-3.4.9]# ll -d /tmp/zookeeper
drwxr-xr-x 5 root root 39 Jul 21 22:44 /tmp/zookeeper
[root@kafka zookeeper-3.4.9]# ll /tmp/zookeeper
total 0
drwxr-xr-x 2 root root 6 Jul 21 22:44 zk1
drwxr-xr-x 2 root root 6 Jul 21 22:44 zk2
drwxr-xr-x 2 root root 6 Jul 21 22:44 zk3
#2.设置myid
[root@kafka zookeeper-3.4.9]# echo 1 > /tmp/zookeeper/zk1/myid
[root@kafka zookeeper-3.4.9]# echo 2 > /tmp/zookeeper/zk2/myid
[root@kafka zookeeper-3.4.9]# echo 3 > /tmp/zookeeper/zk3/myid
[root@kafka zookeeper-3.4.9]# cat /tmp/zookeeper/zk1/myid
1
[root@kafka zookeeper-3.4.9]# cat /tmp/zookeeper/zk2/myid 
2
[root@kafka zookeeper-3.4.9]# cat /tmp/zookeeper/zk3/myid 
3
[root@kafka zookeeper-3.4.9]# 

##启动zookeeper
#1.启动
[root@kafka ~]# mv zookeeper-3.4.9 zookeeper
[root@kafka zookeeper]# pwd
/root/zookeeper
[root@kafka zookeeper]# cd bin
[root@kafka bin]# ./zkServer.sh start ../conf/zoo1.cfg  ##启动
ZooKeeper JMX enabled by default
Using config: ../conf/zoo1.cfg
Starting zookeeper ... STARTED
[root@kafka bin]# ./zkServer.sh start ../conf/zoo2.cfg 
ZooKeeper JMX enabled by default
Using config: ../conf/zoo2.cfg
Starting zookeeper ... STARTED
[root@kafka bin]# ./zkServer.sh start ../conf/zoo3.cfg 
ZooKeeper JMX enabled by default
Using config: ../conf/zoo3.cfg
Starting zookeeper ... STARTED
#2.测试是否启动成功(配置万没有成功可以等一会,系统可能慢,一会就可以启动)
[root@kafka bin]# ./zkServer.sh status ../conf/zoo1.cfg 
ZooKeeper JMX enabled by default
Using config: ../conf/zoo1.cfg
Mode: follower
[root@kafka bin]# ./zkServer.sh status ../conf/zoo2.cfg    ##可以看到2是被选举的leader
ZooKeeper JMX enabled by default
Using config: ../conf/zoo2.cfg
Mode: leader
[root@kafka bin]# ./zkServer.sh status ../conf/zoo3.cfg 
ZooKeeper JMX enabled by default
Using config: ../conf/zoo3.cfg
Mode: follower





3.2 配置kafka集群

kafka下载地址

将三个kafka节点部署在一台主机上作为集群,分别命名为server1,server2,server3。

##1.下载kafka
[root@kafka ~]# ll kafka_2.11-1.1.0.tgz 
-rwxr-xr-x 1 root root 56969154 Jul 21 23:37 kafka_2.11-1.1.0.tgz
[root@kafka ~]# tar zxf kafka_2.11-1.1.0.tgz 
[root@kafka ~]# ll -d kafka_2.11-1.1.0
drwxr-xr-x 6 root root 89 Mar 23  2018 kafka_2.11-1.1.0
[root@kafka ~]# mv kafka_2.11-1.1.0 kafka

##2.设置配置文件
[root@kafka ~]# cd kafka/config/
[root@kafka config]# cp server.properties server2.properties   ##采用示例配置文件
[root@kafka config]# cp server.properties server3.properties 
[root@kafka config]# mv server.properties server1.properties 

##3.修改配置文件参数 
#1.server1的kafka配置
[root@kafka config]# pwd 
/root/kafka/config
[root@kafka config]# vim server1.properties    ##修改配置文件
[root@kafka config]# cat server1.properties | grep ^[^#]   ##查看所有的配置文件
broker.id=1    #改
listeners=PLAINTEXT://:9092  #改
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/root/kafka/kafka-logs/kafka1  #改 文件夹没有自己建立一个
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183  ##配置zk集群
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
#2.server2的kafka配置
[root@kafka config]# vim server2.properties 
[root@kafka config]# cat server2.properties | grep ^[^#] 
broker.id=2     ##broker
listeners=PLAINTEXT://:9093   ##端口
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/root/kafka/kafka-logs/kafka2   ##日志文件位置
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

#3.server3的kafka配置
[root@kafka config]# pwd
/root/kafka/config
[root@kafka config]# cat server3.properties | grep ^[^#] 
broker.id=3
listeners=PLAINTEXT://:9094
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/root/kafka/kafka-logs/kafka3
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

##4.创建日志目录
[root@kafka config]# mkdir /root/kafka/kafka-logs/kafka1 -p 
[root@kafka config]# mkdir /root/kafka/kafka-logs/kafka2 -p 
[root@kafka config]# mkdir /root/kafka/kafka-logs/kafka3 -p 
[root@kafka config]# ll -d /root/kafka/kafka-logs/kafka1
drwxr-xr-x 2 root root 6 Jul 22 00:05 /root/kafka/kafka-logs/kafka1
[root@kafka config]# ll -d /root/kafka/kafka-logs/kafka2
drwxr-xr-x 2 root root 6 Jul 22 00:05 /root/kafka/kafka-logs/kafka2
[root@kafka config]# ll -d /root/kafka/kafka-logs/kafka3
drwxr-xr-x 2 root root 6 Jul 22 00:05 /root/kafka/kafka-logs/kafka3



3.3 启动集群

[root@kafka bin]# pwd
/root/kafka/bin
[root@kafka bin]# ./kafka-server-start.sh -daemon /root/kafka/config/server1.properties ##启动
[root@kafka bin]# ./kafka-server-start.sh -daemon /root/kafka/config/server2.properties 
[root@kafka bin]# ./kafka-server-start.sh -daemon /root/kafka/config/server3.properties 
[root@kafka bin]# jps | grep -v Jps  ##查看是否启动成功
8064 Kafka
8337 Kafka
8609 Kafka
4418 QuorumPeerMain
4392 QuorumPeerMain
4443 QuorumPeerMain
[root@kafka logs]# pwd
/root/kafka/logs
[root@kafka logs]# cat kafkaServer.out   ##查看日志也可以看到集群是否启动



3.4 设置zookeeper开机自启动

3.4.1 法一(简单)

  • 直接修改/etc/rc.d/rc.local文件
  • 在/etc/rc.d/rc.local文件中需要输入两行,
    其中export JAVA_HOME=/usr/java/jdk1.8.0_112是必须要有的,否则开机启动不成功,大家根据自己JDK安装的位置自行更改。
    另一行 /usr/local/zookeeper-3.4.5/bin/zkServer.sh start (本机配置的是三个zookeeper不同端口,start后面加配置文件地址即可)
    则是我们zookeeper的启动命令。配置好之后,重启虚拟机,会发现已经可以开机自启了。

3.4.2 法二(将zookeeper做成服务)

#1.先切换目录然后建立服务文件
[root@kafka ~]# cd /etc/rc.d/init.d/
[root@kafka init.d]# ls
elasticsearch  functions  jexec  netconsole  network  README  rhnsd
[root@kafka init.d]# touch zookeeper
[root@kafka init.d]# chmod +x zookeeper 
[root@kafka init.d]# ll zookeeper 
-rwxr-xr-x 1 root root 0 Jul 25 04:34 zookeeper

#2.书写文件内容(注意事项一样要添加export JAVA_HOME=//usr/java/jdk1.8.0_112这一行,否则无法正常启动。)
[root@kafka init.d]# cat zookeeper   ##下面配置是因为我的是本机三个端口对应的不同zookeeper配置文件
#!/bin/bash
#chkconfig:2345 20 90
#description:zookeeper
#processname:zookeeper
export JAVA_HOME=//usr/java/jdk1.8.0_171-amd64/
case $1 in
        start) su root /root/zookeeper/bin/zkServer.sh start /root/zookeeper/conf/zoo1.cfg && su root /root/zookeeper/bin/zkServer.sh start /root/zookeeper/conf/zoo2.cfg && su root /root/zookeeper/bin/zkServer.sh start /root/zookeeper/conf/zoo3.cfg;;   ##start后面是自己的zk配置文件
        stop) su root /root/zookeeper/bin/zkServer.sh stop /root/zookeeper/conf/zoo1.cfg &&  su root /root/zookeeper/bin/zkServer.sh stop /root/zookeeper/conf/zoo2.cfg &&  su root /root/zookeeper/bin/zkServer.sh stop /root/zookeeper/conf/zoo3.cfg;;
        status) su root /root/zookeeper/bin/zkServer.sh status /root/zookeeper/conf/zoo1.cfg &&  su root /root/zookeeper/bin/zkServer.sh status /root/zookeeper/conf/zoo2.cfg &&  su root /root/zookeeper/bin/zkServer.sh status /root/zookeeper/conf/zoo3.cfg;;
        restart) su /root/zookeeper/bin/zkServer.sh restart /root/zookeeper/conf/zoo1.cfg &&  su root /root/zookeeper/bin/zkServer.sh restart /root/zookeeper/conf/zoo2.cfg &&  su root /root/zookeeper/bin/zkServer.sh restart /root/zookeeper/conf/zoo3.cfg;;
        *) echo "require start|stop|status|restart" ;;
esac

##如果是三台机器配置zookeeper集群,可以写成下面的
[root@zookeeper init.d]# vim zookeeper 
#!/bin/bash
#chkconfig:2345 20 90
#description:zookeeper
#processname:zookeeper
export JAVA_HOME=//usr/java/jdk1.8.0_112   ##自己的java安装位置
case $1 in
        start) su root /usr/local/zookeeper-3.4.5/bin/zkServer.sh start;;  ##自己的zk的bin目录
        stop) su root /usr/local/zookeeper-3.4.5/bin/zkServer.sh stop;;
        status) su root /usr/local/zookeeper-3.4.5/bin/zkServer.sh status;;
        restart) su /usr/local/zookeeper-3.4.5/bin/zkServer.sh restart;;
        *) echo "require start|stop|status|restart" ;;
esac

## 3.验证
[root@kafka init.d]# service zookeeper start
ZooKeeper JMX enabled by default
Using config: /root/zookeeper/conf/zoo1.cfg
Starting zookeeper ... STARTED
ZooKeeper JMX enabled by default
Using config: /root/zookeeper/conf/zoo2.cfg
Starting zookeeper ... STARTED
ZooKeeper JMX enabled by default
Using config: /root/zookeeper/conf/zoo3.cfg
Starting zookeeper ... STARTED

##4. 设置开机自启动
[root@kafka init.d]# chkconfig --add zookeeper    ##添加自启动
[root@kafka init.d]# chkconfig --list
Note: This output shows SysV services only and does not include native
      systemd services. SysV configuration data might be overridden by native
      systemd configuration.

      If you want to list systemd services use 'systemctl list-unit-files'.
      To see services enabled on particular target use
      'systemctl list-dependencies [target]'.

jexec          	0:off	1:on	2:on	3:on	4:on	5:on	6:off
netconsole     	0:off	1:off	2:off	3:off	4:off	5:off	6:off
network        	0:off	1:off	2:on	3:on	4:on	5:on	6:off
rhnsd          	0:off	1:off	2:on	3:on	4:on	5:on	6:off
zookeeper      	0:off	1:off	2:on	3:on	4:on	5:on	6:off

3.5 配置kafka开机自启动

##配置方式类似于设置zookeeper开机自启动
##1.创建kafka文件
[root@kafka ~]# cd /etc/init.d/
[root@kafka init.d]# touch kafka
[root@kafka init.d]# chmod +x kafka 
[root@kafka init.d]# ll kafka 
-rwxr-xr-x 1 root root 0 Jul 25 06:13 kafka

##2.书写配置文件
[root@kafka init.d]# vim kafka 
[root@kafka ~]# cat /etc/init.d/kafka 
#!/bin/bash
#chkconfig:2345 30 80
#description:kafka
#processname:kafka
case $1 in
        start) /root/kafka/bin/kafka-server-start.sh -daemon /root/kafka/config/server1.properties && /root/kafka/bin/kafka-server-start.sh -daemon /root/kafka/config/server2.properties && /root/kafka/bin/kafka-server-start.sh -daemon /root/kafka/config/server3.properties;;
        stop) /root/kafka/bin/kafka-server-start.sh -daemon /root/kafka/config/server1.properties && /root/kafka/bin/kafka-server-start.sh -daemon /root/kafka/config/server2.properties && /root/kafka/bin/kafka-server-start.sh -daemon /root/kafka/config/server3.properties;;
        status) jps;;
	restart) /root/kafka/bin/kafka-server-start.sh -daemon /root/kafka/config/server1.properties && /root/kafka/bin/kafka-server-start.sh -daemon /root/kafka/config/server2.properties && /root/kafka/bin/kafka-server-start.sh -daemon /root/kafka/config/server3.properties;;
        *) echo "require start|stop|status|restart";;
esac


##3.部署开机自启动
[root@kafka init.d]# ls
elasticsearch  jexec  netconsole  README  zookeeper
functions      kafka  network     rhnsd
[root@kafka init.d]# chkconfig --add kafka 
[root@kafka init.d]# chkconfig --list

Note: This output shows SysV services only and does not include native
      systemd services. SysV configuration data might be overridden by native
      systemd configuration.

      If you want to list systemd services use 'systemctl list-unit-files'.
      To see services enabled on particular target use
      'systemctl list-dependencies [target]'.

jexec          	0:off	1:on	2:on	3:on	4:on	5:on	6:off
kafka          	0:off	1:off	2:on	3:on	4:on	5:on	6:off
netconsole     	0:off	1:off	2:off	3:off	4:off	5:off	6:off
network        	0:off	1:off	2:on	3:on	4:on	5:on	6:off
rhnsd          	0:off	1:off	2:on	3:on	4:on	5:on	6:off
zookeeper      	0:off	1:off	2:on	3:on	4:on	5:on	6:off
[root@kafka init.d]# chkconfig kafka on    ##add完默认是on的,怕没有可以执行一下

##4.验证
[root@kafka ~]# service kafka start 
[root@kafka ~]# jps
3539 QuorumPeerMain
6692 Kafka
3285 QuorumPeerMain
5686 Kafka
6423 Kafka
6887 Jps
3565 QuorumPeerMain


4. 验证部署

将从以下几个方面验证kafka集群部署的正确性
	1、测试 topic 创建与删除。
	2、测试消息的生产与发送
	3、生产者吞吐量测试。
	4、消费者吞吐量测试。
[root@kafka ~]# vim .bash_profile    ##可以配置环境变量
[root@kafka ~]# source .bash_profile 
[root@kafka ~]# cat .bash_profile | grep kafka
PATH=$PATH:$HOME/bin:/root/kafka/bin

4.1 测试topic的创建和删除

4.1.1 创建topic—>test

[root@kafka bin]# pwd
/root/kafka/bin
[root@kafka bin]# kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --create --topic test-topic --partitions 1 --replication-factor 3  
Created topic "test-topic".    ##创建topic成功

4.1.2 查看该topic的具体情况

[root@kafka bin]# kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 -list
test-topic      ##列出所有的topic
[root@kafka bin]# kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 -describe --topic test-topic   ##查看指定topic的详细情况
Topic:test-topic	PartitionCount:1	ReplicationFactor:3	Configs:
Topic: test-topic	Partition: 0	    Leader: 2	        Replicas: 2,3,1Isr: 2,3,1

显而易见,该 topic 下有1个分区,每个分区有3个副本,分区leader是2,表明 Kafka 将该 topic 的这个分区均匀地在3台broker 上进行了分配。

4.1.3 删除topic

[root@kafka bin]# kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 -delete --topic test-topic   ##删除topic

说明:上面的输出仅仅表示该 topic 被成功地标记为“待删除”,至于 topic 是否会被真正删除取决于 broker 端参数 delete.topic.enable 该参数。该参数在当前 Kafka 1.0.0 版本中被默认设置为 true ,即 表明 Kafka 默认允许删除 topic。事实上,该参数在旧版本中默认值 一直是false ,故若用户显 式设置该参数为 false ,或使用了 1.0 .0 之前版本的默认值,那么即使运行了上面的命令, Kafka 也不会删除该 topic,我们本次使用的kafka版本是1.1.0,所以默认是为true,通过list命令再次查看kafka的topic列表,已经删除了(该删除是一个异步过程)。

4.2 测试消息的生产与发送

4.2.1 创建生产者

[root@kafka bin]# pwd
/root/kafka/bin
[root@kafka bin]# kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test-topic  

4.2.2 创建消费者

[root@kafka bin]# kafka-console-consumer.sh --bootstrap-server localhost:9092calhost:9093,localhost:9094 --topic test-topic --from-beginning  ##消费者(新版本用法)

4.2.3 生产者吞吐量测试

[root@kafka bin]# kafka-producer-perf-test.sh --topic test-topic --num-records 50000 --record-size 200 --throughput -1 --producer-props bootstrap.servers=localhost:9092,localhost:9093,localhost:9094 acks=-1

4.2.4 消费者吞吐量测试

[root@kafka bin]# kafka-consumer-perf-test.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --messages 50000 --topic test-topic

5. kafka监控工具KafkaOffsetMonitor配置及使用

5.1 介绍

KafkaOffsetMonitor是用来实时监控Kafka集群中的consumer以及在队列中的位置(偏移量)。

你可以查看当前的消费者组,每个topic队列的所有partition的消费情况。可以很快地知道每个partition中的消息是否很快被消费以及相应的队列消息增长速度等信息。这些可以debug kafka的producer和consumer,你完全知道你的系统将会发生什么。

这个web管理平台保留的partition offset和consumer滞后的历史数据(具体数据保存多少天我们可以在启动的时候配置),所以你可以很轻易了解这几天consumer消费情况。

KafkaOffsetMonitor这款软件是用Scala代码编写的,消息等历史数据是保存在名为offsetapp.db数据库文件中,该数据库是SQLLite文件,非常的轻量级。虽然我们可以在启动KafkaOffsetMonitor程序的时候指定数据更新的频率和数据保存的时间,但是不建议更新很频繁,或者保存大量的数据,因为在KafkaOffsetMonitor图形展示的时候会出现图像展示过慢,或者是直接导致内存溢出了。

5.2 下载安装

下载地址

##1.下载插件
[root@kafka ~]# ll KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar    ##百度网盘下载
-rwxrwxrwx 1 root root 48089949 Jul 22 03:09 KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar

##2.启动脚本(标准输入输出的日志位置自己定义)
[root@kafka ~]# java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --offsetStorage kafka --zk 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --port 8088 --refresh 5.seconds  --retain 2.days 1>/root/kafka/logs/stdout.log 2>/root/kafka/logs/stderr.log &

##3. 访问
#网页输入:
http://ip:8088  

##4.关闭脚本
[root@kafka ~]# jps
8064 Kafka
8337 Kafka
8609 Kafka
4418 QuorumPeerMain
24438 Jps
4392 QuorumPeerMain
24314 OffsetGetterWeb   ##web后台进程
4443 QuorumPeerMain
[root@kafka ~]# killnum=`jps | grep OffsetGetterWeb | awk '{print $1}'`  
[root@kafka ~]# kill -9 ${killnum}    


5.3 测试

[root@kafka ~]# kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --create --topic test-topic --partitions 3 --replication-factor 3    ##创建topic
Created topic "test-topic".
[root@kafka ~]# kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test-topic        ##创建生产者
>111
>222
>333
>444
>555
>666
>777
>888
>999
>000
>^C[root@kafka ~]# kafka-console-consumer.sh --bootstrap-server localhost:9calhost:9093,localhost:9094 --topic test-topic --from-beginning --consumer-property group.id=group_test   ##创建消费者和消费者组
[2021-07-22 03:59:07,842] WARN Removing server localhost:9092calhost:9093 from bootstrap.servers as DNS resolution failed for localhost:9092calhost (org.apache.kafka.clients.ClientUtils)
222
555
888
333
666
999
111
444
777
000
^CProcessed a total of 10 messages


以上是关于kafka集群的部署及kafka监控工具的主要内容,如果未能解决你的问题,请参考以下文章

如何在CDH集群中部署Kafka Manager

Kafka的集群部署实践及运维相关

kafka eagle安装部署

FlinkCdc--Debezium实现Kafka实时监控mysql binlog日志

几种常见的 Kafka 集群监控工具

几种常见的 Kafka 集群监控工具