zookeeper和kafka安装
Posted 小怪獣55
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper和kafka安装相关的知识,希望对你有一定的参考价值。
1.介绍
1.1kafka介绍:https://www.infoq.cn/article/apache-kafka/
Kafka 被称为下一代分布式消息系统,是非营利性组织ASF(Apache Software Foundation,简称为ASF)基金会中的一个开源项目,比如HTTP Server、Hadoop、ActiveMQ、Tomcat等开源软件都属于Apache基金会的开源软件,类似的消息系统还有RbbitMQ、ActiveMQ、ZeroMQ,最主要的优势是其具备分布式功能、并且结合zookeeper可以实现动态扩容。
kifka下载地址:https://kafka.apache.org/downloads.html
zookeeper下载地址:http://zookeeper.apache.org/releases.html
1.2 ZooKeeper是一个分布式且开源的应用程序协调服务。
zookeeper集群特性
整个集群中只要有超过集群数量一半的zookeeper工作是正常的,那么整个集群
对外就是可用的,
假如有2台服务器做了一个zookeeper集群,只要有任何一台故障或宕机,
那么这个zookeeper集群就不可用了,因为剩下的一台没有超过集群一半的数量,
假如有三台zookeeper组成一个集群,那么损坏一台就还剩两台,大于3台的一半,
所以损坏一台还是可以正常运行的,但是再损坏一台就只剩一台集群就不可用了。
要是4台组成一个zookeeper集群,损坏一台集群肯定是正常的,那么损坏两台
就还剩两台,那么2台不大于集群数量的一半,所以3台的zookeeper集群和4台的
zookeeper集群损坏两台的结果都是集群不可用,
依次类推5台和6台以及7台和8台都是同理,
所以这也就是为什么集群一般都是奇数的原因。
2.zookeeper安装
1.jdk安装,参考:https://blog.51cto.com/u_14814545/4898618
2.zookeeper安装
cd /usr/local/src
tar xf zookeeper-3.4.14.tar.gz
ln -s /usr/local/src/zookeeper-3.4.14 /usr/local/zookeeper
mkdir /usr/local/zookeeper/data/ -p
cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
vim /usr/local/zookeeper/conf/zoo.cfg
------------------------------------------
dataDir=/usr/local/zookeeper/data
maxClientCnxns=4096
autopurge.snapRetainCount=512
autopurge.purgeInterval=1
server.1=192.168.47.111:2888:3888
server.2=192.168.47.112:2888:3888
server.3=192.168.47.113:2888:3888
------------------------------------------
echo 1 > /usr/local/zookeeper/data/myid #111主机
echo 2 > /usr/local/zookeeper/data/myid #112主机
echo 3 > /usr/local/zookeeper/data/myid #113主机
参数说明
tickTime=2000
#服务器与服务器之间和客户端与服务器之间的单次心跳检测时间间隔,
单位为毫秒
initLimit=5
#集群中leader服务器与follower服务器初始连接心跳次数,即多少个2000毫秒
syncLimit=5
# leader与follower之间连接完成之后,后期检测发送和应答的心跳次数,
如果该follower 在设置的时间内(5*2000)不能与leader 进行通信,
那么此follower 将被视为不可用。
clientPort=2181
#客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,
接受客户端的访问请求
dataDir=/usr/local/zookeeper/data
#自定义的zookeeper保存数据的目录
autopurge.snapRetainCount=3
#设置zookeeper保存保留多少次客户端连接的数据
autopurge.purgeInterval=1
#设置zookeeper间隔多少小时清理一次保存的客户端数据
server.1=192.168.15.211:2888:3888
server.2=192.168.15.212:2888:3888
server.3=192.168.15.213:2888:3888
#服务器编号=服务器IP:LF数据同步端口:LF选举端口
启动
/usr/local/zookeeper/bin/zkServer.sh start
查看状态
/usr/local/zookeeper/bin/zkServer.sh status
连接到任意节点生成数据
/usr/local/zookeeper/bin/zkCli.sh -server 192.168.47.111:2181
验证
/usr/local/zookeeper/bin/zkCli.sh -server 192.168.47.112:2181
3.kafka安装
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。
(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或
多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据
存于何处)
Partition
parition是物理上的概念,每个topic包含一个或多个partition,
创建topic时可指定parition数量。每个partition对应于一个文件夹,
该文件夹下存储该partition的数据和索引文件
Producer
负责发布消息到Kafka broker
Consumer
消费消息。每个consumer属于一个特定的consuer group
(可为每个consumer指定group name,若不指定group name则属于默认的group)
使用consumer high level API时,同一topic的一条消息只能被同一个
consumer group内的一个consumer消费,但多个consumer group可同时消费
这一消息
安装
tar xf kafka_2.12-2.1.0.tgz
ln -sv /usr/local/src/kafka_2.12-2.1.0 /usr/local/kafka
vim /usr/local/kafka/config/server.properties
------------------------------------------------------
broker.id=1 #设置每个代理全局唯一的整数ID
listeners=PLAINTEXT://192.168.47.111:9092 #自己的ip地址
log.retention.hours=24 #保留指定小时的日志内容
zookeeper.connect=192.168.47.111,192.168.47.112,192.168.47.113:2181 ##所有的zookeeper地址
------------------------------------------------------
启动kafka
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
#此方式zookeeper会在shell断开后关闭
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
测试kafka
验证进程
apt install openjdk-8-jdk-headless
root@ubuntu:/usr/local/src# jps
2694 QuorumPeerMain
16710 Jps
3215 Kafka
测试创建topic
创建名为logstashtest,partitions(分区)为3,replication(复制)为3的topic(主题)
#在任意kafaka服务器操作
/usr/local/kafka/bin/kafka-topics.sh \\
--create --zookeeper 192.168.47.111,192.168.47.112,192.168.47.113:2181 \\
--partitions 3 \\
--replication-factor 3 \\
--topic logstashtest
测试获取topic
可以在任意一台kafka服务器进行测试
/usr/local/kafka/bin/kafka-topics.sh \\
--describe \\
--zookeeper 192.168.47.111,192.168.47.112,192.168.47.113:2181 \\
--topic logstashtest
状态说明:
logstashtest有三个分区分别为0、1、2,
分区0的leader是2(broker.id),
分区0有三个副本,并且状态都为lsr(ln-sync,表示可以参加选举成为leader)
删除topic
/usr/local/kafka/bin/kafka-topics.sh \\
--delete \\
--zookeeper 192.168.47.111,192.168.47.112,192.168.47.113:2181 \\
--topic logstashtest
获取所有topic
/usr/local/kafka/bin/kafka-topics.sh \\
--list \\
--zookeeper 192.168.47.111,192.168.47.112,192.168.47.113:2181
kafka命令测试消息发送
#1.创建topic
/usr/local/kafka/bin/kafka-topics.sh \\
--create --zookeeper 192.168.47.111,192.168.47.112,192.168.47.113:2181 \\
--partitions 3 \\
--replication-factor 3 \\
--topic messagetest
#2.发送消息
/usr/local/kafka/bin/kafka-console-producer.sh \\
--broker-list 192.168.47.111:9092,192.168.47.112:9092,192.168.47.113:9092 \\
--topic messagetest
#3.其他kafka服务器测试获取数据
/usr/local/kafka/bin/kafka-console-consumer.sh \\
--bootstrap-server 192.168.47.111:9092,192.168.47.112:9092,192.168.47.113:9092 \\
--topic messagetest\\
--from-beginning
使用logstash测试向kafka写入数据
vim /etc/logstash/conf.d/logstash-to-kafka.conf
input
stdin
output
kafka
topic_id => "hello"
bootstrap_servers => "192.168.47.113:9092"
batch_size => 5
stdout
codec => rubydebug
验证kafka收到logstash数据
/usr/local/kafka/bin/kafka-console-consumer.sh \\
--bootstrap-server 192.168.47.111:9092,192.168.47.112:9092,192.168.47.113:9092 \\
--topic hello \\
--from-beginning
以上是关于zookeeper和kafka安装的主要内容,如果未能解决你的问题,请参考以下文章
ELK之七------Zookeeper和Kafka安装与配置
zookeeper集群环境搭建(使用kafka的zookeeper搭建zk集群)