kafka安装及配置过程
Posted 李晓LOVE向阳
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka安装及配置过程相关的知识,希望对你有一定的参考价值。
来来先扫一下,一码不扫何以扫天下
简介
Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
基础架构及术语
先看一下Kafka基础架构示意图
Producer:Producer即生产者,消息的产生者,是消息的入口。
kafka cluster:
Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Message:每一条发送的消息主体。
Consumer:消费者,即消息的消费方,是消息的出口。
Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!
Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。
Kafka优势及应用场景
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。简单地说,Kafka就相比是一个邮箱,生产者是发送邮件的人,消费者是接收邮件的人,Kafka就是用来存东西的,只不过它提供了一些处理邮件的机制。
一、Kafka的优势如下:
高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒;
可扩展性:kafka集群支持热扩展;
持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
容错性:允许集群中节点故障(若副本数量为n,则允许n-1个节点故障);
高并发:支持数千个客户端同时读写。
二、Kafka适合以下应用场景:
日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer;
消息系统:解耦生产者和消费者、缓存消息等;
用户活动跟踪:kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后消费者通过订阅这些topic来做实时的监控分析,亦可保存到数据库;
运营指标:kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;
流式处理:比如spark streaming和storm。
二、Kafka安装及配置过程
一、安装
kafka可以通过官网下载:https://kafka.apache.org/downloads
kafka根据Scala版本不同,又分为多个版本,我不需要使用Scala,所以就下载官方推荐版本kafka_2.13-2.7.0.tgz。
使用tar -xzvf kafka_2.12-2.4.0.tgz 解压
为了使用方便,可以创建软链接kafka0
ln -l kafka_2.13-2.7.0 kafka0
kafka的安装需要依赖Zookeeper,下面先介绍Kafka与Zookeeper的关系,然后在介绍如何安装Zookeeper。
三、Kafka与Zookeeper的关系
一个典型的Kafka集群中包含若干Produce,若干broker(一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
1)Producer端直接连接broker.list列表,从列表中返回TopicMetadataResponse,该Metadata包含Topic下每个partition leader建立socket连接并发送消息.
2)Broker端使用zookeeper用来注册broker信息,以及监控partition leader存活性.
3)Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息。
Zookeeper作用:管理broker、consumer
创建Broker后,向zookeeper注册新的broker信息,实现在服务器正常运行下的水平拓展。具体的,通过注册watcher,获取partition的信息。
Topic的注册,zookeeper会维护topic与broker的关系,通/brokers/topics/topic.name节点来记录。
Producer向zookeeper中注册watcher,了解topic的partition的消息,以动态了解运行情况,实现负载均衡。Zookeepr不管理producer,只是能够提供当前broker的相关信息。
Consumer可以使用group形式消费kafka中的数据。所有的group将以轮询的方式消费broker中的数据,具体的按照启动的顺序。Zookeeper会给每个consumer group一个ID,即同一份数据可以被不同的用户ID多次消费。因此这就是单播与多播的实现。以单个消费者还是以组别的方式去消费数据,由用户自己去定义。Zookeeper管理consumer的offset跟踪当前消费的offset。
kafka使用ZooKeeper用于管理、协调代理。每个Kafka代理通过Zookeeper协调其他Kafka代理。
当Kafka系统中新增了代理或某个代理失效时,Zookeeper服务将通知生产者和消费者。生产者与消费者据此开始与其他代理协调工作。
Zookeeper在Kakfa中扮演的角色:Kafka将元数据信息保存在Zookeeper中,但是发送给Topic本身的数据是不会发到Zk上的
· kafka使用zookeeper来实现动态的集群扩展,不需要更改客户端(producer和consumer)的配置。broker会在zookeeper注册并保持相关的元数据(topic,partition信息等)更新。
· 而客户端会在zookeeper上注册相关的watcher。一旦zookeeper发生变化,客户端能及时感知并作出相应调整。这样就保证了添加或去除 broker时,各broker间仍能自动实现负载均衡。这里的客户端指的是Kafka的消息生产端(Producer)和消息消费端(Consumer)
· Broker端使用zookeeper来注册broker信息,以及监测partitionleader存活性.
· Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partitionleader建立socket连接,并获取消息.
· Zookeer和Producer没有建立关系,只和Brokers、Consumers建立关系以实现负载均衡,即同一个ConsumerGroup中的Consumers可以实现负载均衡(因为Producer是瞬态的,可以发送后关闭,无需直接等待)
四、Zookeeper配置
当前下载的kafka程序里自带Zookeeper,可以直接使用其自带的Zookeeper建立集群,也可以单独使用Zookeeper安装文件建立集群。
1. 单独使用Zookeeper安装文件建立集群
Zookeeper的安装及配置可以参考另一篇博客,里面有详细介绍
https://www.cnblogs.com/zhaoshizi/p/12105143.html
2. 直接使用其自带的Zookeeper建立集群
由于这里主要介绍Kafka,所以Zookeeper没有建立集群,只使用了单节点。
kafka自带的Zookeeper程序脚本与配置文件名与原生Zookeeper稍有不同。
kafka自带的Zookeeper程序使用bin/zookeeper-server-start.sh,以及bin/zookeeper-server-stop.sh来启动和停止Zookeeper。
而Zookeeper的配制文件是config/zookeeper.properties,可以修改其中的参数
(1) 启动Zookeeper
./kafka-server-start.sh -daemon ../config/server.properties
加-daemon参数,可以在后台启动Zookeeper,输出的信息在保存在执行目录的logs/zookeeper.out文件中。
对于小内存的服务器,启动时有可能会出现如下错误
os::commit_memory(0x00000000e0000000, 536870912, 0) failed; error='Not enough space' (errno=12)
可以通过修改bin/zookeeper-server-start.sh中的参数,来减少内存的使用,将下图中的-Xmx512M -Xms512M改小。
(2)关闭Zookeeper
./zookeeper-server-stop.sh -daemon ../config/zookeeper.properties
五、Kafka配置
kafka的配置文件在config/server.properties文件中,主要修改参数如下,更具体的参数说明以后再整理下。
broker.id是kafka broker的编号,集群里每个broker的id需不同。我是从0开始。
listeners是监听地址,需要提供外网服务的话,要设置本地的IP地址
log.dirs是日志目录,需要设置
设置Zookeeper集群地址,我是在同一个服务器上搭建了kafka和Zookeeper,所以填的本地地址
num.partitions 为新建Topic的默认Partition数量,partition数量提升,一定程度上可以提升并发性,数值应该小于等于broker的数量
因为要创建kafka集群,所以kafka的所有文件都复制两份,配置文件做相应的修改,尤其是brokerid、IP地址和日志目录。分别创建软链接kafka1和kafka2。
六、启动及停止Kafka
1. 启动kafka
./kafka-server-start.sh -daemon ../config/server.properties
-daemon 参数会将任务转入后台运行,输出日志信息将写入日志文件,查看日志,如果结尾输同started说明启动成功。
也可以用jps命令,看有没有kafka的进程
2. 停止kafka
./kafka-server-stop.sh ../config/server.properties
七、测试
前提:kafka和Zookeeper已启动完成
1. 创建topic
./kafka-topics.sh --create --zookeeper 192.168.92.128:2181 --replication-factor 2 --partitions 2 --topic test
2. 查看主题
./kafka-topics.sh --list --zookeeper 192.168.92.128:2181
3. 发送消息
./kafka-console-producer.sh --broker-list 192.168.92.128:9092 --topic test
4. 接收消息
./kafka-console-consumer.sh --bootstrap-server 192.168.92.128:9093 --topic test --from-beginning
5. 查看特定主题的详细信息
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
6. 删除主题
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
关注公众号,了解更多
以上是关于kafka安装及配置过程的主要内容,如果未能解决你的问题,请参考以下文章