kafka集群部署
Posted vnx
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka集群部署相关的知识,希望对你有一定的参考价值。
版本:kafka_2.10-0.10.1.1 + zookeeper-3.4.9
说明:kafka自带了zookeeper,但是推荐使用独立的zookeeper集群。
前置环境:(参考Hadoop搭建)
1、节点配分配: 192.168.235.138 node1 192.168.235.139 node2 192.168.235.140 node3 2、配置静态IP、hosts、hostname、关闭防火墙、安装JDK
1、zookeeper集群部署
(1)安装
上传文件到指定集群上,执行如下步骤
# 解压文件到/usr/local/目录下 # tar -zxvf zookeeper-3.4.9.tar.gz -C /usr/local/ # 删除无用文件(可选) # cd /usr/local/zookeeper-3.4.9/ # find . -name "*.cmd" -exec rm -rf {} \\; # rm -rf docs/ # 创建数据、日志存放目录及当前节点ID # mkdir data # mkdir dataLog # echo "1" > data/myid
(2)配置
# pwd /usr/local/zookeeper-3.4.9 # cp conf/zoo_sample.cfg conf/kafka_zk.cfg # vim conf/kafka_zk.cfg # 在kafka_zk.cfg文件中加入如下配置: tickTime=2000 # 数据文件存放位置 dataDir=/usr/local/zookeeper-3.4.9/data dataLogDir=/usr/local/zookeeper-3.4.9/dataLog #服务监听端口 clientPort=2181 #选举等待时间 initLimit=5 syncLimit=2 #集群节点信息 server.1=node1:2888:3888 server.2=node2:2888:3888 server.3=node3:2888:3888
(3)分发文件、修改对应节点的ID
scp -r zookeeper-3.4.9 root@node2:/usr/local/ scp -r zookeeper-3.4.9 root@node3:/usr/local/
修改node2和node3节点上/usr/local/zookeeper-3.4.9/data目录下的myid文件内容分别为2、3
(4)查看各节点时间并同步时钟
# date #系统时间,系统重启失效 2017年 04月 14日 星期五 11:28:08 CST # 手动更改系统时间 # date -s \'2017/04/14 11:33:50\' # clock -r #读取CMOS时间 2017年04月14日 星期五 19时34分33秒 -1.038372 seconds # clock -w #将当前时间写入CMOS
注:集群各节点之间的时间同步很重要,必要时部署本地集群的时钟服务器
(5)启动集群
# /usr/local/zookeeper-3.4.9/bin/zkServer.sh start /usr/local/zookeeper-3.4.9/conf/kafka_zk.cfg #启动(指定配置文件) # jps #查看进程 15286 QuorumPeerMain 15319 Jps # /usr/local/zookeeper-3.4.9/bin/zkServer.sh status /usr/local/zookeeper-3.4.9/conf/kafka_zk.cfg #查看状态 # /usr/local/zookeeper-3.4.9/bin/zkServer.sh stop /usr/local/zookeeper-3.4.9/conf/kafka_zk.cfg #关闭
注意:
i 最好在启动、关闭时都指定配置文件,一方面,集群中可能有多个配置文件,另一方面,暴力kill进程可能会带来意想不到的错误。
ii jps只能查看进程是否存在,在某些情况下进程存在不代表zk集群处于可用状态,可通过查看zk状态或者用自带的客户端连接确认是
否处于可用状态。连接示例如下:
# /usr/local/zookeeper-3.4.9/bin/zkCli.sh -server node1:2181 日志。。。。 [zk: node1:2181(CONNECTED) 0] ls / [zookeeper] [zk: node1:2181(CONNECTED) 1] help #命令参考 ZooKeeper -server host:port cmd args connect host:port get path [watch] ls path [watch] set path data [version] rmr path delquota [-n|-b] path quit printwatches on|off create [-s] [-e] path data acl stat path [watch] close ls2 path [watch] history listquota path setAcl path acl getAcl path sync path redo cmdno addauth scheme auth delete path [version] setquota -n|-b val path 退出:quit 或 Ctrl + C/D
(6)zk自带工具介绍
zookeeper自带的工具如下:
# ls bin/
zkCleanup.sh zkCli.sh zkEnv.sh zkServer.sh
# zkCleanup.sh :zookeeper不会自己删除旧的快照和日志文件,需要人工清除,可以自己写脚本清除或者用zk自带的zkCleanup工具。
# zkCli.sh :zk客户端
# zkEnv.sh : 主要配置,zookeeper集群启动时配置环境变量的文件
# zkServer.sh 管理程序文件
日志管理是后期维护的重要一环,要特别注意。至此,zk集群搭建完毕。
2、kafka集群
(1)安装
上传文件到集群节点
tar -zxvf kafka_2.10-0.10.1.1.tgz -C /usr/local/ cd /usr/local/ mv kafka_2.10-0.10.1.1 kafka
(2)配置
# cd /usr/local/kafka/ # mkdir logs #预设日志存放目录 # ll config/ | awk \'{print $9}\' #配置文件列表 connect-console-sink.properties connect-console-source.properties connect-distributed.properties connect-file-sink.properties connect-file-source.properties connect-log4j.properties connect-standalone.properties consumer.properties #消费者配置选项 log4j.properties producer.properties #生产者配置选项 server.properties #主要配置文件 tools-log4j.properties zookeeper.properties # vim config/server.properties 修改如下参数,没有则添加 broker.id=0 host.name=192.168.235.138 log.dirs=/usr/local/kafka/logs #在log.retention.hours=168 下面新增下面三项 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 zookeeper.connect=node1:2181,node2:2181,node3:2181
补充:server.properties可选参数如下
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样 port=19092 #当前kafka对外提供服务的端口默认是9092 host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。 num.network.threads=3 #这个是borker进行网络处理的线程数 num.io.threads=8 #这个是borker进行I/O处理的线程数 log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个 socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能 socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘 socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小 num.partitions=1 #默认的分区数,一个topic默认1个分区数 log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天 message.max.byte=5242880 #消息保存的最大值5M default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务 replica.fetch.max.bytes=5242880 #取消息的最大直接数 log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除 log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能 zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口
注意:每个节点对应一个broker id,并和IP一一对应
(3)启动服务
# cd kafka/ # bin/kafka-server-start.sh config/server.properties # jps
(4)测试
创建一个主题,并用node1作为消息生产者(发布者),node2、node3作为消息消费者(订阅者)。
在node1节点上:
#启动服务 # bin/kafka-server-start.sh -daemon config/server.properties # jps # 创建一个主题test:一个分区,两个副本 # bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partition 1 --topic test # 创建一个生产者(消息发布者) # bin/kafka-console-producer.sh --broker-list node1:9092 --topic test 等待输入要发布的消息。。。
在node2、node3节点上:
# bin/kafka-server-start.sh -daemon config/server.properties
# 创建一个消费者(消息订阅者) # bin/kafka-console-consumer.sh --zookeeper node1:2181 --topic test --from beginning
等待接收消息。。。
在node1的发布频道里输入消息,可在node2、node3上接收到对应的消息。
其他一些常用命令参考:
查看主题 bin/kafka-topics.sh --list --zookeeper localhost:2181 查看主题详情 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test 删除主题 bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test 生产者参数查看:bin/kafka-console-producer.sh 消费者参数查看:bin/kafka-console-consumer.sh
(5)补充:
日志说明:
server.log #kafka的运行日志 state-change.log #kafka他是用zookeeper来保存状态,所以他可能会进行切换,切换的日志就保存在这里 controller.log #kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.
zookeeper查看:
# bin/zkCli.sh ... [zk: localhost:2181(CONNECTED) 0] ls / [isr_change_notification, zookeeper, admin, consumers, cluster, config, controller, brokers, controller_epoch] 每个节点上都可以通过zookeeper查看kafka的节点注册信息
官方文档:http://kafka.apache.org/documentation.html
参考:http://www.cnblogs.com/luotianshuai/p/5206662.html
以上是关于kafka集群部署的主要内容,如果未能解决你的问题,请参考以下文章