kafka环境搭建

Posted edycm

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka环境搭建相关的知识,希望对你有一定的参考价值。

kafka环境搭建

测试环境

centos 7.8.2003

JDK安装

下载地址: https://www.oracle.com/java/technologies/downloads/

当前下载路径:https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz

下载安装:

$ curl -O https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
$ tar -xvf jdk-17_linux-x64_bin.tar.gz
#jdk-17.0.1
$ echo "export JAVA_HOME=`pwd`/jdk-17.0.1" >>/etc/profile
$ echo 'export CLASSPATH=$:CLASSPATH:$JAVA_HOME/lib/' >>/etc/profile
$ echo 'export PATH=$PATH:$JAVA_HOME/bin' >>/etc/profile
$ source /etc/profile
$ java -version
java version "17.0.1" 2021-10-19 LTS
Java(TM) SE Runtime Environment (build 17.0.1+12-LTS-39)
Java HotSpot(TM) 64-Bit Server VM (build 17.0.1+12-LTS-39, mixed mode, sharing)

单节点

下载安装

#下载地址:http://kafka.apache.org/downloads
#当前版本:2.8.1
$ curl -O https://archive.apache.org/dist/kafka/2.8.1/kafka_2.13-2.8.1.tgz
$ tar -xvf kafka_2.13-2.8.1.tgz
#进入kafka目录
$ cd kafka_2.13-2.8.1

配置文件修改

$ vi config/server.properties 
	#添加监听的ip端口配置
	listeners=PLAINTEXT://<ip>:9092
	#日志存储目录
	log.dirs=<log_dir>/kafka-logs

启动

#先启动zookeeper
$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
#然后启动kafka
$ ./bin/kafka-server-start.sh ./config/server.properties

zookeeper查看节点信息

$ ./bin/zookeeper-shell.sh localhost:2181
#输入如下命令(id为kafka配置文件中的broker.id):
#不知道id可以先查看
#跟多命令可以使用help查看
> ls /brokers/ids
[0]
> get /brokers/ids/0
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://a4b96b9307b2:9092"],"jmx_port":-1,"port":9092,"host":"a4b96b9307b2","version":5,"timestamp":"1634886207746"}
# 查看topic分区信息
> ls /brokers/topics/<topic name>/partitions

基于脚本的kafka操作

所有脚本可在下载的bin目录下找到

官网介绍:https://kafka.apache.org/documentation/#configuration

#主题(topic)
#测试主题名test
#查看topic列表
$ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
#创建topic, --partitions: 分区数,--replication-factor: 副本数,不能超过broker数量
$ ./bin/kafka-topics.sh --bootstrap-server <ip>:9092 --create --topic test --partitions 20 --replication-factor 3 --config x=y
#修改topic配置和分区配置
#分区只能增加,不支持减少
$ ./bin/kafka-topics.sh --bootstrap-server <ip>:9092 --alter --topic test --partitions 30

#修改配置
#增加配置
$ ./bin/kafka-configs.sh --bootstrap-server <ip>:9092 --entity-type topics --entity-name test --alter --add-config x=y
#删除配置
$ ./bin/kafka-configs.sh --bootstrap-server <ip>:9092 --entity-type topics --entity-name test --alter --delete-config x
#删除主题
$ ./bin/kafka-topics.sh --bootstrap-server <ip>:9092 --delete --topic test

#获取分区最新offset
$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <ip>:9092 --topic test

消费测试

#启动消费者,消费消息
./bin/kafka-console-consumer.sh --bootstrap-server <ip>:9092 --topic test1 --group group1
#启动生产者,生产消息
./bin/kafka-console-producer.sh --bootstrap-server <ip>:9092 --topic test1

集群

zookeeper使用kafka包里自带的,集群最少需要三台服务器,且官方强烈建议使用奇数台服务器。

zookeeper配置:

更多配置参考官网(版本3.7.0):https://zookeeper.apache.org/doc/r3.7.0/zookeeperStarted.html

$ vi config/zookeeper.properties
   	#注释可能不准确,原文解释可查看官网
    #zookeeper使用的基本时间单位,单位是ms,用于心跳,最小会话超时将是tickTime 的两倍
    tickTime=2000
    #简单理解为保存数据的目录
    dataDir=/var/lib/zookeeper
    #监听端口
    clientPort=2181
    #连接到主节点的超时时间,使用tickTime为时间单位,即超时时间为2000 * 5 = 10000ms = 10s
    initLimit=5
    #从节点与主节点同步的超时时间,使用tickTime为时间单位,即超时时间为2000 * 2 = 4000ms = 4s
    syncLimit=2
    #集群信息,第一个ip用于从节点与主节点进行通信,第二个ip用于选举主节点,都是tcp通信
    server.1=<zoo1>:2888:3888
    server.2=<zoo2>:2888:3888
    server.3=<zoo3>:2888:3888
#创建zookeeper集群需要的myid文件(文件路径为配置文件中的<dataDir>), 然后在里面添加一个数字来标识此节点,与kafka broker id类似
$ echo 1 >> <dataDir>/myid

kafka配置:

$ vi config/server.properties
	#broker id,每个节点需设置不同的值
	broker.id=0
	#添加zookeeper集群信息
	zookeeper.connect=<zoo1>:2181,<zoo2>:2181,<zoo3>:2181

kafka官方建议需配置项:

# ZooKeeper
zookeeper.connect=[list of ZooKeeper servers]

# Log configuration
num.partitions=8
default.replication.factor=3 # 不能大于集群数量
log.dir=[List of directories. Kafka should have its own dedicated disk(s) or SSD(s).]

# Other configurations
broker.id=[An integer. Start with 0 and increment by 1 for each new broker.]
listeners=[list of listeners]
auto.create.topics.enable=false
min.insync.replicas=2
queued.max.requests=[number of concurrent requests]

以上是关于kafka环境搭建的主要内容,如果未能解决你的问题,请参考以下文章

Kafka基于Windows的Kafka有关环境搭建以及使用.NET环境开发的案例代码与演示...

windows环境下kafka源代码开发环境搭建

Idea下Kafka源码阅读编译环境搭建

Kafka入门实战教程基于Docker搭建Kafka环境

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十三)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析(示例代码

kafka单机环境搭建及其基本使用