猿创征文 | kafka框架从入门到精通(全)

Posted 码农研究僧

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了猿创征文 | kafka框架从入门到精通(全)相关的知识,希望对你有一定的参考价值。

目录

前言

关于java其他方面的知识点可看我之前的文章:
java框架零基础从入门到精通的学习路线(超全)

以下内容的学习主要来源于:
【尚硅谷】2022版Kafka3.x教程(从入门到调优,深入全面)

1. 入门kafka

应用场景举例:
前端浏览了网站,记录了数据(点赞、评论量等)变成日志,发送到日志服务器,日志服务器(通过Flume时刻监控服务器,只要一有数据变化)上传到Hadoop。
Flume(上传速度为100m/s左右)和Hadoop(采集速度小于100m/s,而且高峰期可能大于200m/s)两者的传输速率不同。可以增加一个kafka的中间件,将其大量的数据都放在kafka,之后将其数据与后面的Hadoop数据的速率保持一致即可

定义:
Kafka传统定义:分布式、基于发布/订阅(发布的消息分为不同类别,订阅者只接受感兴趣的消息,订阅者订阅的速度通过自身决定)的消息队列,主要用于大数据实时处理领域。
最新的定义:开源的分布式事件流平台、数据通道、流分析、数据集成和关键任务应用

消息队列:kafka(大数据)、ActiveMQ(JAVAEE)、RabbitMQ

功能:

  1. 解耦(只需保证生产者和消费者两侧的接口即可。也就是数据源放到消息队列中,之后消息 列将其对应的消息分发给消费者)
  2. 异步(例如注册信息,调用发送信息的接口(此处使用消息队列),内部核心的处理结果在后台,先回馈信息给客户)
  3. 消峰(数据量多,扛不住这么大的消息,可以将其缓存到消息队列中)

两种模式:

  • 点对点(消费完之后MQ删除其信息)
  • 发布/订阅(每个消费者相互独立、消费数据后不删除数据)

基础架构:(讲讲基本的框架流程)
分区主要是为了方便扩展,提高了吞吐量,一个topic分为多个partition(一个区可能存储不了所有的数据,所以只能分区)
一个分区的数据只能由一个消费者消费
保证数据的可用性,每个partition增加若干副本
生产者的leader挂掉之后,follower有条件升级为leader

还有一些数据是存储在zookeeper中
记录服务器节点的信息
每个分区的相关信息(谁是消费者,谁是生产者)
kafka2.8之后可以不用zookeeper

1.1 安装配置

关于这部分的安装可看我之前的文章:
Kafka在Linux服务器下载安装配置等详细图文版本(全)

多个服务器同时启动kafka,脚本如下:(对应的启动位置还有服务器名称替换成自已的即可)

#!/bin/bash

case $1 in
"start")
	for i in x,y,z
	do
		echo "启动"
		ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
		done
;;
"stop")
	for i in x,y,z
	do
		echo "停止"
		ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
		done
;;
esac

1.2 命令操作

本身kafka有生产者、消费者、broker
对应每个模块都有它的启动配置

1.2.1 topic

执行bin/kafka-topics.sh内部有很多的option的操作

大致的option命令如下:

操作描述
bootstrap-server<String: server to connect to>(可以写多个集群)连接kafka broker主机名称和端口号
topic<String: topic>(可以执行增删改查)指定topic名称
create增加topic
delete删除topic
alter修改topic
list显示所有topic
describe查看某个topic的详细信息
partitions<Integer:#of partitions>设置分区数
replication-factor<Integer: replication factor>设置分区副本
config<String:name == value>更新系统默认的配置

主要的逻辑如下,连接kafka之后,指定某个topic进行增删改查,以及增加分区和副本数量,对某个分区进行升级等

使用的过程中如果出现问题可看这篇文章:
kafka创建、启动topic遇到的bug汇总 解决方法

bin/kafka-topics.sh 模块

  • 查看对应的主题:bin/kafka-topics.sh --bootstrap-server manongkafka:9092 --list(在配置文件中我的别名为manongkafka 也可使用localhost代表本机)

  • 创建主题:bin/kafka-topics.sh --bootstrap-server manongkafka:9092 --create --partitions 1 --replication-factor 1 --topic first(创建完主题之后还要创建分区以及指定副本)

对应查看topic以及详细的信息(副本数量此处我创建了一个)

  • 查看topic的详细信息:bin/kafka-topics.sh --bootstrap-server manongkafka:9092 --topic first --describe

分区数只能增加不能减少(减少的时候原先的混合在一起,导致不知道在哪个分区查找)

  • 增加分区:bin/kafka-topics.sh --bootstrap-server manongkafka:9092 --topic first --alter --partitions 3

副本的数量也可增加(但是此处不能使用命令行增加)

  • 删除topic:bin/kafka-topics.sh --bootstrap-server manongkafka:9092 --delete --topic first

1.2.2 生产者 消费者

具体通过configs目录下的bin/kafka-console-producer.sh
大致命令:bin/kafka-console-producer.sh --bootstrap-server manongkafka:9092 --topic first

参数描述
–bootstrap-server <String: server toconnect to>连接 Kafka Broker 主机名称和端口号
–topic <String: topic>操作的 topic 名称

连接集群,在某个集群中添加数据

生产者的数据打到了topic中,消费者消费对应的topic即可(消费者和生产者参数差不多)
通过configs目录下的bin/kafka-console-consumer.sh
大致命令:bin/kafka-console-consumer.sh --bootstrap-server manongkafka:9092 --topic first

一般数据都会保存在topic中,可以将其保存7天的数据都加载过来(但有些数据可能不会使用到),参数为:--from-beginning

大致命令:bin/kafka-console-consumer.sh --bootstrap-server manongkafka:9092 --topic first --from-beginning

2. 生产者

生产者如何发送数据到topic中

外部的接收数据发送给生产者,具体操作流程如下:
通过生产者工程的主线程,使用send(ProducerRecord)发送,经过拦截器(加工)、序列化器(一般使用自带)、分区器(哪个分区进行存储,一个分区创建一个队列,这些都是存储在内存中,默认缓冲队列是32m,一个batch为16k)

如下所示:

存放在队列中的数据,通过sender线程(拉取数据)
具体数据什么时间点拉取(通过数据量的大小以及等待数据的时间)

  • batch.size:只有数据积累到batch.size之后,sender才会发送数据。默认16k
  • linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟,也就是只要有数据,就会拉取数据(batch.size就会失效)

拉取数据到kafka的时候,本身要发送到kafka集群,以每个集群的节点为例,一个队列的数据发送的请求放在一个队列中存储,如果请求接收不到,一般还会在发送(超过5个没接收应答,就不会在拉取)。类似滑动窗口

收到数据之后一般都会有个应答机制(0 、1 和 -1),发送成功之后,会进行副本的复制,以及清除之前队列的数据。发送失败则会进行重试

  • 0:生产者发送数据,不需要等数据到达后应答
  • 1:生产者发送数据,Leader收到数据后应答
  • -1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答,-1和all等价

主要的参数讲解:

参数描述
bootstrap.servers连接集群所需的 broker 地址,如果有多个可用逗号隔开(并非需要所有地址,给定一个broker就可找到其他broker)
key.serializer 和 value.serializer发送信息的key以及value序列化类型
buffer.memoryRecordAccumulator 缓冲区总大小(默认 32m)
batch.size缓冲区一批数据的最大值(默认16k),提高值可增加吞吐量,太高延迟会加大
linger.ms数据没到达最大值,sender到达时间最大值会发送数据(默认0ms,无延迟)。production建议为5-100ms
acks默认是-1(all),还有0和1
max.in.flight.requests.per.connection最多没返回的ack次数(默认5),开启幂等性要保证该值是 1-5 的数字
retries发送错误会重发(设置此处,默认int最大值),若要保证有序性,需设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1
retry.backoff.ms两次重试之间的时间间隔(默认100ms)
enable.idempotence是否开启幂等性(默认true开启)
compression.type生产者发送的所有数据的压缩方式(默认none不压缩),其他类型有:none、gzip、snappy、lz4 和 zstd

2.1 异步&同步 发送

何为异步通信和同步通信?

同步通信是全部所有任务都完成之后才能返回
异步发送是只要执行接口就可返回,具体核心函数不用执行到结束才返回
在kafka中的异步发送:外部数据发送到队列(不管数据有没到达kafka)

异步接口不带回调函数,(也就是在终端中没有返回值)

示例代码:

引入maven依赖文件

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

核心代码(此处的异步发送是没有回调函数的):

创建kafka生产者对象(再这之前需要配置 、连接服务器对应的topic 以及序列化)

public class CustomProducer 

    public static void main(String[] args) 

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        // 防止一个挂掉之后还可以启动另外一个
        //properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"manongkafka:9092");
		
        // 指定对应的key和value的序列化类型 key.serializer,必须要配置这两个参数
        // StringSerializer.class.getName()  等同于它的全类名 org.apache.kafka.common.serialization.StringSerializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) 
            kafkaProducer.send(new ProducerRecord<>("first","码农研究僧"+i));
        

        // 3 关闭资源
        kafkaProducer.close();
    

启动之后,截图如下:


异步接口带回调函数:

发送函数send有两个,其中一个带有回调参数

ProducerRecord有大致如下函数,参数有哪个分区哪个key、value等(在下述的分区中会讲这些函数的区别)

主要修改上面的核心代码:

// 2 发送数据
for (int i = 0; i < 5; i++) 
    kafkaProducer.send(new ProducerRecord<>("first", "manongyanjiuseng" + i), new Callback() 
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) 

            if (exception == null)
                System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
            
        
    );

    Thread.sleep(2);

截图如下:


同步发送
同样修改上面的核心代码,加入get函数捕捉
类似future线程,只不过这里通过get获取,等待异步调用的结果(需要发送完毕数据之后才会返回结果)
通过send函数发送到队列 ,再从队列发送到broker中来返回结果

// 2 发送数据
for (int i = 0; i < 5; i++) 
    kafkaProducer.send(new ProducerRecord<>("first","manongyanjiuseng"+i)).get();

2.2 分区

数据过大不适合存放在一台服务器上,需要进行分区存储

分区好处:

  • 合理使用存储资源:每个分区都在在一个Broker上存储,海量的数据按照分区切割成一块一块数据存储在多台Broker上(合理控制分区,实现负载均衡,容错性)
  • 提高并行度(效率增加):生产者可以以分区为单位发送数据;消费者以分区为单位进行消费数据。

分区策略:(先看图)

  • 指定分区策略使用

比如指定分区数,不指定key(按照指定的分区发送)

// 2 发送数据
for (int i = 0; i < 5; i++) 
    kafkaProducer.send(new ProducerRecord<>("first", 1,"","hello" + i), new Callback() 
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) 

            if (exception == null)
                System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
            
        
    );

    Thread.sleep(2);

  • 没有指定分区,但是有key,则基于key进行hash(散列)
    key1的hash为5,key2的hash为6,分区数为2,则5%2为1分区,6%2为0分区

代码:kafkaProducer.send(new ProducerRecord<>("first", "a","hello" + i),。(这个在生产环境中用的比较多,将其服务作为key,对应放在同一个分区中)

  • 没有分区也没有key,选择sticky分区(当这批数据已经满了)
    第一次随机选择0号分区,当批次满了(默认16k)或者时间到了,在随机选择一个分区进行使用

代码:kafkaProducer.send(new ProducerRecord<>("first", "hello" + i),


自定义分区:

背景:过滤的码农研究僧数据放到0分区,没有该数据在1分区

思路:继承Partitioner接口,主要是重写partition方法

public class MyPartitioner implements Partitioner 
	// 核心方法是这个
	// topic 、key、 序列化后的key、value、序列化之后的value
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) 

        // 获取数据 “码农研究僧”
        String msgValues = value.toString();
		
		// 定义分区变量
        int partition;

        if (msgValues.contains("码农研究僧"))
            partition = 0;
        else 
            partition = 1;
        

        return partition;
    

    @Override
    public void close() 

    

    @Override
    public void configure(Map<String, ?> configs) 

    

配置kafka生产者对象的时候,需要再这之前将其自定义分区引入

// 关联自定义分区器
// 后面的value值带的是 自定义分区的类名
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.manong.kafka.producer.MyPartitioner");

大致插入的位置如下:

2.3 调优参数

代码模板:

// 创建properties,配置必须要的参数
// 参数需要是连接kafka参数、key和value的序列化、配置缓冲区参数 批次大小 linger.ms等参数

//1. 创建生产者 通过new KafkaProducer ,参数与需要是properties

//2. 发送数据

// 3. 关闭资源

2.3.1 吞吐量

  • 0延迟,有数据就拉取(效率低,无延迟):默认0延迟的时候(linger.ms设置为0,范围为5-100ms),只要有数据就会拉取
  • 有延迟,通过批次拉取(效率高,有延迟):设置延迟,结合batch.size(默认为16k),数据批次到达16k的时候,在拉取

基于以上的问题,具体优化如下:

提高吞吐量

  • 一般会将其两者进行结合(batch.size设置为32k,默认是16k,linger.ms设置为2ms)
  • 将其数据进行压缩(拉取的数据更加多了)
  • RecordAccumulatator(缓冲区大小)修改为64m(但也有数据延迟)

对于压缩的方式:默认 none,可配置值 gzip、snappy、lz4 和 zstd

实际代码如下:(异步发送进行修改)

public class CustomProducerParameters 

    public static void main(String[] args) 

        // 0 配置
        Properties properties = new Properties();

        // 连接kafka集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"manongkafka:9092,hadoop103:9092");

        // 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

        // 批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

        // linger.ms  毫秒需求
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // 压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");


        // 1 创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) 
            kafkaProducer.send(new ProducerRecord<>("first","manongkafka"+i));
        

        // 3 关闭资源
        kafkaProducer.close();
    

步骤大致如下:创建生产者、发送数据、关闭资源
创建生产者参数要带入Properties类,所以一开始创建这个类的同时就要连接集群以及配置

2.3.2 可靠性

关于数据可靠的验证:

集群收到数据之后,一般会对数据进行应答
应答的方式有-1,0,1

  • 0:生产者一直发送数据,不需要应答(可靠性比较低,效率最高)

缺陷:
leader挂掉之后,再次发送之后会出现数据丢失(没有数据的应答验证)

  • 1:生产者发送数据,leader收到数据后进行应答

缺陷:
只有leader收到数据才应答,但是应答完之后leader挂了,某个follower变为leader之后(由于之前的数据已经默认收到了),所以新的leader不会再收到数据

  • -1(等同于all):生产者发送数据,leader和isr队列里的所有节点收齐后再应答

缺陷:
leader以及ISR队列(所有节点都有这个数据)都收到所有节点才应答。但如果某个follower挂掉之后,所有follower以及leader都在同步数据,却因为某个follower不能与leader进行同步而延迟

解决方案:
对于第三种解决方案如下:·(心跳机制,动态的同步)
维持一种动态的ISR(和leader保持同步的follower+leader集合)
如果follower长时间(replica.lag.time.max.ms设置为30s,类似心跳机制)没有向leader发送请求或者同步数据,则该follower被踢出ISR,不用长时间等待故障节点

特殊情况:分区数副本只有一个,或者ISR副本只有一个。等同于ack=1,都会有丢失的情况
为了保证数据的可靠性,需要分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

总结

  • ack为0,可靠性差,效率高(生产环境很少使用)
  • ack为1,只与leader应答,可靠与效率中等(生产环境的应用在普通日志,丢失个别数据可接受)
  • ack为-1,发送的数据要与leader以及ISR队列的所有follower应答,可靠性高,效率低(生产环境的应用在可靠性比较强的场景,支付),但是可能会造成数据的重复(数据接收到之后,原本要应答,但是leader挂掉之后,某个follower成为leader,数据又再次发送一遍)

在代码模块中如下:

// 设置ack
properties.put(ProducerConfig.ACKS_CONFIG,"1");

// 如果接受不到,会进行重试,默认是int的最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG,3);

2.3.3 幂等性 事务性

  1. ack为-1,也就是all的时候,最少要发送一次。保证数据不丢失,但是不能保证数据不重复
  2. ack为0,只要leader应答即可,最多发送一次。保证数据不重复,但是不能保证数据不丢失

生产环境不能重复也不能丢失
kafka0.11版本之后引入了幂等性和事务

幂等性(只能保证单分区会话的不重复)

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。开启参数 enable.idempotence 默认为 true,false 关闭

通过如下标准判断数据的不重复:(只能保证单分区的单会话不能重复,一旦重启之后还是不能保证幂等性,所以下面会引入事务
大致参数如下:

  • PID:每次重启的时候都会配置一个新的(宕机之后又是发送一个一样的数据)
  • Partition:分区号(分区之间有重复本身不影响)
  • Sequence Number:单调自增

在生产环境中开启幂等性的参数:enable.idempotence默认本身就是true,false为关闭

事务性
不想产生任何一条数据,需通过事务性(前提是开启了幂等性)
宕机之后,pid又是一个新值,为此通过事务的全局id

原理:
kafka生产者 在事务协调器中 请求一个pid,并且返回pid给生产者

协调器会将其请求持久化(这个请求是生产者发送的提交请求),后台发送提交请求到主题分区中(再这之前生产者发送的消息到主题中),主题分区返回成功值之后,协调器会将其成功的信息持久化

具体逻辑代码如下:

  1. 手动指定事务id(保证全局唯一)
  2. 初始化事务
  3. 开启事务
  4. 通过try 发送数据 来 提交事务
  5. 出现catch 则放弃事务
// 指定事务id,随便取,保证事务唯一即可
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");

// 1 创建kafka生产者对象
// 再这之前还有连接的一些参数
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

// 初始化事务
kafkaProducer.initTransactions();

// 开启事务
kafkaProducer.beginTransaction();

try 
    // 2 发送数据
    for (int i = 0; i < 5; i++) 
        kafkaProducer.send(new ProducerRecord<>("first", "manong" + i));
    
	
	// 成功的话提交事务
    kafkaProducer.commitTransaction();
 catch (Exception e) 
	// 失败的话放弃事务
    kafkaProducer.abortTransaction();
 finally 
    // 3 关闭资源
    kafkaProducer.close();

2.3.4 顺序性

本身生产者给broker发送的时候,消费者在拉取
单分区是有序的(有条件)
多分区的分区间是无序的

单分区有序的条件:

  • kafka1.x版本以前:max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)
  • kafka1.新版本之后:
    未开启幂等性:max.in.flight.requests.per.connection=1
    开启幂等性:max.in.flight.requests.per.connection需要设置小于等于5

之所以开启幂等性之后可以保持有序,在kafka会缓存生产者发来的最近5个请求的元数据,之后发送的时候会对比当前的5个数据,保证最近5个请求的数据都是有序的

3. Broker

以下章节做了整体顺序的调换

3.1 工作原理

kafka中是如何存储数据的,以及如何和zookeeper之间通信

关于zookeeper的补充知识点可看我这篇文章:Zookeeper从入门到精通(全)
对应环境的安装可看这篇文章:猿创征文|分布式国产数据库 TiDB 从入门到实战

猿创征文|Promethues入门,看懂不会写

猿创征文|Promethues入门,看懂不会写

猿创征文|Promethues入门,看懂不会写

猿创征文|我的四个月Java学习成长之路——从基础到框架再到项目

猿创征文|kafka从零到1的全过程