安装Zookeeper和Kafka集群
Posted 南瓜慢说 www.pkslow.com
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了安装Zookeeper和Kafka集群相关的知识,希望对你有一定的参考价值。
安装Zookeeper和Kafka集群
本文介绍如何安装Zookeeper和Kafka集群。为了方便,介绍的是在一台服务器上的安装,实际应该安装在多台服务器上,但步骤是一样的。
安装Zookeeper集群
下载安装包
从官网上下载安装包:
curl https://dlcdn.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz -o apache-zookeeper-3.7.1-bin.tar.gz
解压:
tar xvf apache-zookeeper-3.7.1-bin.tar.gz
配置
创建目录 zk1
,然后添加如下配置:
zk1/myid
:
1
zk1/zk.config
:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/Users/larry/IdeaProjects/pkslow-samples/other/install-kafka-cluster/src/main/zookeeper/zk1
clientPort=2181
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
对于zk2
和zk3
也重复同样的步骤,并修改相应的配置:
zk2/myid
:
2
zk2/zk.config
:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/Users/larry/IdeaProjects/pkslow-samples/other/install-kafka-cluster/src/main/zookeeper/zk2
clientPort=2182
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
zk3/myid
:
3
zk3/zk.config
:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/Users/larry/IdeaProjects/pkslow-samples/other/install-kafka-cluster/src/main/zookeeper/zk3
clientPort=2183
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
启动集群
启动三个服务如下:
$ ./apache-zookeeper-3.7.1-bin/bin/zkServer.sh start ./zk1/zk.config
ZooKeeper JMX enabled by default
Using config: ./zk1/zk.config
Starting zookeeper ... STARTED
$ ./apache-zookeeper-3.7.1-bin/bin/zkServer.sh start ./zk2/zk.config
ZooKeeper JMX enabled by default
Using config: ./zk2/zk.config
Starting zookeeper ... STARTED
$ ./apache-zookeeper-3.7.1-bin/bin/zkServer.sh start ./zk3/zk.config
ZooKeeper JMX enabled by default
Using config: ./zk3/zk.config
Starting zookeeper ... STARTED
查看状态
通过status
命令查看:
$ ./apache-zookeeper-3.7.1-bin/bin/zkServer.sh status ./zk1/zk.config
ZooKeeper JMX enabled by default
Using config: ./zk1/zk.config
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
$ ./apache-zookeeper-3.7.1-bin/bin/zkServer.sh status ./zk2/zk.config
ZooKeeper JMX enabled by default
Using config: ./zk2/zk.config
Client port found: 2182. Client address: localhost. Client SSL: false.
Mode: leader
$ ./apache-zookeeper-3.7.1-bin/bin/zkServer.sh status ./zk3/zk.config
ZooKeeper JMX enabled by default
Using config: ./zk3/zk.config
Client port found: 2183. Client address: localhost. Client SSL: false.
Mode: follower
连接其中一个服务并添加数据:
$ ./apache-zookeeper-3.7.1-bin/bin/zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 0] create /pkslow
Created /pkslow
[zk: localhost:2181(CONNECTED) 1] create /pkslow/website www.pkslow.com
Created /pkslow/website
连接另外一个服务,并查看数据,发现与之前创建的是一样的:
$ ./apache-zookeeper-3.7.1-bin/bin/zkCli.sh -server localhost:2182
[zk: localhost:2182(CONNECTED) 1] get /pkslow/website
www.pkslow.com
目录结构如下:
安装Kafka集群
下载安装包
通过官网下载如下:
curl https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz -o kafka_2.13-3.4.0.tgz
解压安装包:
tar -xzf kafka_2.13-3.4.0.tgz
配置
Broker 1
的配置如下:
broker.id=1
port=9091
listeners=PLAINTEXT://:9091
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
log.dirs=/Users/larry/IdeaProjects/pkslow-samples/other/install-kafka-cluster/src/main/kafka/kafka1/kafka-logs
Broker 2
的配置如下:
broker.id=2
port=9092
listeners=PLAINTEXT://:9092
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
log.dirs=/Users/larry/IdeaProjects/pkslow-samples/other/install-kafka-cluster/src/main/kafka/kafka2/kafka-logs
Broker 3
的配置如下:
broker.id=3
port=9093
listeners=PLAINTEXT://:9093
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
log.dirs=/Users/larry/IdeaProjects/pkslow-samples/other/install-kafka-cluster/src/main/kafka/kafka3/kafka-logs
目录结构如下:
启动集群
启动kafka
服务如下:
./kafka_2.13-3.4.0/bin/kafka-server-start.sh ./kafka1/server.properties
./kafka_2.13-3.4.0/bin/kafka-server-start.sh ./kafka2/server.properties
./kafka_2.13-3.4.0/bin/kafka-server-start.sh ./kafka3/server.properties
检查如测试
创建topic:
$ kafka_2.13-3.4.0/bin/kafka-topics.sh --create --topic pkslow-topic --bootstrap-server localhost:9091,localhost:9092,localhost:9093 --partitions 3 --replication-factor 3
Created topic pkslow-topic.
列出topic:
$ kafka_2.13-3.4.0/bin/kafka-topics.sh --list --bootstrap-server localhost:9091,localhost:9092,localhost:9093
pkslow-topic
查看topic:
$ kafka_2.13-3.4.0/bin/kafka-topics.sh --describe --topic pkslow-topic --bootstrap-server localhost:9091,localhost:9092,localhost:9093
Topic: pkslow-topic TopicId: 7CLy7iZeRvm8rCrn8Dw_mA PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: pkslow-topic Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: pkslow-topic Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: pkslow-topic Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
生产者发消息到brokers:
$ kafka_2.13-3.4.0/bin/kafka-console-producer.sh --broker-list localhost:9091,localhost:9092,localhost:9093 --topic pkslow-topic
>My name is Larry Deng.
>My website is www.pkslow.com.
>
消费者从brokers收消息:
$ kafka_2.13-3.4.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9091,localhost:9092,localhost:9093 --topic pkslow-topic --from-beginning
My name is Larry Deng.
My website is www.pkslow.com.
代码
配置可以参考 GitHub pkslow-samples
ELK之七------Zookeeper和Kafka安装与配置
一、zookeeper介绍:
ZooKeeper是一个分布式且开源的分布式应用程序协调服务。
zookeeper集群特性:整个集群种只要有超过集群数量一半的zookeeper工作只正常的,那么整个集群对外就是可用的,假如有2台服务器做了一个zookeeper集群,只要有任何一台故障或宕机,那么这个zookeeper集群就不可用了,因为剩下的一台没有超过集群一半的数量,但是假如有三台zookeeper组成一个集群,那么损坏一台就还剩两台,大于3台的一半,所以损坏一台还是可以正常运行的,但是再损坏一台就只剩一台集群就不可用了。那么要是4台组成一个zookeeper集群,损坏一台集群肯定是正常的,那么损坏两台就还剩两台,那么2台不大于集群数量的一半,所以3台的zookeeper集群和4台的zookeeper集群损坏两台的结果都是集群不可用,一次类推5台和6台以及7台和8台都是同理,所以这也就是为什么集群一般都是奇数的原因。
1、安装JDK环境
1、将下载的JDK包解压(有几个集群主机,就配置几个)
[root@web1 src]# tar xvf jdk-8u212-linux-x64.tar.gz [root@web1 src]# ln -s /usr/local/src/jdk1.8.0_212/ /usr/local/jdk [root@web1 src]# ln -s /usr/local/jdk/bin/java /usr/bin
2、配置JDK环境
[root@web1 src]# vim /etc/profile.d/jdk.sh export HISTTIMEFORMAT="%F %T `whoami`" export export LANG="en_US.utf-8" export JAVA_HOME=/usr/local/jdk export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin [root@web1 src]# . /etc/profile.d/jdk.sh
2、zookeeper安装
zookeeper 下载地址:http://zookeeper.apache.org/releases.html
不要下载最新和太旧的版本,尽量使用较新的版本。
1、将下载下来的zookeeper包进行解压,并创建软链接。(有几个集群主机,配置几个)
[root@web1 src]# tar xvf zookeeper-3.4.14.tar.gz [root@web1 src]# ln -s /usr/local/src/zookeeper-3.4.14 /usr/local/zookeeper
2、修改zookeeper配置文件,实现zookeeper集群功能
[root@web1 conf]# pwd /usr/local/src/zookeeper-3.4.14/conf [root@web1 conf]# cp zoo_sample.cfg zoo.cfg # 复制zookeeper配置文件 [root@web1 conf]# mkdir /usr/local/zookeeper/data # 有几个zookeeper集群就创建一个目录,存放zookeeper数据的位置 [root@web1 conf]# vim zoo.cfg # 修改zookeeper配置文件 dataDir=/usr/local/zookeeper/data # 自定义zookeeper保存数据目录 clientPort=2181 # 客户端端口号 maxClientCnxns=4096 #客户端最大连接数 autopurge.snapRetainCount=512 # 设置zookeeper保存多少次的客户端数据 autopurge.purgeInterval=1 # 设置zookeeper间隔多少小时清理一次保存的客户端数据 server.1=192.168.7.104:2888:3888 # 如果有三个集群,就写三个对应的IP地址。 server.2=192.168.7.105:2888:3888
3、对每个zookeeper集群设置一个myid,必须与上面的zoo.cfg配置文件下面的server.xxx的ID对应,有几个集群就需要设置几个ID。
[root@web1 conf]# echo 1 > /usr/local/zookeeper/data/myid [root@tomcat-web2]# echo 2 > /usr/local/zookeeper/data/myid
4、启动zookeeper服务
[root@web1 ~]# /usr/local/zookeeper/bin/zkServer.sh start #启动zookeeper服务器 [root@web1 ~]# /usr/local/zookeeper/bin/zkServer.sh status # 查看启动的状态,如果不是以下的状态,说明集群失败 ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Mode: follower # 从服务器 [root@tomcat-web2 bin]# /usr/local/zookeeper/bin/zkServer.sh start [root@tomcat-web2 bin]# /usr/local/zookeeper/bin/zkServer.sh status #查看zookeeper启动状态 ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Mode: leader # 主服务器
5、将zookeeper设置为开机启动
[root@tomcat-web2 src]# vim /etc/rc.d/rc.local /usr/local/zookeeper/bin/zkServer.sh start # 设置为开机启动 [root@tomcat-web2 src]# chmod +x /etc/rc.d/rc.local #加上执行权限
二、kafka简介:
Kafka 被称为下一代分布式消息系统,是非营利性组织ASF(Apache Software Foundation,简称为ASF)基金会中的一个开源项目,比如HTTP Server、Hadoop、ActiveMQ、Tomcat等开源软件都属于Apache基金会的开源软件,类似的消息系统还有RbbitMQ、ActiveMQ、ZeroMQ,最主要的优势是其具备分布式功能、并且结合zookeeper可以实现动态扩容。
kafka要想正常运行,必须配置zookeeper,否则无论是kafka集群还是客户端的生存者和消费者都无法正常的工作的;所以需要配置启动zookeeper服务。
官方文档:http://www.infoq.com/cn/articles/apache-kafka
kafka术语解释
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)
Partition
parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件
Producer
负责发布消息到Kafka broker
Consumer
消费消息。每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。
1、下载并安装kafka
kafka下载地址:http://kafka.apache.org/downloads.html
1、将下载的kafka包进行解压,并创建软链接(有几个集群主机就配置几个)
[root@tomcat-web2 src]# tar xvf kafka_2.12-2.1.0.tgz #解压kafka包 [root@web1 src]# ln -s /usr/local/src/kafka_2.12-2.1.0 /usr/local/kafka # 设置软链接
2、修改kafka配置文件(有几个集群主机,就配置几个)
[root@tomcat-web2 kafka_2.12-2.1.0]# pwd /usr/local/src/kafka_2.12-2.1.0 [root@tomcat-web2 kafka_2.12-2.1.0]# vim config/server.properties # 修改kafka配置文件 broker.id=1 # 与zookeeper的ID一致,有几个kafka主机,就在每个kafka写入对应的zookeeper主机ID listeners=PLAINTEXT://192.168.7.104:9092 # 写成本地的服务器地址,针对不同的集群,在不同的kafka主机写对应的IP地址 log.dirs=/usr/local/kafka/kafka-logs # 日志存放目录 log.retention.hours=24 # 保留日志的时间,以小时为单位 zookeeper.connect=192.168.7.104:2181,192.168.7.105:2181 # 设置与zookeeper主机的连接地址,有几个集群,就配置几个IP地址。
3、启动kafka服务,以守护进程的方式启动
# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
查看此时kafka启动状态,第一台kafka已经启动
查看第二台kafka主机状态,也已经启动
4、将kafka设置为开机启动(经测试暂时未能成功,但是可以在开机启动时候使用kafka的service命令将kafka启动)
(1)配置kafka的环境变量
[root@web1 init.d]# vim /etc/profile.d/kafka.sh export KAFKA_HOME="/usr/local/kafka" export PATH="${KAFKA_HOME}/bin:$PATH" [root@web1 init.d]# . /etc/profile.d/kafka.sh
(2)在/etc/init.d/目录下编辑一个启动脚,并加上执行权限chmod +x kafka
#!/bin/sh # # chkconfig: 345 99 01 # description: Kafka # # File : Kafka # # Description: Starts and stops the Kafka server # source /etc/rc.d/init.d/functions KAFKA_HOME=/usr/local/kafka # 修改kafka的存放路径 KAFKA_USER=root # 使用root启动 export LOG_DIR=/usr/local/kafka/kafka-logs # 定义kafka的log存放位置,与前面的kafka配置文件有关 [ -e /etc/sysconfig/kafka ] && . /etc/sysconfig/kafka # See how we were called. case "$1" in start) echo -n "Starting Kafka:" /sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &" echo " done." exit 0 ;; stop) echo -n "Stopping Kafka: " /sbin/runuser -s /bin/sh $KAFKA_USER -c "ps -ef | grep kafka.Kafka | grep -v grep | awk ‘{print $2}‘ | xargs kill" echo " done." exit 0 ;; hardstop) echo -n "Stopping (hard) Kafka: " /sbin/runuser -s /bin/sh $KAFKA_USER -c "ps -ef | grep kafka.Kafka | grep -v grep | awk ‘{print $2}‘ | xargs kill -9" echo " done." exit 0 ;; status) c_pid=`ps -ef | grep kafka.Kafka | grep -v grep | awk ‘{print $2}‘` if [ "$c_pid" = "" ] ; then echo "Stopped" exit 3 else echo "Running $c_pid" exit 0 fi ;; restart) stop start ;; *) echo "Usage: kafka {start|stop|hardstop|status|restart}" exit 1 ;; esac
重新加载配置文件
# systemctl daemon-reload # service kafka start # 启动kafka
(3)将kafka添加到服务上,然后设置为开机启动
# chkconfig --add kafka # 添加到服务上 # chkconfig kafka on #设置为开机启动
2、基于logstash测试kafka和zookeeper
1、在logstash服务器的/etc/logstash/conf.d目录下创建一个测试kafka的标准输出、标准输入文件
input { stdin {} } output { kafka { topic_id => "hello" bootstrap_servers => "192.168.7.104:9092" # IP地址为kafka的地址以及监听的端口号,测试其他集群的kafka主机,需要修改IP地址即可。 batch_size => 5 } stdout { codec => rubydebug } }
2、开始测是kafka的日志文件是否可以传到logstash服务器上
[root@logstash conf.d]# logstash -f log-to-kafka.conf # 测试配置文件 { "host" => "logstash", "message" => "hello world", # 输入的hello world已经可以在logstash显示 "@version" => "1", "@timestamp" => 2020-03-15T03:30:03.426Z } nihao { "host" => "logstash", "message" => "nihao", # 输入的nihao也已经可以在logstash服务上显示 "@version" => "1", "@timestamp" => 2020-03-15T03:30:12.118Z }
以上是关于安装Zookeeper和Kafka集群的主要内容,如果未能解决你的问题,请参考以下文章