分布式实时消息队列Kafka
Posted 大数据Manor
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式实时消息队列Kafka相关的知识,希望对你有一定的参考价值。
分布式实时消息队列Kafka(二)
知识点01:课程回顾
-
什么是消息队列?
- 用于两个系统之间或者两个模块之间实现消息传递,基于队列机制实现数据缓存
-
消息队列的优点是什么?
- 实现解耦
- 通过异步,提高性能
-
消息队列的缺点是什么?
- 架构更加复杂:如果消息队列出现故障,整个系统都会故障
- 分布式集群
- 副本机制
- 数据维护更加复杂:不丢失,不重复
- 生产安全:幂等性机制
- 消费安全:Offset
- 架构更加复杂:如果消息队列出现故障,整个系统都会故障
-
什么是同步与异步?
- 同步:立即一致性
- 异步:最终一致性
-
什么是Kafka?
- Kafka是一个基于订阅发布模式的高性能、高吞吐的实时消息队列系统
-
Kafka在大数据的应用场景是什么?
- 用于实时架构中:实现数据的临时存储
-
Kafka中的Producer、Consumer、Consumer Group 、Broker分别是什么?
- Producer:生产者,负责写入数据到Kafka
- Consumer:消费者,负责从Kafka消费读取数据
- Consumer Group:消费者组
- Kafka中的数据消费必须以消费者组为单位
- 一个消费者组可以包含多个消费者,注意多个消费者消费的数据加在一起是一份完整的数据
- 目的:提高性能
- 消费者组消费Topic
- 消费者组中的消费者消费Topic的分区
- Broker:Kafka一个节点
- 多个节点,构建Kafka集群
- 主从架构:类似于Zookeeper
- HDFS:NameNode、DataNode
- Hbase:HMaster、HRegionServer
- Kafka:Kafka
- 主:Kafka Controler
- 从:Kafka Broker
- 启动Kafka时候,会从所有的Broker选举一个Controler,如果Controller故障,会从其他的Broker重新选举一个
- 选举:使用ZK是实现辅助选举
-
Kafka中的Topic与Partition是什么?
-
Topic:逻辑上实现数据存储的分类,类似于数据库中的表概念
-
Partition:Topic中用于实现分布式存储的物理单元,一个Topic可以有多个分区
- 每个分区可以存储在不同的节点,实现分布式存储
-
副本机制:Kafka中每个分区可以构建多个副本【副本个数 <= 机器的个数】
-
将一个分区的多个副本分为两种角色
-
leader副本:负责对外提供读写请求
-
follower副本:负责与leader同步数据,如果leader故障,follower要重新选举一个成为leader
- 选举:不由ZK实现选举,由Kafka Crontroller来决定谁是leader
-
-
-
Kafka中的Segment是什么?
-
Segment:对分区内部的数据进行更细的划分,分区段,文件段
-
类似于Region中划分store
-
规则:按照文件产生的时间或者大小
-
目的:提高写入和查询性能
- 文件名称可以用于检索数据:用offset命名的
-
组成:每个Segment由两个文件组成
- .log:存储的数据
- .index:对应.log文件的索引信息
-
-
-
Kafka中的Offset是什么?
- Offset是kafka中存储数据时给每个数据做的标记或者编号
- 分区级别的编号
- 从0开始编号
- 功能:消费者根据offset来进行消费,保证顺序消费,数据安全
- Offset是kafka中存储数据时给每个数据做的标记或者编号
知识点02:课程目标
-
Kafka的集群如何搭建启动?
- 实现Kafka分布式集群的安装部署【按照笔记一步步搭建】
-
Kafka的Topic如何创建管理?【掌握】
-
命令行实现
-
创建Topic
-
查看Topic信息
-
删除、列举Topic
-
-
Kafka的Java API如何实现?【掌握类和方法】
- Java API
- 开发生产者
- 开发消费者
知识点03:Kafka集群架构
-
目标:了解Kafka集群架构及角色功能
-
路径
- Kafka集群有哪些角色?
- Kafka每个角色的功能是什么?
- Zookeeper在架构中的作用是什么?
-
实施
- 架构角色
- Kafka
- Zookeeper
- Kafka中的每个角色以及对应的功能
- 分布式主从架构
- 主:Kafka Controller
- 负责管理所有从节点:Topic、分区和副本
- 每次启动集群,会从所有Broker中选举一个Controller,由ZK实现
- 从:Kafka Broker
- 对外提供读写请求
- 其他的Broker监听Controller,如果Controller,会重新从Broker选举一个新的
- ZK的功能
- 辅助选举Active的主节点
- 存储元数据
- 架构角色
-
小结
- kafka是一个主从架构,整体对外提供分布式读写
- ZK主要负责选举Controller和实现元数据存储
知识点04:Kafka分布式集群部署
-
目标:实现Kafka分布式集群的搭建部署
-
路径
- step1:选择版本
- step2:下载解压安装
- step:3:修改配置文件
-
实施
-
版本的选型
- 0.8.x:老的版本,很多的问题
- 0.10.x +:消息功能上基本没有问题
- 选择:kafka_2.12-2.4.1.tgz
- Kafka:2.4.1
- Scala:2.12,Kafka是由Scala语言开发
-
下载解压安装
-
下载:http://archive.apache.org/dist/kafka/
-
上传到第一台机器
cd /export/software/ rz
-
解压
tar -zxvf kafka_2.12-2.4.1.tgz -C /export/server/ cd /export/server/kafka_2.12-2.4.1/ mkdir logs
- bin:一般用于存放客户端操作命令脚本
- sbin:一般用于存放集群的启动和关闭的命令脚本,如果没有这个命令,脚本放在bin目录中
- conf/etc/config:配置文件目录
- lib:jar包的存放目录
- logs:一般用于存放服务日志
-
-
修改配置
-
切换到配置文件目录
cd /export/server/kafka_2.12-2.4.1/config
-
修改server.properties
#21行:唯一的 服务端id broker.id=0 #60行:指定kafka的日志及数据【segment【.log,.index】】存储的位置 log.dirs=/export/server/kafka_2.12-2.4.1/logs #123行:指定zookeeper的地址 zookeeper.connect=node1:2181,node2:2181,node3:2181
#在最后添加两个配置,允许删除topic,当前kafkaServer的主机名
delete.topic.enable=true
host.name=node1- 分发 ```shell cd /export/server/ scp -r kafka_2.12-2.4.1 node2:$PWD scp -r kafka_2.12-2.4.1 node3:$PWD
-
第二台:server.properties
#21行:唯一的 服务端id
broker.id=1
#最后
host.name=node2- 第三台:server.properties ```properties #21行:唯一的 服务端id broker.id=2 #最后 host.name=node3
-
添加环境变量
vim /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
export PATH=: P A T H : PATH: PATH:KAFKA_HOME/bin```shell source /etc/profile
-
-
-
小结
- 按照笔记一步步来,不做过多要求,只要配置含义,实现安装即可
- 解压安装
- 修改配置:server.properties
知识点05:Kafka启动与关闭
-
目标:掌握kafka集群的启动与关闭命令及脚本封装
-
路径
- step1:如何启动Kafka集群?
- step2:如何关闭Kafka集群?
- step3:如何封装启动及关闭脚本?
-
实施
-
启动Zookeeper
/export/server/zookeeper-3.4.6/bin/start-zk-all.sh
-
启动Kafka
bin/kafka-server-start.sh config/server.properties >>/dev/null 2>&1 & >>/dev/null 2>&1 &:在后台运行
-
关闭Kafka
bin/kafka-server-stop.sh
-
封装Kafka脚本
-
启动脚本
vim /export/server/kafka_2.12-2.4.1/bin/start-kafka.sh
#!/bin/bash KAFKA_HOME=/export/server/kafka_2.12-2.4.1 for number in {1..3} do host=node${number} echo ${host} /usr/bin/ssh ${host} "cd ${KAFKA_HOME};source /etc/profile;export JMX_PORT=9988;${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >>/dev/null 2>&1 &" echo "${host} started" done
chmod u+x /export/server/kafka_2.12-2.4.1/bin/start-kafka.sh
-
关闭脚本
vim /export/server/kafka_2.12-2.4.1/bin/stop-kafka.sh
#!/bin/bash KAFKA_HOME=/export/server/kafka_2.12-2.4.1 for number in {1..3} do host=node${number} echo ${host} /usr/bin/ssh ${host} "cd ${KAFKA_HOME};source /etc/profile;${KAFKA_HOME}/bin/kafka-server-stop.sh" echo "${host} stoped" done
chmod u+x /export/server/kafka_2.12-2.4.1/bin/stop-kafka.sh
-
-
-
小结
- 启动:kafka-server-start.sh
- 关闭:kafka-server-stop.sh
知识点06:Topic管理:创建与列举
-
目标:掌握Kafka集群中Topic的管理命令,实现创建Topic及列举Topic
-
路径
- step1:Topic脚本的使用
- step2:创建Topic
- step3:列举Topic
-
实施
-
Topic管理脚本
-
查看用法
Create, delete, describe, or change a topic. Option Description ------ ----------- --alter Alter the number of partitions, replica assignment, and/or configuration for the topic. --at-min-isr-partitions if set when describing topics, only show partitions whose isr count is equal to the configured minimum. Not supported with the --zookeeper option. --bootstrap-server <String: server to REQUIRED: The Kafka server to connect connect to> to. In case of providing this, a direct Zookeeper connection won't be required. --command-config <String: command Property file containing configs to be config property file> passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs. --config <String: name=value> A topic configuration override for the topic being created or altered.The following is a list of valid configurations: cleanup.policy compression.type delete.retention.ms file.delete.delay.ms flush.messages flush.ms follower.replication.throttled. replicas index.interval.bytes leader.replication.throttled.replicas max.compaction.lag.ms max.message.bytes message.downconversion.enable message.format.version message.timestamp.difference.max.ms message.timestamp.type min.cleanable.dirty.ratio min.compaction.lag.ms min.insync.replicas preallocate retention.bytes retention.ms segment.bytes segment.index.bytes segment.jitter.ms segment.ms unclean.leader.election.enable See the Kafka documentation for full details on the topic configs.It is supported only in combination with -- create if --bootstrap-server option is used. --create Create a new topic. --delete Delete a topic --delete-config <String: name> A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). Not supported with the --bootstrap-server option. --describe List details for the given topics. --disable-rack-aware Disable rack aware replica assignment --exclude-internal exclude internal topics when running list or describe command. The internal topics will be listed by default --force Suppress console prompts --help Print usage information. --if-exists if set when altering or deleting or describing topics, the action will only execute if the topic exists. Not supported with the --bootstrap- server option. --if-not-exists if set when creating topics, the action will only execute if the topic does not already exist. Not supported with the --bootstrap- server option. --list List all available topics. --partitions <Integer: # of partitions> The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default. --replica-assignment <String: A list of manual partition-to-broker broker_id_for_part1_replica1 : assignments for the topic being broker_id_for_part1_replica2 , created or altered. broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...> --replication-factor <Integer: The replication factor for each replication factor> partition in the topic being created. If not supplied, defaults to the cluster default. --topic <String: topic> The topic to create, alter, describe or delete. It also accepts a regular expression, except for --create option. Put topic name in double quotes and use the '\\' prefix to escape regular expression symbols; e. g. "test\\.topic". --topics-with-overrides if set when describing topics, only show topics that have overridden configs --unavailable-partitions if set when describing topics, only show partitions whose leader is not available --under-min-isr-partitions if set when describing topics, only show partitions whose isr count is less than the configured minimum. Not supported with the --zookeeper option. --under-replicated-partitions if set when describing topics, only show under replicated partitions --version Display Kafka version. --zookeeper <String: hosts> DEPRECATED, The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.
-
-
创建Topic
bin/kafka-topics.sh --create --topic bigdata01 --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
- –create:创建
- –topic :指定操作的Topic的名称
- –partitions:指定分区个数,默认为1
- –replication-factor:副本因子,默认为1
- –bootstrap-server:指定Kafka服务端地址
-
列举Topic
bin/kafka-topics.sh --list -bootstrap-server node1:9092,node2:9092,node3:9092
-
–list:表示列举
-
-
-
小结
- 创建:create
- 指定分区个数
- 指定副本个数
- 列举:list
- 必选:–bootstrap-server:服务端地址
- 端口:9092
知识点07:Topic管理:查看与删除
-
目标:掌握Kafka集群中Topic的管理命令,实现查看Topic信息及删除Topic
-
路径
- step1:查看Topic详细信息
- step2:删除Topic
-
实施
-
查看Topic信息
bin/kafka-topics.sh --describe --topic bigdata02 --bootstrap-server node1:9092,node2:9092,node3:9092
Topic: bigdata02 PartitionCount: 2 ReplicationFactor: 3 Configs: segment.bytes=1073741824 Topic: bigdata02 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: bigdata02 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
- Partition:分区编号
- Replicas:分区副本所在的Kafka Broker ID
- 每个分区的副本有两种角色
- leader副本
- follower副本
- Leader:leader 副本所在的Kafka节点
- Isr:In-Sync-Replicas:正在同步的副本,可用的副本
- 用于leader故障时,选举新的leader
-
删除Topic
bin/kafka-topics.sh --delete --topic bigdata02 --bootstrap-server node1:9092,node2:9092,node3:9092
-
-
小结
- 查看信息:describe
- 删除:delete
知识点08:生产者及消费者测试
-
目标:了解命令行如何模拟测试生产者和消费者
-
路径
- step1:构建一个生产者往Topic中生产数据
- 指定Topic
- 指定Kafka集群地址
- step2:构建一个消费者从Topic中消费数据
- 指定Topic
- 指定Kafka集群地址
- step1:构建一个生产者往Topic中生产数据
-
实施
-
命令行提供的脚本
-
Console生产者
bin/kafka-console-producer.sh --topic bigdata01 --broker-list node1:9092,node2:9092,node3:9092
-
Console消费者
bin/kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning
- –from-beginning:从每个分区的最初开始消费,默认从最新的offset进行消费
-
-
小结
- 只要生产者不断生产,消费就能实时的消费到Topic中的数据
知识点09:可视化工具Kafka Tool
-
目标:了解Windows版可视化工具Kafka Tool的使用
-
路径
- step1:安装Kafka Tool
- step2:启动构建连接
- step3:查看Kafka集群信息
-
实施
-
安装Kafka Tool:不断下一步即可
-
构建集群连接:连接Kafka集群
-
查看集群信息
-
-
小结
- 可视化工具,界面或者交互性不是很友好
- 后面会学习:Kafka Eagle
知识点10:Kafka集群压力测试
-
目标:了解如何实现Kafka集群的吞吐量及压力测试
-
路径
- step1:生产压力测试
- step2:消费压力测试
-
实施
-
创建Topic
bin/kafka-topics.sh --create --topic bigdata --partitions 2 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
-
生产测试
kafka-producer-perf-test.sh --topic bigdata --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1
- –num-records:写入数据的条数
- –throughput:是否做限制,-1表示不限制
- –record-size:每条数据的字节大小
-
消费测试
kafka-consumer-perf-test.sh --topic bigdata --broker-list node1:9092,node2:9092,node3:9092 --fetch-size 1048576 --messages 5000000
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-86jBAgCD-1625805582217)(20210330_分布式实时消息队列Kafka(二).assets/image-20210330105146154.png)]
-
-
小结
- 工作中一般根据实际的需求来调整参数,测试kafka集群的最高性能,判断是否能满足需求
知识点11:Kafka API 的应用
-
目标:了解工作中使用Kafka API的场景
-
路径
- step1:工作中使用Kafka的方式
- step2:Kafka API的分类
-
实施
-
命令行使用Kafka
- 一般只用于topic的管理:创建、删除
-
大数据架构中使用Kafka
-
Java API:构建生产者和消费者
-
工作中一般不用自己开发生产者和消费者
-
生产者:数据采集工具
- Flume:Kafka sink
- 配置kafka集群地址
- Topic的名称
- Flume:Kafka sink
-
消费者:实时计算程序
-
SparkStream:KafkaUtil
KafkaUtil.createDirectStream
-
-
这些软件的API已经将Kafka生产者和消费者的API封装了,只要调用即可
-
重点掌握:用到哪些类和方法
-
-
Kafka的API的分类
- High Level API:高级API
- 基于了SimpleAPI做了封装,让用户开发更加方便
- 但是由于封装了底层的API,有很多的东西不能控制,无法控制数据安全
- Simple API:简单API
- 并不简单,最原始的API
- 自定义控制所有消费和生产、保证数据安全
- High Level API:高级API
-
-
小结
- 大数据工作中一般不自己开发Java API:掌握类和方法即可
- 只使用Simple API来实现开发
知识点12:生产者API:构建KafkaProducer
-
目标:了解如何通过Java API构建生产者
-
路径
- step1:构建集群配置对象
- 指定服务端集群地址
- step2:构建Kafka Porducer对象
- 加载配置
- step1:构建集群配置对象
-
实施
-
构建集群配置对象
//todo:1-构建连接,Kafka生产者对象 //构建配置对象,指定生产者的配置 Properties props = new Properties(); props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");//指定集群地址 /** * acks:表示生产者生产数据时,怎么保证数据不丢失,Kafka接受写入数据以后,可以给生产者返回一个ack,表示收到这条数据,生产者发送下一条 * 0:生产者不管Kafka有没有返回ack,都直接发送下一条 * 快、数据容易丢失 * 1:生产者发送数据给Kafka的某个分区,写入leader副本以后,kafka就返回ack,生产者发送下一条 * 相对安全机制,有一定的概率,数据会丢失 * all:生产者发送数据给Kafka的某个分区,写入leader副本并且所有follower同步成功以后,kafka就返回ack,生产者发送下一条 * 最安全,性能较差 */ props.put("acks", "all"); //指定Key的序列化的类 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //指定Value的序列化的类 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
构建Kafka Producer加载配置
//构建生产者对象 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
-
-
小结
- Properties:构建生产者的配置
- 集群地址
- acks
- 序列化的类
- KafkaProducer:生产者的对象
知识点13:生产者API:生产数据到Kafka
-
目标:了解如何将数据写入Kafka中
-
路径
- step1:构建ProducerRecord对象
- step2:调用KafkaProducer的send方法将数据写入Kafka
-
实施
-
构建Producer对象
//构建一条数据的对象 //ProducerRecord(String topic, V value) // ProducerRecord<String, String> record = new ProducerRecord<String, String>("bigdata01","itcast"+i); //ProducerRecord(String topic, K key, V value) // ProducerRecord<String, String> record = new ProducerRecord<String, String>("bigdata01",i+"","itcast"+i); //ProducerRecord(String topic, Integer partition, K key, V value) ProducerRecord<String, String> record = new ProducerRecord<String, String>("bigdata01",0,i+"","itcast"+i);
-
调用send方法
//生产数据的数据 producer.send(record);
-
-
小结
- ProducerRecord:表示生产每一条数据
- Topic
- Key
- Value
- 可选:Partition
- KafkaProducer:send:写入数据到Kafka
知识点14:消费者API:构建KafkaConsumer
-
目标:了解如何通过Java API构建消费者
-
路径
- step1:构建集群配置对象
- step2:构建Kafka Consumer对象
-
实施
-
构建集群配置对象
//todo:1-构建连接,消费者对象 Properties props = new Properties(); props.setProperty("bootstrap.servers", "node1:9092");//服务端地址 props.setProperty("group.id", "test01");//消费者组的id props.setProperty("enable.auto.commit", "true");//是否自动提交offset props.setProperty("auto.commit.interval.ms", "1000");//提交的间隔时间 //指定key和value反序列化的类 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
构建Kafka Consumer加载配置
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
-
-
小结
- Properties:配置对象
- KafkaConsumer:消费者对象
知识点15:消费者API:消费Topic数据
-
目标:了解如何从Kafka中消费数据
-
路径
- step1:消费者订阅Topic
- step2:调用poll方法从Kafka中拉取数据,获取返回值
- step3:从返回值中输出:Topic、Partition、Offset、Key、Value
-
实施
-
消费者订阅Topic
//订阅Topic consumer.subscribe(Arrays.asList("bigdata01"));
-
拉取数据
//消费数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
-
输出数据
//取出每一条数据 for (ConsumerRecord<String, String> record : records) { //获取topic String topic = record.topic(); //获取分区 int partition = record.partition(); //获取offset long offset = record.offset(); //获取Key String key = record.key(); //获取Value String value = record.value(); System.out.println(topic+"\\t"+partition+"\\t"+offset+"\\t"+key+"\\t"+value); }
-
-
小结
- KafkaConsumer:subscribe:负责订阅Kafka的Topic
- KafkaConsumer:poll:负责拉取消费数据
- ConsumerRecords:消费到的所有数据的集合
- ConsumerRecord:消费到的每一条数据
- topic:获取Topic
- partition:获取分区
- offset:获取offset
- key:获取key
- value:获取value
附录一:Maven依赖
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
<dependencies>
<!-- Kafka的依赖 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.1</version>
</dependency>
</dependencies>
cord.key();
//获取Value
String value = record.value();
System.out.println(topic+"\\t"+partition+"\\t"+offset+"\\t"+key+"\\t"+value);
}
```
-
小结
- KafkaConsumer:subscribe:负责订阅Kafka的Topic
- KafkaConsumer:poll:负责拉取消费数据
- ConsumerRecords:消费到的所有数据的集合
- ConsumerRecord:消费到的每一条数据
- topic:获取Topic
- partition:获取分区
- offset:获取offset
- key:获取key
- value:获取value
附录一:Maven依赖
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
<dependencies>
<!-- Kafka的依赖 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.1</version>
</dependency>
</dependencies>
以上是关于分布式实时消息队列Kafka的主要内容,如果未能解决你的问题,请参考以下文章