06 APACHE KAFKA基础
Posted IT BOY
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了06 APACHE KAFKA基础相关的知识,希望对你有一定的参考价值。
目录
APACHE KAFKA基础
PT1 KAFKA介绍
Pt1.1 Kafka发展史
Kafka到底是什么?在没有接触到Kafka之前,一直觉得Kafka好像很神秘的感觉。他可以应用在普通业务中,承担通信队列的任务,和其它MQ作用一样,但同时他又被大量应用在大数据、流式计算的场景,那他的定义和定位到底是什么呢?不妨先来简单了解下Kafka是什么。
Apache Kafka命名和定义
Kafka的架构师jay kreps对于Kafka的名称由来是这样讲的,由于jay kreps非常喜欢franz kafka,并且觉得Kafka这个名字很酷,因此取了个和消息传递系统完全不相干的名称Kafka,该名字并没有特别的含义。
Kafka定义(官网):
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
Apache Kafka诞生历程
Kafka诞生于LinkedIn。和所有大公司一样,LinkedIn内部系统非常之多,服务之间交互复杂,起初系统间通过点对点进行通信,交互关系非常复杂,难以维护。
后来,考虑使用MQ来替代服务依赖,将点对点通信改造为基于ActiveMQ的通信模式,但是LinkedIn体量太大,ActiveMQ还远远无法满足linkedin对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问。
无奈,LinkedIn只能考虑自己开发消息通信系统。
可以看出,Kafka的出现是为了解决业务上下游系统复杂的耦合关系而诞生的,解耦正是消息引擎的重要特性。
Apache Kafka定位和特性
Apache Kafka 是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:
-
以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间复杂度的访问性能。
-
高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输。
-
支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 Partition 内的消息顺序传输。
-
同时支持离线数据处理和实时数据处理。
-
Scale out:支持在线水平扩展。
Apache Kafka是一个分布式流处理平台。具有以下特性:
-
可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。
-
可以储存流式的记录,并且有较好的容错性。
-
可以在流式记录产生时就进行处理。
Pt1.2 Kafka应用场景
Kafka被广泛应用于消息通信、大数据领域和流计算领域。
(1) 消息通信
和ActiveMQ、RabbitMQ一样,作为TCP、HTTP和RPC点对点服务通信的替代者,Kafka可以处理消息通信,从而实现异步通信、服务解耦、流量消峰和消息广播的能力。
与大多数消息系统相比,Kafka拥有更好的吞吐量、内置分区、具有复制和容错的功能,这使它成为一个非常理想的大型消息处理应用。根据我们的经验,通常消息传递使用较低的吞吐量,但可能要求较低的端到端延迟,Kafka提供强大的持久性来满足这一要求。在这方面,Kafka 可以与传统的消息传递系统(ActiveMQ 和 RabbitMQ)相媲美。
(2) 大数据领域
跟踪网站活动
Kafka 的初始用例是将用户活动跟踪管道重建为一组实时发布-订阅源。这意味着网站活动(浏览网页、搜索或其他的用户操作)将被发布到中心topic,其中每个活动类型有一个topic。 这些订阅源提供一系列用例,包括实时处理、实时监视、对加载到Hadoop或离线数据仓库系统的数据进行离线处理和报告等。
每个用户浏览网页时都生成了许多活动信息,因此活动跟踪的数据量通常非常大。
应用指标监控
Kafka 通常用于监控数据。这涉及到从分布式应用程序中汇总数据,然后生成可操作的集中数据源。
日志聚合
许多人使用Kafka来替代日志聚合解决方案。
日志聚合系统通常从服务器收集物理日志文件,并将其置于一个中心系统(可能是文件服务器或HDFS)进行处理。Kafka 从这些日志文件中提取信息,并将其抽象为一个更加清晰的消息流。这样可以实现更低的延迟处理且易于支持多个数据源及分布式数据的消耗。与Scribe或Flume等以日志为中心的系统相比,Kafka具备同样出色的性能、更强的耐用性(因为复制功能)和更低的端到端延迟。
采集日志
Event sourcing是一种应用程序设计风格,按时间来记录状态的更改。Kafka可以存储非常多的日志数据,为基于 event sourcing 的应用程序提供强有力的支持。
提交日志
Kafka可以从外部为分布式系统提供日志提交功能。日志有助于记录节点和行为间的数据,采用重新同步机制可以从失败节点恢复数据。Kafka的日志压缩功能支持这一用法,这一点与Apache BookKeeper 项目类似。
(3) 流计算领域
流处理
许多Kafka用户通过管道来处理数据,有多个阶段:从Kafka topic中消费原始输入数据,然后聚合,修饰或通过其他方式转化为新的topic,以供进一步消费或处理。
例如,一个推荐新闻文章的处理管道可以从RSS订阅源抓取文章内容并将其发布到“文章”topic; 然后对这个内容进行标准化或者重复的内容,并将处理完的文章内容发布到新的topic; 最终它会尝试将这些内容推荐给用户。这种处理管道基于各个topic创建实时数据流图。
从0.10.0.0开始,在Apache Kafka中,Kafka Streams 可以用来执行上述的数据处理,它是一个轻量但功能强大的流处理库。除Kafka Streams外,可供替代的开源流处理工具还包括Apache Storm 和Apache Samza.
PT2 KAFKA安装
Pt2.1 CentOS安装单机版Kafka
(1) 环境准备
1、准备CentOS环境;
2、安装JDK环境依赖;
3、安装ZK集群;
(2) 安装JDK环境
1、官网下载JDK11安装包
官网(Java Downloads | Oracle)下载JDK11安装包jdk-11.0.11_linux-x64_bin.tar.gz,上传到CentOS服务器下。
2、解压安装包
[root@VM-0-17-centos jdk]# tar -zxvf jdk-11.0.11_linux-x64_bin.tar.gz
jdk-11.0.11/README.html
jdk-11.0.11/bin/jaotc
jdk-11.0.11/bin/jar
......
jdk-11.0.11/lib/tzdb.dat
jdk-11.0.11/release
3、配置环境变量
在/etc/profile文件后追加:
export JAVA_HOME=/root/app/jdk/jdk-11.0.11
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
编译使之生效:
source /etc/profile
4、验证是否安装成功
[root@VM-0-17-centos jdk-11.0.11]# java -version
java version "11.0.11" 2021-04-20 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.11+9-LTS-194)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.11+9-LTS-194, mixed mode)
(3) 安装ZooKeeper
1、获取ZooKeeper镜像
docker pull zookeeper:3.5.8
2、单机启动ZooKeeper
docker run -d --name zookeeper -p 2181:2181 -t zookeeper:3.5.8
(4) 安装Kafka
1、获取Kafka镜像
docker pull wurstmeister/kafka:latest
2、单机启动Kafka
docker run -d --name kafka \\
-p 9092:9092 \\
-e KAFKA_BROKER_ID=0 \\
-e KAFKA_ZOOKEEPER_CONNECT=121.14.133.151:2181 \\
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://121.14.133.151:9092 \\
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest
启动后,目录如下:
LICENSE NOTICE bin config libs logs site-docs
3、启动报错
如果启动时闪退,可以使用docker logs kafka查看错误日志,很大可能是因为内存不足(我用的云服务器,买的内存很小),需要修改Kafka启动内存。
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
我这边报了内存错误,然后我把rabbitmq都停掉再重启kafka就好了。
(5) 验证
1、创建topic
# 进入容器
docker exec -it kafka /bin/bash
# 进入命令目录
cd /opt/kafka_2.13-2.7.0/bin
# 创建topic,1各分区,1个副本
kafka-topics.sh --create --zookeeper 121.14.133.151:2181 --replication-factor 1 --partitions 1 --topic mykafka
查看创建的topic:
kafka-topics.sh -list -zookeeper 121.14.133.151:2181
2、启动Producer
在新窗口中启动Producer,启动后光标会进入输入模式,这时候输入回车就是发送消息。
# 启动Producer,关联刚才创建的topic
kafka-console-producer.sh --broker-list 121.14.133.151:9092 --topic mykafka
3、启动Consumer
在新窗口启动Consumer,同样光标会进入闪烁模式,同时等待生产者消息。
kafka-console-consumer.sh --bootstrap-server 121.14.133.151:9092 --topic mykafka --from-beginning
4、发送消息
在生产者光标中输入消息。
# 生产者发送消息
bash-4.4# kafka-console-producer.sh --broker-list 121.14.133.151:9092 --topic mykafka
>hello, this is my first kafka message.
>
消费者接收消息。
bash-4.4# kafka-console-consumer.sh --bootstrap-server 121.4.33.15:9092 --topic mykafka --from-beginning
hello, this is my first kafka message.
Pt2.2 Kafka命令
脚本 | 作用 |
---|---|
connect-distributed.sh | 用于启动多节点的Distributed模式的Kafka Connect组件 |
connect-standalone.sh | 用于启动单节点的Standalone模式的Kafka Connect组件 |
kafka-acls.sh | 用于设置Kafka权限,比如设置哪些用户可以访问Kafka的哪些TOPIC的权限 |
kafka-broker-api-versions.sh | 主要用于验证不同Kafka版本之间服务器和客户端的适配性 |
kafka-configs.sh | 配置管理脚本 |
kafka-console-consumer.sh | kafka消费者控制台 |
kafka-console-producer.sh | kafka生产者控制台 |
kafka-consumer-groups.sh | kafka消费者组相关信息 |
kafka-consumer-perf-test.sh | kafka消费者性能测试脚本 |
kafka-delegation-tokens.sh | 用于管理Delegation Token。基于Delegation Token的认证是一种轻量级的认证机制,是对SASL认证机制的补充。 |
kafka-delete-records.sh | 用于删除Kafka的分区消息 |
kafka-dump-log.sh | 用于查看Kafka消息文件的内容 |
kafka-log-dirs.sh | 用于查询各个Broker上的各个日志路径的磁盘占用情况 |
kafka-mirror-maker.sh | 用于在Kafka集群间实现数据镜像 |
kafka-preferred-replica-election.sh | 用于执行Preferred Leader选举,可以为指定的主题执行更换Leader的操作 |
kafka-producer-perf-test.sh | kafka生产者性能测试脚本 |
kafka-reassign-partitions.sh | 用于执行分区副本迁移以及副本文件路径迁移。 |
kafka-replica-verification.sh | 复制进度验证脚本 |
kafka-run-class.sh | 用于执行任何带main方法的Kafka类 |
kafka-server-start.sh | 启动kafka服务 |
kafka-server-stop.sh | 停止kafka服务 |
kafka-streams-application-reset.sh | 用于给Kafka Streams应用程序重设位移,以便重新消费数据 |
kafka-topics.sh | topic管理脚本 |
kafka-verifiable-consumer.sh | 可检验的kafka消费者 |
kafka-verifiable-producer.sh | 可检验的kafka生产者 |
trogdor.sh | Kafka的测试框架,用于执行各种基准测试和负载测试 |
zookeeper-server-start.sh | 启动zk服务 |
zookeeper-server-stop.sh | 停止zk服务 |
zookeeper-shell.sh | zk客户端 |
Pt2.3 Kafka管理控台
# 拉取最新版kafka-manager镜像
docker pull kafkamanager/kafka-manager
# 运行镜像
docker run -d --name kafka-manager \\
--link zookeeper:zookeeper \\
--link kafka:kafka -p 9001:9000 \\
--restart=always \\
--env ZK_HOSTS=zk_ip:2181 \\
kafkamanager/kafka-manager
运行成功后,http://ip:9001登陆控台。
具体控台使用就不介绍了,大概就是这样一个界面。
以上是关于06 APACHE KAFKA基础的主要内容,如果未能解决你的问题,请参考以下文章
使用 Apache Camel Source 从 S3 到 Kafka
kafka使用Apache Kafka构建可靠的再处理和死信队列