kafka
Posted zzl0916
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka相关的知识,希望对你有一定的参考价值。
今天来安装kafka 安装kafka前台必须安装zookeeper 不会安装请移步:点我快速进入安装zookeeper文章
一、安装kafka
下载kafka两种方式
1、手动下载 下载地址:http://kafka.apache.org/downloads
下载好了然后上传服务器(不在这里赘述。前两个文章都有!)
2、也可以用命令下载(直接下载到服务器哦!):
命令:wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.12-2.0.0.tgz
二、解压kafka:
命令:tar -xzvf kafka_2.12-2.0.0.tgz(文件名可以更换哦!)
三、修改server.properties文件
在config目录下输入命令:vi server.properties
修改内容为:
-
server.properties修改 内容如下:可直接复制粘贴。
-
broker.id=0
-
port=9092 #端口号
-
host.name=localhost #单机可直接用localhost
-
log.dirs=/DATA/kafka/kafka_2.12-2.0.0/log #日志存放路径可修改可不修改
-
zookeeper.connect=localhost:2181 #zookeeper地址和端口,单机配置部署,localhost:2181
由于我修改了日志存放地址 我需要创建一个文件夹
命令:直接在kafka目录下输入:mkdir log
四、修改zookeeper.properties文件
-
进入kafka目录下 config目录下输入命令:vi zookeeper.properties
-
-
修改内容为:
-
dataDir=/DATA/kafka/kafka_2.12-2.0.0/zookeeper/data/dataDir #zookeeper数据目录 (可以修改可以不修改)
-
dataLogDir=/DATA/kafka/kafka_2.12-2.0.0/zookeeper/data/dataLogDir #zookeeper日志目录 (可以修改可以不修改)
-
clientPort=2181
-
maxClientCnxns=100
-
tickTime=2000
-
initLimit=10
由于我修改了默认的数据目录地址和日志目录需要创建文件夹如下图:
五、修改完之后就可以启动zookeeper和kafka了。直接敲命令感觉有些low呀。弄一个脚本命令吧: 启动脚本:
-
进入kafka目录下 输入命令:vi kafkaStart.sh
-
-
添加内容为:
-
#!/bin/bash
-
#启动zookeeper
-
/DATA/kafka/kafka_2.12-2.0.0/bin/zookeeper-server-start.sh /DATA/kafka/kafka_2.12-2.0.0/config/zookeeper.properties &
-
sleep 3 #默默等3秒后执行
-
#启动kafka
-
/DATA/kafka/kafka_2.12-2.0.0/bin/kafka-server-start.sh /DATA/kafka/kafka_2.12-2.0.0/config/server.properties &
-
添加脚本执行权限:
-
在刚创建的脚本目录许下执行以下命令
-
chmod +x kafkastart.sh(启动脚本名称)
六、启动kafka
1、先启动zookeeper。
启动命令:sh $zookeeper_home/bin/zkServer.sh start
2、启动kafka
在kafka目录下输入 启动脚本命令: ./kafkaStart.sh
七、创建topic 出现Created topid test 则创建成功
-
命令:在kafka 目录下bin目录下执行:
-
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test test是topic名字可以随便换哦。
查询创建的top主题 出现test则为正常
-
命令:在kafka 目录下bin目录下执行:
-
./kafka-topics.sh --list --zookeeper localhost:2181
删除创建的topic
-
命令:在kafka 目录下bin目录下执行:
-
./kafka-topics.sh --delete --zookeeper localhost:2181 --topic 要删除的topic名称
八、测试生成者(test topic):
-
命令:在kafka 目录下bin目录下执行:
-
./kafka-console-producer.sh --broker-list localhost:9092 --topic test(要启动生成者的topic名称)
九、测试消费者 启动另一个xsheel窗口 这样效果更明显哦!(test topic):
-
命令:在kafka 目录下bin目录下执行:
-
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
十、生产者发送消息:
十一、消费者消费消息:
发送完消息消费者会变成感叹号哦!点击查看输入内容已经打印控制台证明消费成功。
十二、到此结束kafka单机版本就集成了哦。我们来停止kafka网上有很多脚本 你们可以参考一下
关闭脚本:
-
进入kafka目录下 输入命令:vi kafkaStop.sh
-
#!/bin/bash
-
#关闭zookeeper
-
/DATA/kafka/kafka_2.12-2.0.0/bin/zookeeper-server-stop.sh /DATA/kafka/kafka_2.12-2.0.0/config/zookeeper.properties &
-
sleep 3 #默默等3秒后执行
-
#关闭kafka
-
/DATA/kafka/kafka_2.12-2.0.0/bin/kafka-server-stop.sh /DATA/kafka/kafka_2.12-2.0.0/config/server.properties &
-
关闭命令:
关闭kafka:
-
在kafka目录下执行
-
./kafkaStop.sh
关闭zookeeper
sh $zookeeper_home/bin/zkServer.sh stop
但是他会报没有启动着zookeeper和kafka
我们ps-ef | grep kafka 一下你会发现还在启动着。
上面的方法我没成功。你们可以测试一下。用最粗暴的方法把。
-
ps -ef | grep kafka
-
kill -9 kafka进程号
-
ps -ef | grep zookeeper
-
kill -9 zookeeper进程号
注意:一定要先关闭kafka在关闭zookeeper !!! 如果先关闭zookeeper kafka会一直去连接zookeeper服务 进入死循环了。 如果进入死循环有两种解决办法:
1、重启服务
-
shutdown -h 10 #计算机将于10分钟后关闭,且会显示在登录用户的当前屏幕中
-
shutdown -h now #计算机会立刻关机
-
shutdown -h 22:22 #计算机会在这个时刻关机
-
shutdown -r now #计算机会立刻重启
-
shutdown -r +10 #计算机会将于10分钟后重启
-
reboot #重启
-
halt #关机
2、新打开一个xsheel窗口 然后把kafka杀掉。
以上是关于kafka的主要内容,如果未能解决你的问题,请参考以下文章
Flink实战系列Flink 1.14.0 消费 kafka 数据自定义反序列化器
kafkaThe group member needs to have a valid member id before actually entering a consumer group(代码片段
SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段
Camel-Kafka java.io.EOFException - NetworkReceive.readFromReadableChannel