kafka集群部署

Posted vnx

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka集群部署相关的知识,希望对你有一定的参考价值。

版本:kafka_2.10-0.10.1.1 + zookeeper-3.4.9

说明:kafka自带了zookeeper,但是推荐使用独立的zookeeper集群。

前置环境:(参考Hadoop搭建)

1、节点配分配:
192.168.235.138 node1
192.168.235.139 node2
192.168.235.140 node3
2、配置静态IP、hosts、hostname、关闭防火墙、安装JDK
View Code

1、zookeeper集群部署

(1)安装

上传文件到指定集群上,执行如下步骤

# 解压文件到/usr/local/目录下
# tar -zxvf zookeeper-3.4.9.tar.gz -C /usr/local/
# 删除无用文件(可选)
# cd /usr/local/zookeeper-3.4.9/
# find . -name "*.cmd" -exec rm -rf {} \\;
# rm -rf docs/
# 创建数据、日志存放目录及当前节点ID
# mkdir data 
# mkdir dataLog
# echo "1" > data/myid
View Code

(2)配置

# pwd
/usr/local/zookeeper-3.4.9
# cp conf/zoo_sample.cfg conf/kafka_zk.cfg 
# vim conf/kafka_zk.cfg 
# 在kafka_zk.cfg文件中加入如下配置:
tickTime=2000
# 数据文件存放位置
dataDir=/usr/local/zookeeper-3.4.9/data
dataLogDir=/usr/local/zookeeper-3.4.9/dataLog
#服务监听端口
clientPort=2181
#选举等待时间
initLimit=5
syncLimit=2
#集群节点信息
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
View Code

(3)分发文件、修改对应节点的ID

scp -r zookeeper-3.4.9 root@node2:/usr/local/
scp -r zookeeper-3.4.9 root@node3:/usr/local/
View Code

修改node2和node3节点上/usr/local/zookeeper-3.4.9/data目录下的myid文件内容分别为2、3

(4)查看各节点时间并同步时钟

# date  #系统时间,系统重启失效
2017年 04月 14日 星期五 11:28:08 CST
# 手动更改系统时间
# date -s \'2017/04/14 11:33:50\'
# clock -r    #读取CMOS时间
2017年04月14日 星期五 19时34分33秒  -1.038372 seconds
# clock -w   #将当前时间写入CMOS
View Code

注:集群各节点之间的时间同步很重要,必要时部署本地集群的时钟服务器

(5)启动集群

# /usr/local/zookeeper-3.4.9/bin/zkServer.sh start /usr/local/zookeeper-3.4.9/conf/kafka_zk.cfg #启动(指定配置文件)
# jps  #查看进程
15286 QuorumPeerMain
15319 Jps
# /usr/local/zookeeper-3.4.9/bin/zkServer.sh status /usr/local/zookeeper-3.4.9/conf/kafka_zk.cfg   #查看状态
# /usr/local/zookeeper-3.4.9/bin/zkServer.sh stop /usr/local/zookeeper-3.4.9/conf/kafka_zk.cfg     #关闭 
View Code

注意:

  i 最好在启动、关闭时都指定配置文件,一方面,集群中可能有多个配置文件,另一方面,暴力kill进程可能会带来意想不到的错误。

  ii jps只能查看进程是否存在,在某些情况下进程存在不代表zk集群处于可用状态,可通过查看zk状态或者用自带的客户端连接确认是

否处于可用状态。连接示例如下:

# /usr/local/zookeeper-3.4.9/bin/zkCli.sh -server node1:2181
日志。。。。
[zk: node1:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: node1:2181(CONNECTED) 1] help  #命令参考
ZooKeeper -server host:port cmd args
    connect host:port
    get path [watch]
    ls path [watch]
    set path data [version]
    rmr path
    delquota [-n|-b] path
    quit 
    printwatches on|off
    create [-s] [-e] path data acl
    stat path [watch]
    close 
    ls2 path [watch]
    history 
    listquota path
    setAcl path acl
    getAcl path
    sync path
    redo cmdno
    addauth scheme auth
    delete path [version]
    setquota -n|-b val path
退出:quit 或 Ctrl + C/D
View Code

(6)zk自带工具介绍

zookeeper自带的工具如下:

# ls bin/
  zkCleanup.sh  zkCli.sh  zkEnv.sh  zkServer.sh
# zkCleanup.sh :zookeeper不会自己删除旧的快照和日志文件,需要人工清除,可以自己写脚本清除或者用zk自带的zkCleanup工具。
# zkCli.sh :zk客户端
# zkEnv.sh : 主要配置,zookeeper集群启动时配置环境变量的文件
# zkServer.sh 管理程序文件
View Code

日志管理是后期维护的重要一环,要特别注意。至此,zk集群搭建完毕。

2、kafka集群

(1)安装

上传文件到集群节点

tar -zxvf kafka_2.10-0.10.1.1.tgz -C /usr/local/
cd /usr/local/
mv kafka_2.10-0.10.1.1 kafka
View Code

(2)配置

# cd /usr/local/kafka/
# mkdir logs                      #预设日志存放目录
# ll config/ | awk \'{print $9}\'   #配置文件列表
connect-console-sink.properties
connect-console-source.properties
connect-distributed.properties
connect-file-sink.properties
connect-file-source.properties
connect-log4j.properties
connect-standalone.properties
consumer.properties         #消费者配置选项
log4j.properties
producer.properties         #生产者配置选项
server.properties            #主要配置文件
tools-log4j.properties
zookeeper.properties
# vim config/server.properties 
修改如下参数,没有则添加
broker.id=0                 
host.name=192.168.235.138  
log.dirs=/usr/local/kafka/logs   

#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

zookeeper.connect=node1:2181,node2:2181,node3:2181
View Code

补充:server.properties可选参数如下

broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口
View Code

注意:每个节点对应一个broker id,并和IP一一对应

(3)启动服务

# cd kafka/
# bin/kafka-server-start.sh config/server.properties 
# jps
View Code

(4)测试

创建一个主题,并用node1作为消息生产者(发布者),node2、node3作为消息消费者(订阅者)。

在node1节点上:

#启动服务
# bin/kafka-server-start.sh -daemon config/server.properties  
# jps
# 创建一个主题test:一个分区,两个副本
# bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partition 1 --topic test   
# 创建一个生产者(消息发布者)
# bin/kafka-console-producer.sh --broker-list node1:9092 --topic test
等待输入要发布的消息。。。

在node2、node3节点上:

# bin/kafka-server-start.sh -daemon config/server.properties 
# 创建一个消费者(消息订阅者) # bin
/kafka-console-consumer.sh --zookeeper node1:2181 --topic test --from beginning
等待接收消息。。。

在node1的发布频道里输入消息,可在node2、node3上接收到对应的消息。

其他一些常用命令参考:

查看主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
查看主题详情
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
删除主题
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
生产者参数查看:bin/kafka-console-producer.sh
消费者参数查看:bin/kafka-console-consumer.sh

(5)补充:

日志说明:

server.log #kafka的运行日志
state-change.log  #kafka他是用zookeeper来保存状态,所以他可能会进行切换,切换的日志就保存在这里
controller.log #kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.

zookeeper查看:

# bin/zkCli.sh 
...
[zk: localhost:2181(CONNECTED) 0] ls /
[isr_change_notification, zookeeper, admin, consumers, cluster, config, controller, brokers, controller_epoch]
每个节点上都可以通过zookeeper查看kafka的节点注册信息

 

官方文档:http://kafka.apache.org/documentation.html

参考:http://www.cnblogs.com/luotianshuai/p/5206662.html

 

以上是关于kafka集群部署的主要内容,如果未能解决你的问题,请参考以下文章

Kafka(四)集群之kafka

Kafka快速入门——Kafka集群部署

Kafka快速入门——Kafka集群部署

kafka集群的部署及kafka监控工具

k8s部署Kafka集群

解开Kafka神秘的面纱:kafka单机部署和集群部署