kafka环境搭建
Posted edycm
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka环境搭建相关的知识,希望对你有一定的参考价值。
kafka环境搭建
测试环境
centos 7.8.2003
JDK安装
下载地址: https://www.oracle.com/java/technologies/downloads/
当前下载路径:https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
下载安装:
$ curl -O https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
$ tar -xvf jdk-17_linux-x64_bin.tar.gz
#jdk-17.0.1
$ echo "export JAVA_HOME=`pwd`/jdk-17.0.1" >>/etc/profile
$ echo 'export CLASSPATH=$:CLASSPATH:$JAVA_HOME/lib/' >>/etc/profile
$ echo 'export PATH=$PATH:$JAVA_HOME/bin' >>/etc/profile
$ source /etc/profile
$ java -version
java version "17.0.1" 2021-10-19 LTS
Java(TM) SE Runtime Environment (build 17.0.1+12-LTS-39)
Java HotSpot(TM) 64-Bit Server VM (build 17.0.1+12-LTS-39, mixed mode, sharing)
单节点
下载安装
#下载地址:http://kafka.apache.org/downloads
#当前版本:2.8.1
$ curl -O https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz
$ tar -xvf kafka_2.13-2.8.1.tgz
#进入kafka目录
$ cd kafka_2.13-2.8.1
配置文件修改
$ vi config/server.properties
#添加监听的ip端口配置
listeners=PLAINTEXT://<ip>:9092
#日志存储目录
log.dirs=<log_dir>/kafka-logs
启动
#先启动zookeeper
$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
#然后启动kafka
$ ./bin/kafka-server-start.sh ./config/server.properties
zookeeper查看节点信息
$ ./bin/zookeeper-shell.sh localhost:2181
#输入如下命令(id为kafka配置文件中的broker.id):
#不知道id可以先查看
#跟多命令可以使用help查看
> ls /brokers/ids
[0]
> get /brokers/ids/0
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://a4b96b9307b2:9092"],"jmx_port":-1,"port":9092,"host":"a4b96b9307b2","version":5,"timestamp":"1634886207746"}
# 查看topic分区信息
> ls /brokers/topics/<topic name>/partitions
基于脚本的kafka操作
所有脚本可在下载的bin目录下找到
官网介绍:https://kafka.apache.org/documentation/#configuration
#主题(topic)
#测试主题名test
#查看topic列表
$ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
#创建topic, --partitions: 分区数,--replication-factor: 副本数,不能超过broker数量
$ ./bin/kafka-topics.sh --bootstrap-server <ip>:9092 --create --topic test --partitions 20 --replication-factor 3 --config x=y
#修改topic配置和分区配置
#分区只能增加,不支持减少
$ ./bin/kafka-topics.sh --bootstrap-server <ip>:9092 --alter --topic test --partitions 30
#修改配置
#增加配置
$ ./bin/kafka-configs.sh --bootstrap-server <ip>:9092 --entity-type topics --entity-name test --alter --add-config x=y
#删除配置
$ ./bin/kafka-configs.sh --bootstrap-server <ip>:9092 --entity-type topics --entity-name test --alter --delete-config x
#删除主题
$ ./bin/kafka-topics.sh --bootstrap-server <ip>:9092 --delete --topic test
#获取分区最新offset
$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <ip>:9092 --topic test
消费测试
#启动消费者,消费消息
./bin/kafka-console-consumer.sh --bootstrap-server <ip>:9092 --topic test1 --group group1
#启动生产者,生产消息
./bin/kafka-console-producer.sh --bootstrap-server <ip>:9092 --topic test1
集群
zookeeper使用kafka包里自带的,集群最少需要三台服务器,且官方强烈建议使用奇数台服务器。
zookeeper配置:
更多配置参考官网(版本3.7.0):https://zookeeper.apache.org/doc/r3.7.0/zookeeperStarted.html
$ vi config/zookeeper.properties
#注释可能不准确,原文解释可查看官网
#zookeeper使用的基本时间单位,单位是ms,用于心跳,最小会话超时将是tickTime 的两倍
tickTime=2000
#简单理解为保存数据的目录
dataDir=/var/lib/zookeeper
#监听端口
clientPort=2181
#连接到主节点的超时时间,使用tickTime为时间单位,即超时时间为2000 * 5 = 10000ms = 10s
initLimit=5
#从节点与主节点同步的超时时间,使用tickTime为时间单位,即超时时间为2000 * 2 = 4000ms = 4s
syncLimit=2
#集群信息,第一个ip用于从节点与主节点进行通信,第二个ip用于选举主节点,都是tcp通信
server.1=<zoo1>:2888:3888
server.2=<zoo2>:2888:3888
server.3=<zoo3>:2888:3888
#创建zookeeper集群需要的myid文件(文件路径为配置文件中的<dataDir>), 然后在里面添加一个数字来标识此节点,与kafka broker id类似
$ echo 1 >> <dataDir>/myid
kafka配置:
$ vi config/server.properties
#broker id,每个节点需设置不同的值
broker.id=0
#添加zookeeper集群信息
zookeeper.connect=<zoo1>:2181,<zoo2>:2181,<zoo3>:2181
kafka官方建议需配置项:
# ZooKeeper
zookeeper.connect=[list of ZooKeeper servers]
# Log configuration
num.partitions=8
default.replication.factor=3 # 不能大于集群数量
log.dir=[List of directories. Kafka should have its own dedicated disk(s) or SSD(s).]
# Other configurations
broker.id=[An integer. Start with 0 and increment by 1 for each new broker.]
listeners=[list of listeners]
auto.create.topics.enable=false
min.insync.replicas=2
queued.max.requests=[number of concurrent requests]
以上是关于kafka环境搭建的主要内容,如果未能解决你的问题,请参考以下文章
Kafka基于Windows的Kafka有关环境搭建以及使用.NET环境开发的案例代码与演示...
Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十三)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析(示例代码