猿创征文 | kafka框架从入门到精通(全)
Posted 码农研究僧
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了猿创征文 | kafka框架从入门到精通(全)相关的知识,希望对你有一定的参考价值。
目录
- 前言
- 1. 入门kafka
- 2. 生产者
- 3. Broker
- 4. 消费者
- 5. Kafka-Eagle监控
- 6. Kafka-Kraft模式
- 7. 实战调优参数(总结)
- 8. 集群压测
- 9. 集成化
- 10. ----kafka源码分析----
前言
关于java其他方面的知识点可看我之前的文章:
java框架零基础从入门到精通的学习路线(超全)
以下内容的学习主要来源于:
【尚硅谷】2022版Kafka3.x教程(从入门到调优,深入全面)
1. 入门kafka
应用场景举例:
前端浏览了网站,记录了数据(点赞、评论量等)变成日志,发送到日志服务器,日志服务器(通过Flume时刻监控服务器,只要一有数据变化)上传到Hadoop。
Flume(上传速度为100m/s左右)和Hadoop(采集速度小于100m/s,而且高峰期可能大于200m/s)两者的传输速率不同。可以增加一个kafka的中间件,将其大量的数据都放在kafka,之后将其数据与后面的Hadoop数据的速率保持一致即可
定义:
Kafka传统定义:分布式、基于发布/订阅(发布的消息分为不同类别,订阅者只接受感兴趣的消息,订阅者订阅的速度通过自身决定)的消息队列,主要用于大数据实时处理领域。
最新的定义:开源的分布式事件流平台、数据通道、流分析、数据集成和关键任务应用
消息队列:kafka(大数据)、ActiveMQ(JAVAEE)、RabbitMQ
功能:
- 解耦(只需保证生产者和消费者两侧的接口即可。也就是数据源放到消息队列中,之后消息 列将其对应的消息分发给消费者)
- 异步(例如注册信息,调用发送信息的接口(此处使用消息队列),内部核心的处理结果在后台,先回馈信息给客户)
- 消峰(数据量多,扛不住这么大的消息,可以将其缓存到消息队列中)
两种模式:
- 点对点(消费完之后MQ删除其信息)
- 发布/订阅(每个消费者相互独立、消费数据后不删除数据)
基础架构:(讲讲基本的框架流程)
分区主要是为了方便扩展,提高了吞吐量,一个topic分为多个partition(一个区可能存储不了所有的数据,所以只能分区)
一个分区的数据只能由一个消费者消费
保证数据的可用性,每个partition增加若干副本
生产者的leader挂掉之后,follower有条件升级为leader
还有一些数据是存储在zookeeper中
记录服务器节点的信息
每个分区的相关信息(谁是消费者,谁是生产者)
kafka2.8之后可以不用zookeeper
1.1 安装配置
关于这部分的安装可看我之前的文章:
Kafka在Linux服务器下载安装配置等详细图文版本(全)
多个服务器同时启动kafka,脚本如下:(对应的启动位置还有服务器名称替换成自已的即可)
#!/bin/bash
case $1 in
"start")
for i in x,y,z
do
echo "启动"
ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
done
;;
"stop")
for i in x,y,z
do
echo "停止"
ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
done
;;
esac
1.2 命令操作
本身kafka有生产者、消费者、broker
对应每个模块都有它的启动配置
1.2.1 topic
执行bin/kafka-topics.sh内部有很多的option的操作
大致的option命令如下:
操作 | 描述 |
---|---|
bootstrap-server<String: server to connect to>(可以写多个集群) | 连接kafka broker主机名称和端口号 |
topic<String: topic>(可以执行增删改查) | 指定topic名称 |
create | 增加topic |
delete | 删除topic |
alter | 修改topic |
list | 显示所有topic |
describe | 查看某个topic的详细信息 |
partitions<Integer:#of partitions> | 设置分区数 |
replication-factor<Integer: replication factor> | 设置分区副本 |
config<String:name == value> | 更新系统默认的配置 |
主要的逻辑如下,连接kafka之后,指定某个topic进行增删改查,以及增加分区和副本数量,对某个分区进行升级等
使用的过程中如果出现问题可看这篇文章:
kafka创建、启动topic遇到的bug汇总 解决方法
bin/kafka-topics.sh 模块
-
查看对应的主题:
bin/kafka-topics.sh --bootstrap-server manongkafka:9092 --list
(在配置文件中我的别名为manongkafka 也可使用localhost代表本机)
-
创建主题:
bin/kafka-topics.sh --bootstrap-server manongkafka:9092 --create --partitions 1 --replication-factor 1 --topic first
(创建完主题之后还要创建分区以及指定副本)
对应查看topic以及详细的信息(副本数量此处我创建了一个)
- 查看topic的详细信息:
bin/kafka-topics.sh --bootstrap-server manongkafka:9092 --topic first --describe
分区数只能增加不能减少(减少的时候原先的混合在一起,导致不知道在哪个分区查找)
- 增加分区:
bin/kafka-topics.sh --bootstrap-server manongkafka:9092 --topic first --alter --partitions 3
副本的数量也可增加(但是此处不能使用命令行增加)
- 删除topic:
bin/kafka-topics.sh --bootstrap-server manongkafka:9092 --delete --topic first
1.2.2 生产者 消费者
具体通过configs目录下的bin/kafka-console-producer.sh
大致命令:bin/kafka-console-producer.sh --bootstrap-server manongkafka:9092 --topic first
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接 Kafka Broker 主机名称和端口号 |
–topic <String: topic> | 操作的 topic 名称 |
连接集群,在某个集群中添加数据
生产者的数据打到了topic中,消费者消费对应的topic即可(消费者和生产者参数差不多)
通过configs目录下的bin/kafka-console-consumer.sh
大致命令:bin/kafka-console-consumer.sh --bootstrap-server manongkafka:9092 --topic first
一般数据都会保存在topic中,可以将其保存7天的数据都加载过来(但有些数据可能不会使用到),参数为:--from-beginning
大致命令:bin/kafka-console-consumer.sh --bootstrap-server manongkafka:9092 --topic first --from-beginning
2. 生产者
生产者如何发送数据到topic中
外部的接收数据发送给生产者,具体操作流程如下:
通过生产者工程的主线程,使用send(ProducerRecord)
发送,经过拦截器(加工)、序列化器(一般使用自带)、分区器(哪个分区进行存储,一个分区创建一个队列,这些都是存储在内存中,默认缓冲队列是32m,一个batch为16k)
如下所示:
存放在队列中的数据,通过sender线程(拉取数据)
具体数据什么时间点拉取(通过数据量的大小以及等待数据的时间)
- batch.size:只有数据积累到batch.size之后,sender才会发送数据。默认16k
- linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟,也就是只要有数据,就会拉取数据(batch.size就会失效)
拉取数据到kafka的时候,本身要发送到kafka集群,以每个集群的节点为例,一个队列的数据发送的请求放在一个队列中存储,如果请求接收不到,一般还会在发送(超过5个没接收应答,就不会在拉取)。类似滑动窗口
收到数据之后一般都会有个应答机制(0 、1 和 -1),发送成功之后,会进行副本的复制,以及清除之前队列的数据。发送失败则会进行重试
- 0:生产者发送数据,不需要等数据到达后应答
- 1:生产者发送数据,Leader收到数据后应答
- -1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答,-1和all等价
主要的参数讲解:
参数 | 描述 |
---|---|
bootstrap.servers | 连接集群所需的 broker 地址,如果有多个可用逗号隔开(并非需要所有地址,给定一个broker就可找到其他broker) |
key.serializer 和 value.serializer | 发送信息的key以及value序列化类型 |
buffer.memory | RecordAccumulator 缓冲区总大小(默认 32m) |
batch.size | 缓冲区一批数据的最大值(默认16k),提高值可增加吞吐量,太高延迟会加大 |
linger.ms | 数据没到达最大值,sender到达时间最大值会发送数据(默认0ms,无延迟)。production建议为5-100ms |
acks | 默认是-1(all),还有0和1 |
max.in.flight.requests.per.connection | 最多没返回的ack次数(默认5),开启幂等性要保证该值是 1-5 的数字 |
retries | 发送错误会重发(设置此处,默认int最大值),若要保证有序性,需设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 |
retry.backoff.ms | 两次重试之间的时间间隔(默认100ms) |
enable.idempotence | 是否开启幂等性(默认true开启) |
compression.type | 生产者发送的所有数据的压缩方式(默认none不压缩),其他类型有:none、gzip、snappy、lz4 和 zstd |
2.1 异步&同步 发送
何为异步通信和同步通信?
同步通信是全部所有任务都完成之后才能返回
异步发送是只要执行接口就可返回,具体核心函数不用执行到结束才返回
在kafka中的异步发送:外部数据发送到队列(不管数据有没到达kafka)
异步接口不带回调函数,(也就是在终端中没有返回值)
示例代码:
引入maven依赖文件
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
核心代码(此处的异步发送是没有回调函数的):
创建kafka生产者对象(再这之前需要配置 、连接服务器对应的topic 以及序列化)
public class CustomProducer
public static void main(String[] args)
// 0 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
// 防止一个挂掉之后还可以启动另外一个
//properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"manongkafka:9092");
// 指定对应的key和value的序列化类型 key.serializer,必须要配置这两个参数
// StringSerializer.class.getName() 等同于它的全类名 org.apache.kafka.common.serialization.StringSerializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1 创建kafka生产者对象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 发送数据
for (int i = 0; i < 5; i++)
kafkaProducer.send(new ProducerRecord<>("first","码农研究僧"+i));
// 3 关闭资源
kafkaProducer.close();
启动之后,截图如下:
异步接口带回调函数:
发送函数send有两个,其中一个带有回调参数
ProducerRecord有大致如下函数,参数有哪个分区哪个key、value等(在下述的分区中会讲这些函数的区别)
主要修改上面的核心代码:
// 2 发送数据
for (int i = 0; i < 5; i++)
kafkaProducer.send(new ProducerRecord<>("first", "manongyanjiuseng" + i), new Callback()
@Override
public void onCompletion(RecordMetadata metadata, Exception exception)
if (exception == null)
System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
);
Thread.sleep(2);
截图如下:
同步发送:
同样修改上面的核心代码,加入get函数捕捉
类似future线程,只不过这里通过get获取,等待异步调用的结果(需要发送完毕数据之后才会返回结果)
通过send函数发送到队列 ,再从队列发送到broker中来返回结果
// 2 发送数据
for (int i = 0; i < 5; i++)
kafkaProducer.send(new ProducerRecord<>("first","manongyanjiuseng"+i)).get();
2.2 分区
数据过大不适合存放在一台服务器上,需要进行分区存储
分区好处:
- 合理使用存储资源:每个分区都在在一个Broker上存储,海量的数据按照分区切割成一块一块数据存储在多台Broker上(合理控制分区,实现负载均衡,容错性)
- 提高并行度(效率增加):生产者可以以分区为单位发送数据;消费者以分区为单位进行消费数据。
分区策略:(先看图)
- 指定分区策略使用
比如指定分区数,不指定key(按照指定的分区发送)
// 2 发送数据
for (int i = 0; i < 5; i++)
kafkaProducer.send(new ProducerRecord<>("first", 1,"","hello" + i), new Callback()
@Override
public void onCompletion(RecordMetadata metadata, Exception exception)
if (exception == null)
System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
);
Thread.sleep(2);
- 没有指定分区,但是有key,则基于key进行hash(散列)
key1的hash为5,key2的hash为6,分区数为2,则5%2为1分区,6%2为0分区
代码:kafkaProducer.send(new ProducerRecord<>("first", "a","hello" + i),
。(这个在生产环境中用的比较多,将其服务作为key,对应放在同一个分区中)
- 没有分区也没有key,选择sticky分区(当这批数据已经满了)
第一次随机选择0号分区,当批次满了(默认16k)或者时间到了,在随机选择一个分区进行使用
代码:kafkaProducer.send(new ProducerRecord<>("first", "hello" + i),
自定义分区:
背景:过滤的码农研究僧
数据放到0分区,没有该数据在1分区
思路:继承Partitioner接口,主要是重写partition方法
public class MyPartitioner implements Partitioner
// 核心方法是这个
// topic 、key、 序列化后的key、value、序列化之后的value
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
// 获取数据 “码农研究僧”
String msgValues = value.toString();
// 定义分区变量
int partition;
if (msgValues.contains("码农研究僧"))
partition = 0;
else
partition = 1;
return partition;
@Override
public void close()
@Override
public void configure(Map<String, ?> configs)
配置kafka生产者对象的时候,需要再这之前将其自定义分区引入
// 关联自定义分区器
// 后面的value值带的是 自定义分区的类名
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.manong.kafka.producer.MyPartitioner");
大致插入的位置如下:
2.3 调优参数
代码模板:
// 创建properties,配置必须要的参数
// 参数需要是连接kafka参数、key和value的序列化、配置缓冲区参数 批次大小 linger.ms等参数
//1. 创建生产者 通过new KafkaProducer ,参数与需要是properties
//2. 发送数据
// 3. 关闭资源
2.3.1 吞吐量
- 0延迟,有数据就拉取(效率低,无延迟):默认0延迟的时候(linger.ms设置为0,范围为5-100ms),只要有数据就会拉取
- 有延迟,通过批次拉取(效率高,有延迟):设置延迟,结合batch.size(默认为16k),数据批次到达16k的时候,在拉取
基于以上的问题,具体优化如下:
提高吞吐量
- 一般会将其两者进行结合(batch.size设置为32k,默认是16k,linger.ms设置为2ms)
- 将其数据进行压缩(拉取的数据更加多了)
- RecordAccumulatator(缓冲区大小)修改为64m(但也有数据延迟)
对于压缩的方式:默认 none,可配置值 gzip、snappy、lz4 和 zstd
实际代码如下:(异步发送进行修改)
public class CustomProducerParameters
public static void main(String[] args)
// 0 配置
Properties properties = new Properties();
// 连接kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"manongkafka:9092,hadoop103:9092");
// 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// 批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// linger.ms 毫秒需求
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 压缩
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
// 1 创建生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 发送数据
for (int i = 0; i < 5; i++)
kafkaProducer.send(new ProducerRecord<>("first","manongkafka"+i));
// 3 关闭资源
kafkaProducer.close();
步骤大致如下:创建生产者、发送数据、关闭资源
创建生产者参数要带入Properties类,所以一开始创建这个类的同时就要连接集群以及配置
2.3.2 可靠性
关于数据可靠的验证:
集群收到数据之后,一般会对数据进行应答
应答的方式有-1,0,1
- 0:生产者一直发送数据,不需要应答(可靠性比较低,效率最高)
缺陷:
leader挂掉之后,再次发送之后会出现数据丢失(没有数据的应答验证)
- 1:生产者发送数据,leader收到数据后进行应答
缺陷:
只有leader收到数据才应答,但是应答完之后leader挂了,某个follower变为leader之后(由于之前的数据已经默认收到了),所以新的leader不会再收到数据
- -1(等同于all):生产者发送数据,leader和isr队列里的所有节点收齐后再应答
缺陷:
leader以及ISR队列(所有节点都有这个数据)都收到所有节点才应答。但如果某个follower挂掉之后,所有follower以及leader都在同步数据,却因为某个follower不能与leader进行同步而延迟
解决方案:
对于第三种解决方案如下:·(心跳机制,动态的同步)
维持一种动态的ISR(和leader保持同步的follower+leader集合)
如果follower长时间(replica.lag.time.max.ms设置为30s,类似心跳机制)没有向leader发送请求或者同步数据,则该follower被踢出ISR,不用长时间等待故障节点
特殊情况:分区数副本只有一个,或者ISR副本只有一个。等同于ack=1,都会有丢失的情况
为了保证数据的可靠性,需要分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
总结
- ack为0,可靠性差,效率高(生产环境很少使用)
- ack为1,只与leader应答,可靠与效率中等(生产环境的应用在普通日志,丢失个别数据可接受)
- ack为-1,发送的数据要与leader以及ISR队列的所有follower应答,可靠性高,效率低(生产环境的应用在可靠性比较强的场景,支付),但是可能会造成数据的重复(数据接收到之后,原本要应答,但是leader挂掉之后,某个follower成为leader,数据又再次发送一遍)
在代码模块中如下:
// 设置ack
properties.put(ProducerConfig.ACKS_CONFIG,"1");
// 如果接受不到,会进行重试,默认是int的最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG,3);
2.3.3 幂等性 事务性
- ack为-1,也就是all的时候,最少要发送一次。保证数据不丢失,但是不能保证数据不重复
- ack为0,只要leader应答即可,最多发送一次。保证数据不重复,但是不能保证数据不丢失
生产环境不能重复也不能丢失
kafka0.11版本之后引入了幂等性和事务
幂等性(只能保证单分区会话的不重复)
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。开启参数 enable.idempotence 默认为 true,false 关闭
通过如下标准判断数据的不重复:(只能保证单分区的单会话不能重复
,一旦重启之后还是不能保证幂等性,所以下面会引入事务
)
大致参数如下:
- PID:每次重启的时候都会配置一个新的(宕机之后又是发送一个一样的数据)
- Partition:分区号(分区之间有重复本身不影响)
- Sequence Number:单调自增
在生产环境中开启幂等性的参数:enable.idempotence默认本身就是true,false为关闭
事务性
不想产生任何一条数据,需通过事务性(前提是开启了幂等性)
宕机之后,pid又是一个新值,为此通过事务的全局id
原理:
kafka生产者 在事务协调器中 请求一个pid,并且返回pid给生产者
协调器会将其请求持久化(这个请求是生产者发送的提交请求),后台发送提交请求到主题分区中(再这之前生产者发送的消息到主题中),主题分区返回成功值之后,协调器会将其成功的信息持久化
具体逻辑代码如下:
- 手动指定事务id(保证全局唯一)
- 初始化事务
- 开启事务
- 通过try 发送数据 来 提交事务
- 出现catch 则放弃事务
// 指定事务id,随便取,保证事务唯一即可
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");
// 1 创建kafka生产者对象
// 再这之前还有连接的一些参数
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction();
try
// 2 发送数据
for (int i = 0; i < 5; i++)
kafkaProducer.send(new ProducerRecord<>("first", "manong" + i));
// 成功的话提交事务
kafkaProducer.commitTransaction();
catch (Exception e)
// 失败的话放弃事务
kafkaProducer.abortTransaction();
finally
// 3 关闭资源
kafkaProducer.close();
2.3.4 顺序性
本身生产者给broker发送的时候,消费者在拉取
单分区是有序的(有条件)
多分区的分区间是无序的
单分区有序的条件:
- kafka1.x版本以前:
max.in.flight.requests.per.connection=1
(不需要考虑是否开启幂等性) - kafka1.新版本之后:
未开启幂等性:max.in.flight.requests.per.connection=1
开启幂等性:max.in.flight.requests.per.connection需要设置小于等于5
之所以开启幂等性之后可以保持有序,在kafka会缓存生产者发来的最近5个请求的元数据,之后发送的时候会对比当前的5个数据,保证最近5个请求的数据都是有序的
3. Broker
以下章节做了整体顺序的调换
3.1 工作原理
kafka中是如何存储数据的,以及如何和zookeeper之间通信
关于zookeeper的补充知识点可看我这篇文章:Zookeeper从入门到精通(全)
对应环境的安装可看这篇文章:猿创征文|分布式国产数据库 TiDB 从入门到实战