kafka搭建单机开发教程
Posted TGITCIC
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka搭建单机开发教程相关的知识,希望对你有一定的参考价值。
下载安装
下载apache kafka2.11及以上版本Apache Kafka
下载后解压成这样的形式
改配置(单机非常的简单)
对于/conf/目录下新建一个properties文件叫server.properties文件即可。集群我后面单独会讲,那块有点篇幅的但不适合刚入手的开发,我们先把Kafka转起来再说。
server.properties
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://127.0.0.1:9092
port=9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
host.name=127.0.0.1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/Users/chrishu123126.com/opt/kafka2.11/logs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
关键参数解释:
- broker.id=0,只有一个kafka broker
- delete.topic.enable=true,这个值如果不设,你运行kafka 里的删除topic命令是没用的,还需要自己跑到zk里去把topic删了;
- listeners=PLAINTEXT://127.0.0.1:9092,kafka的监听地址,这边是本机,如果是连接kafka和kafka不在同一台服务器上那么你需要在此处把127.0.0.1换成这台机器的局域网ip;
- port=9092,向远程暴露kafka服务的地址;
- log.dirs=/Users/chrishu123126.com/opt/kafka2.11/logs,不仅仅是kafka的log日志,kafka的topic里的暂存信息也是用的这个目录,它实际是一个log日志+kafka的data文件所在路径;
- num.partitions=3,不管是单机还是多机生产集群,这个最小数目是3.我们在后期说多机生产集群环境时,会详细述说kafka里的partition、broker、consumer几者间的关系,这三个概念是很重要的;
- zookeeper.connect=localhost:2181,localhost:2182,localhost:2183,必须指定。kafka的节点间的信息是依靠zk去做协调的;
- zookeeper.connection.timeout.ms=6000,ZK连接超时,一般为2-6s间即可;
zk集群本机搭建
这个太简单了,在此篇博文中就不作展开,网上有太多例子
我这边直接给出搭建所用的全配置。
先设3个zoo.cfg
分别命名为zoo1.cfg、zoo2.cfg、zoo3.cfg
zoo1.cfg
# The number of milliseconds of each tick
#zk时间单元(毫秒)
tickTime=5000
# The number of ticks that the initial
# synchronization phase can take
# floower启动过程中从leader同部数据的时间限制
#如果集群规模大,数据量多的话,适当调大此参数
initLimit=50
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=6
dataDir=/Users/chrishu123126.com/opt/zookeeper/data/zk1
dataLogDir=/Users/chrishu123126.com/opt/zookeeper/logs/zk1
clientPort=2181
server.1=localhost:2187:2887
server.2=localhost:2188:2888
server.3=localhost:2189:2889
#leader不接受客户端连接,专注于通信和选举等
leaderServes=no
maxClientCnxns=1000
forceSync=no
zoo2.cfg
# The number of milliseconds of each tick
#zk时间单元(毫秒)
tickTime=5000
# The number of ticks that the initial
# synchronization phase can take
# floower启动过程中从leader同部数据的时间限制
#如果集群规模大,数据量多的话,适当调大此参数
initLimit=50
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=6
dataDir=/Users/chrishu123126.com/opt/zookeeper/data/zk2
dataLogDir=/Users/chrishu123126.com/opt/zookeeper/logs/zk2
clientPort=2182
server.1=localhost:2187:2887
server.2=localhost:2188:2888
server.3=localhost:2189:2889
#leader不接受客户端连接,专注于通信和选举等
leaderServes=no
maxClientCnxns=1000
forceSync=no
zoo3.cfg
# The number of milliseconds of each tick
#zk时间单元(毫秒)
tickTime=5000
# The number of ticks that the initial
# synchronization phase can take
# floower启动过程中从leader同部数据的时间限制
#如果集群规模大,数据量多的话,适当调大此参数
initLimit=50
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=6
dataDir=/Users/chrishu123126.com/opt/zookeeper/data/zk3
dataLogDir=/Users/chrishu123126.com/opt/zookeeper/logs/zk3
clientPort=2183
server.1=localhost:2187:2887
server.2=localhost:2188:2888
server.3=localhost:2189:2889
#leader不接受客户端连接,专注于通信和选举等
leaderServes=no
maxClientCnxns=1000
forceSync=no
开始搭建ZK集群
我们用的是zookeeper3.4.3
输入如下命令
touch data/zk1/myid
touch data/zk2/myid
touch data/zk3/myid
这样在每个/data/zk1这样的目录下会有一个空的文件叫myid
接下来再输入以下命令
echo 1 > data/zk1/myid
echo 2 > data/zk2/myid
echo 3 > data/zk3/myid
启动zk群
sh zkServer.sh start ../conf/zoo1.cfg
sh zkServer.sh start ../conf/zoo2.cfg
sh zkServer.sh start ../conf/zoo3.cfg
下面给出一条命令启动、一条命令关闭整个zk群的命令。
zk-start.sh
#!/bin/sh
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh start /Users/chrishu123126.com/opt/zookeeper/conf/zoo1.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh start /Users/chrishu123126.com/opt/zookeeper/conf/zoo2.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh start /Users/chrishu123126.com/opt/zookeeper/conf/zoo3.cfg
zk-stop.sh
#!/bin/sh
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh stop /Users/chrishu123126.com/opt/zookeeper/conf/zoo1.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh stop /Users/chrishu123126.com/opt/zookeeper/conf/zoo2.cfg
/Users/chrishu123126.com/opt/zookeeper/bin/zkServer.sh stop /Users/chrishu123126.com/opt/zookeeper/conf/zoo3.cfg
运行kafka
./kafka-server-start.sh -daemon ../config/server.properties
如何创建一个kafka的topic和消费呢?
kafka创建producer
## 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
命令结束后,你会得到一个command窗口。
在这个command窗口直接输入任意内容,即相当于在mq 里发了一条send("你输入的内容“)的作用。如果有相应的consumer连着kafka你就可以看到消费产生了。
kafka创建消息者
例:
producer
consumer
附、kafka新版本(2.两位数版本后)在kafka命令上发生的一些变化大全
后面在精讲kafka时我会把这些命令逐一做精讲。
启动
./kafka-server-start.sh -daemon ../config/server.properties
管理
## 创建主题(4个分区,2个副本)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic test
复制
查询
## 查询集群描述
bin/kafka-topics.sh --describe --zookeeper
## topic列表查询
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
## topic列表查询(支持0.9版本+)
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
## 新消费者列表查询(支持0.9版本+)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
## 新消费者列表查询(支持0.10版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
## 显示某个消费组的消费详情(仅支持offset存储在zookeeper上的)
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test
## 显示某个消费组的消费详情(0.9版本 - 0.10.1.0 之前)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
## 显示某个消费组的消费详情(0.10.1.0版本+)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
复制
发送和消费
## 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
## 消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
## 新生产者(支持0.9版本+)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties
## 新消费者(支持0.9版本+)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
## 高级点的用法
bin/kafka-simple-consumer-shell.sh --brist localhost:9092 --topic test --partition 0 --offset 1234 --max-messages 10
复制
平衡leader
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
复制
kafka自带压测命令
bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092
复制
分区扩容
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
复制
迁移分区
1. 创建规则json
cat > increase-replication-factor.json <<EOF
"version":1, "partitions":[
"topic":"__consumer_offsets","partition":0,"replicas":[0,1],
"topic":"__consumer_offsets","partition":1,"replicas":[0,1],
"topic":"__consumer_offsets","partition":2,"replicas":[0,1],
"topic":"__consumer_offsets","partition":3,"replicas":[0,1],
"topic":"__consumer_offsets","partition":4,"replicas":[0,1],
"topic":"__consumer_offsets","partition":5,"replicas":[0,1],
"topic":"__consumer_offsets","partition":6,"replicas":[0,1],
"topic":"__consumer_offsets","partition":7,"replicas":[0,1],
"topic":"__consumer_offsets","partition":8,"replicas":[0,1],
"topic":"__consumer_offsets","partition":9,"replicas":[0,1],
"topic":"__consumer_offsets","partition":10,"replicas":[0,1],
"topic":"__consumer_offsets","partition":11,"replicas":[0,1],
"topic":"__consumer_offsets","partition":12,"replicas":[0,1],
"topic":"__consumer_offsets","partition":13,"replicas":[0,1],
"topic":"__consumer_offsets","partition":14,"replicas":[0,1],
"topic":"__consumer_offsets","partition":15,"replicas":[0,1],
"topic":"__consumer_offsets","partition":16,"replicas":[0,1],
"topic":"__consumer_offsets","partition":17,"replicas":[0,1],
"topic":"__consumer_offsets","partition":18,"replicas":[0,1],
"topic":"__consumer_offsets","partition":19,"replicas":[0,1],
"topic":"__consumer_offsets","partition":20,"replicas":[0,1],
"topic":"__consumer_offsets","partition":21,"replicas":[0,1],
"topic":"__consumer_offsets","partition":22,"replicas":[0,1],
"topic":"__consumer_offsets","partition":23,"replicas":[0,1],
"topic":"__consumer_offsets","partition":24,"replicas":[0,1],
"topic":"__consumer_offsets","partition":25,"replicas":[0,1],
"topic":"__consumer_offsets","partition":26,"replicas":[0,1],
"topic":"__consumer_offsets","partition":27,"replicas":[0,1],
"topic":"__consumer_offsets","partition":28,"replicas":[0,1],
"topic":"__consumer_offsets","partition":29,"replicas":[0,1],
"topic":"__consumer_offsets","partition":30,"replicas":[0,1],
"topic":"__consumer_offsets","partition":31,"replicas":[0,1],
"topic":"__consumer_offsets","partition":32,"replicas":[0,1],
"topic":"__consumer_offsets","partition":33,"replicas":[0,1],
"topic":"__consumer_offsets","partition":34,"replicas":[0,1],
"topic":"__consumer_offsets","partition":35,"replicas":[0,1],
"topic":"__consumer_offsets","partition":36,"replicas":[0,1],
"topic":"__consumer_offsets","partition":37,"replicas":[0,1],
"topic":"__consumer_offsets","partition":38,"replicas":[0,1],
"topic":"__consumer_offsets","partition":39,"replicas":[0,1],
"topic":"__consumer_offsets","partition":40,"replicas":[0,1],
"topic":"__consumer_offsets","partition":41,"replicas":[0,1],
"topic":"__consumer_offsets","partition":42,"replicas":[0,1],
"topic":"__consumer_offsets","partition":43,"replicas":[0,1],
"topic":"__consumer_offsets","partition":44,"replicas":[0,1],
"topic":"__consumer_offsets","partition":45,"replicas":[0,1],
"topic":"__consumer_offsets","partition":46,"replicas":[0,1],
"topic":"__consumer_offsets","partition":47,"replicas":[0,1],
"topic":"__consumer_offsets","partition":48,"replicas":[0,1],
"topic":"__consumer_offsets","partition":49,"replicas":[0,1]]
EOF
复制
2. 执行
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
复制
3. 验证
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
复制
以上是关于kafka搭建单机开发教程的主要内容,如果未能解决你的问题,请参考以下文章