kafka搭建单机开发教程

Posted TGITCIC

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka搭建单机开发教程相关的知识,希望对你有一定的参考价值。

下载安装

下载apache kafka2.11及以上版本Apache Kafka

下载后解压成这样的形式

改配置(单机非常的简单)

对于/conf/目录下新建一个properties文件叫server.properties文件即可。集群我后面单独会讲,那块有点篇幅的但不适合刚入手的开发,我们先把Kafka转起来再说。

server.properties

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://127.0.0.1:9092
port=9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
host.name=127.0.0.1
 
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
 
socket.request.max.bytes=104857600
 
 
log.dirs=/Users/chrishu123126.com/opt/kafka2.11/logs
num.partitions=3
 
num.recovery.threads.per.data.dir=1
 
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
 
 
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
 
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000
 
group.initial.rebalance.delay.ms=0
 
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

 

关键参数解释:

  • broker.id=0,只有一个kafka broker
  • delete.topic.enable=true,这个值如果不设,你运行kafka 里的删除topic命令是没用的,还需要自己跑到zk里去把topic删了;
  • listeners=PLAINTEXT://127.0.0.1:9092,kafka的监听地址,这边是本机,如果是连接kafka和kafka不在同一台服务器上那么你需要在此处把127.0.0.1换成这台机器的局域网ip;
  • port=9092,向远程暴露kafka服务的地址;
  • log.dirs=/Users/chrishu123126.com/opt/kafka2.11/logs,不仅仅是kafka的log日志,kafka的topic里的暂存信息也是用的这个目录,它实际是一个log日志+kafka的data文件所在路径;
  • num.partitions=3,不管是单机还是多机生产集群,这个最小数目是3.我们在后期说多机生产集群环境时,会详细述说kafka里的partition、broker、consumer几者间的关系,这三个概念是很重要的;
  • zookeeper.connect=localhost:2181,localhost:2182,localhost:2183,必须指定。kafka的节点间的信息是依靠zk去做协调的;
  • zookeeper.connection.timeout.ms=6000,ZK连接超时,一般为2-6s间即可;

zk集群本机搭建

这个太简单了,在此篇博文中就不作展开,网上有太多例子

我这边直接给出搭建所用的全配置。

先设3个zoo.cfg

分别命名为zoo1.cfg、zoo2.cfg、zoo3.cfg

zoo1.cfg

# The number of milliseconds of each tick
#zk时间单元(毫秒)
tickTime=5000
# The number of ticks that the initial
# synchronization phase can take
# floower启动过程中从leader同部数据的时间限制
#如果集群规模大,数据量多的话,适当调大此参数
initLimit=50
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=6
dataDir=/Users/chrishu123126.com/opt/zookeeper/data/zk1
dataLogDir=/Users/chrishu123126.com/opt/zookeeper/logs/zk1
clientPort=2181
server.1=localhost:2187:2887
server.2=localhost:2188:2888
server.3=localhost:2189:2889
#leader不接受客户端连接,专注于通信和选举等
leaderServes=no
maxClientCnxns=1000
forceSync=no

zoo2.cfg

# The number of milliseconds of each tick
#zk时间单元(毫秒)
tickTime=5000
# The number of ticks that the initial
# synchronization phase can take
# floower启动过程中从leader同部数据的时间限制
#如果集群规模大,数据量多的话,适当调大此参数
initLimit=50
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=6
dataDir=/Users/chrishu123126.com/opt/zookeeper/data/zk2
dataLogDir=/Users/chrishu123126.com/opt/zookeeper/logs/zk2
clientPort=2182
server.1=localhost:2187:2887
server.2=localhost:2188:2888
server.3=localhost:2189:2889
#leader不接受客户端连接,专注于通信和选举等
leaderServes=no
maxClientCnxns=1000
forceSync=no

zoo3.cfg

# The number of milliseconds of each tick
#zk时间单元(毫秒)
tickTime=5000
# The number of ticks that the initial
# synchronization phase can take
# floower启动过程中从leader同部数据的时间限制
#如果集群规模大,数据量多的话,适当调大此参数
initLimit=50
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=6
dataDir=/Users/chrishu123126.com/opt/zookeeper/data/zk3
dataLogDir=/Users/chrishu123126.com/opt/zookeeper/logs/zk3
clientPort=2183
server.1=localhost:2187:2887
server.2=localhost:2188:2888
server.3=localhost:2189:2889
#leader不接受客户端连接,专注于通信和选举等
leaderServes=no
maxClientCnxns=1000
forceSync=no

开始搭建ZK集群

我们用的是zookeeper3.4.3

输入如下命令

touch data/zk1/myid
touch data/zk2/myid
touch data/zk3/myid

这样在每个/data/zk1这样的目录下会有一个空的文件叫myid

接下来再输入以下命令

echo 1 > data/zk1/myid
echo 2 > data/zk2/myid
echo 3 > data/zk3/myid

启动zk群

sh zkServer.sh start ../conf/zoo1.cfg
sh zkServer.sh start ../conf/zoo2.cfg
sh zkServer.sh start ../conf/zoo3.cfg

下面给出一条命令启动、一条命令关闭整个zk群的命令。

zk-start.sh

#!/bin/sh
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh start /Users/chrishu123126.com/opt/zookeeper/conf/zoo1.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh start /Users/chrishu123126.com/opt/zookeeper/conf/zoo2.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh start /Users/chrishu123126.com/opt/zookeeper/conf/zoo3.cfg

zk-stop.sh

#!/bin/sh
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh stop /Users/chrishu123126.com/opt/zookeeper/conf/zoo1.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh stop /Users/chrishu123126.com/opt/zookeeper/conf/zoo2.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh stop /Users/chrishu123126.com/opt/zookeeper/conf/zoo3.cfg

运行kafka

./kafka-server-start.sh -daemon ../config/server.properties

如何创建一个kafka的topic和消费呢?

kafka创建producer

## 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

命令结束后,你会得到一个command窗口。

在这个command窗口直接输入任意内容,即相当于在mq 里发了一条send("你输入的内容“)的作用。如果有相应的consumer连着kafka你就可以看到消费产生了。

kafka创建消息者

例:

producer

consumer

 

附、kafka新版本(2.两位数版本后)在kafka命令上发生的一些变化大全

后面在精讲kafka时我会把这些命令逐一做精讲。

启动
./kafka-server-start.sh -daemon ../config/server.properties
管理
## 创建主题(4个分区,2个副本)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
复制
查询
## 查询集群描述
bin/kafka-topics.sh --describe --zookeeper
 
## topic列表查询
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
 
## topic列表查询(支持0.9版本+)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
 
## 新消费者列表查询(支持0.9版本+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
 
## 新消费者列表查询(支持0.10版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
 
## 显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
 
## 显示某个消费组的消费详情(0.9版本 - 0.10.1.0 之前)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
 
## 显示某个消费组的消费详情(0.10.1.0版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
复制
发送和消费
## 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
 
## 消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
 
## 新生产者(支持0.9版本+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
 
## 新消费者(支持0.9版本+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
 
## 高级点的用法
bin/kafka-simple-consumer-shell.sh --brist localhost:9092 --topic test --partition 0 --offset 1234  --max-messages 10
复制
平衡leader
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
复制
kafka自带压测命令
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=localhost:9092
复制
分区扩容
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
复制
迁移分区
1. 创建规则json
cat > increase-replication-factor.json <<EOF
"version":1, "partitions":[
"topic":"__consumer_offsets","partition":0,"replicas":[0,1],
"topic":"__consumer_offsets","partition":1,"replicas":[0,1],
"topic":"__consumer_offsets","partition":2,"replicas":[0,1],
"topic":"__consumer_offsets","partition":3,"replicas":[0,1],
"topic":"__consumer_offsets","partition":4,"replicas":[0,1],
"topic":"__consumer_offsets","partition":5,"replicas":[0,1],
"topic":"__consumer_offsets","partition":6,"replicas":[0,1],
"topic":"__consumer_offsets","partition":7,"replicas":[0,1],
"topic":"__consumer_offsets","partition":8,"replicas":[0,1],
"topic":"__consumer_offsets","partition":9,"replicas":[0,1],
"topic":"__consumer_offsets","partition":10,"replicas":[0,1],
"topic":"__consumer_offsets","partition":11,"replicas":[0,1],
"topic":"__consumer_offsets","partition":12,"replicas":[0,1],
"topic":"__consumer_offsets","partition":13,"replicas":[0,1],
"topic":"__consumer_offsets","partition":14,"replicas":[0,1],
"topic":"__consumer_offsets","partition":15,"replicas":[0,1],
"topic":"__consumer_offsets","partition":16,"replicas":[0,1],
"topic":"__consumer_offsets","partition":17,"replicas":[0,1],
"topic":"__consumer_offsets","partition":18,"replicas":[0,1],
"topic":"__consumer_offsets","partition":19,"replicas":[0,1],
"topic":"__consumer_offsets","partition":20,"replicas":[0,1],
"topic":"__consumer_offsets","partition":21,"replicas":[0,1],
"topic":"__consumer_offsets","partition":22,"replicas":[0,1],
"topic":"__consumer_offsets","partition":23,"replicas":[0,1],
"topic":"__consumer_offsets","partition":24,"replicas":[0,1],
"topic":"__consumer_offsets","partition":25,"replicas":[0,1],
"topic":"__consumer_offsets","partition":26,"replicas":[0,1],
"topic":"__consumer_offsets","partition":27,"replicas":[0,1],
"topic":"__consumer_offsets","partition":28,"replicas":[0,1],
"topic":"__consumer_offsets","partition":29,"replicas":[0,1],
"topic":"__consumer_offsets","partition":30,"replicas":[0,1],
"topic":"__consumer_offsets","partition":31,"replicas":[0,1],
"topic":"__consumer_offsets","partition":32,"replicas":[0,1],
"topic":"__consumer_offsets","partition":33,"replicas":[0,1],
"topic":"__consumer_offsets","partition":34,"replicas":[0,1],
"topic":"__consumer_offsets","partition":35,"replicas":[0,1],
"topic":"__consumer_offsets","partition":36,"replicas":[0,1],
"topic":"__consumer_offsets","partition":37,"replicas":[0,1],
"topic":"__consumer_offsets","partition":38,"replicas":[0,1],
"topic":"__consumer_offsets","partition":39,"replicas":[0,1],
"topic":"__consumer_offsets","partition":40,"replicas":[0,1],
"topic":"__consumer_offsets","partition":41,"replicas":[0,1],
"topic":"__consumer_offsets","partition":42,"replicas":[0,1],
"topic":"__consumer_offsets","partition":43,"replicas":[0,1],
"topic":"__consumer_offsets","partition":44,"replicas":[0,1],
"topic":"__consumer_offsets","partition":45,"replicas":[0,1],
"topic":"__consumer_offsets","partition":46,"replicas":[0,1],
"topic":"__consumer_offsets","partition":47,"replicas":[0,1],
"topic":"__consumer_offsets","partition":48,"replicas":[0,1],
"topic":"__consumer_offsets","partition":49,"replicas":[0,1]]

EOF
复制
2. 执行
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
复制
3. 验证
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
复制

以上是关于kafka搭建单机开发教程的主要内容,如果未能解决你的问题,请参考以下文章

Kafka单机搭建

kafka单机环境搭建

Kafka 单机和伪分布式集群搭建

在线预约小程序搭建教程8-教师详情页

mac 搭建单机版kafka完整过程-基于kafka3.0

在线预约小程序搭建教程8-教师详情页