Kafka工作流程
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka工作流程相关的知识,希望对你有一定的参考价值。
参考技术A 生产者-Kafka集群-消费者组当生产者往某一个不存在的主题里发数据的时候,它自己会创建一个主题、一个分区和一个副本(server.properties里定义的)。
一般是先会创建一个主题,比如说TopicA,有三个分区,有两个副本(leader+follower总共2个),同一个分区的两个副本肯定不在一个服务器。生产者往三个分区发送消息(发送消息可以批量发送、也可以一个个发送),其中0,1,2,3,4,5叫做偏移量,如图发送了15条消息,每一个分区维护了一个从头开始的偏移量。follower会主动同步(备份)leader的消息(就算同步了,某种特定的情况下也会丢数据),消费者只会找leader消费。
Kafka并不能保证消息的全局有序性,只能保证区内有序。就是说消费消息的时候不是按分区顺序来。
Kafka中的消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。
topic是逻辑上的概念,而partition是物理上的概念。每一个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。producer生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。
Kafka原理你真懂了吗?四万字Kafka教程
文章目录
概述
Kafka 是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。
使用场景
- 活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。
- 传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。
- 度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
- 流式处理:流式处理是有一个能够提供多种应用程序的领域。
- 限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。
kafka的特性
高吞吐、低延迟
:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。高伸缩性
: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。持久性、可靠性
: Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。容错性
: 允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作。高并发
: 支持数千个客户端同时读写。
消息队列的流派
消息队列的选型需要根据具体应用需求而定,ZeroMQ小而美,RabbitMQ大而稳,Kakfa和RocketMQ快而强劲。
区别:
重topic:Kafka、RocketMQ、ActiveMQ整个broker,依据topic来进⾏消息的中转。在重topic的消息队列⾥必然需要topic的存在
轻topic:RabbitMQ topic只是⼀种中转模式。RabbitMQ的几种消息模型
kafka环境搭建
Docker安装Kafka
环境:CentOS7 docker
-
安装zookeeper
docker pull zookeeper docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper
-
安装Kafka
docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=zookeeperIP地址:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://外网IP地址:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka 参数说明: -e KAFKA_BROKER_ID=0 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己 -e KAFKA_ZOOKEEPER_CONNECT=172.21.10.10:2181/kafka 配置zookeeper管理kafka的路径172.21.10.10:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.21.10.10:9092 把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的监听端口 -v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间
-
测试Kafka
进入kafka docker exec -it kafka bash 生产者: ./kafka-console-producer.sh --broker-list localhost:9092 --topic test > hello world 消费者: ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning hello world
生产:
消费:
Kafka配置
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
Kafka操作
1️⃣通过Docker进入kafka容器内部
2️⃣ 常用操作
-
查看当前服务器中所有的topic
kafka-topics.sh --zookeeper zkIP:2181/kafka --list # KAFKA_ZOOKEEPER_CONNECT如果配置了/kafka, 在命令中也要写/kafka
-
创建topic
kafka-topics.sh --zookeeper zkIP:2181/kafka --create --replication-factor 3 --partitions 1 --topic first --topic 定义topic名 --replication-factor 定义副本数 --partitions 定义分区数
-
删除topic
kafka-topics.sh --zookeeper zkIP:2181/kafka --delete --topic first #需要server.properties中设置delete.topic.enable=true否则只是标记删除。
-
发送消息
kafka-console-producer.sh --broker-list KafkaIP:9092 --topic first > hello world > wz
-
消费消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first --from-beginning
-
查看某个topic详情
kafka-topics.sh --zookeeper zkIP:2181/kafka --describe --topic first Topic: first TopicId: izkEI9-GR9uoIiHpmZSulw PartitionCount: 1 ReplicationFactor: 1 Configs:Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
-
修改分区数量
kafka-topics.sh --zookeeper zkIP:2181/kafka --alter --topic first --partitions 6
-
修改某一Topic的数据留存时间
./kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name topicName --alter --add-config log.retention.hours=120
线上集群部署方案:
既然是集群,那必然就要有多个 Kafka 节点机器,因为只有单台机器构成的 Kafka 伪集群只能用于日常测试之用,根本无法满足实际的线上生产需求。而真正的线上环境需要仔细地考量各种因素,结合自身的业务需求而制定。
从操作系统、磁盘、磁盘容量、带宽等方面考虑:
1️⃣ 操作系统:
Kafka 由 Scala 语言和 Java 语言编写而成,编译之后的源代码就是普通的“.class”文件。本来部署到哪个操作系统应该都是一样的,但是不同操作系统的差异还是给 Kafka 集群带来了相当大的影响。
如果考虑操作系统与 Kafka 的适配性,Linux 系统显然要比其他两个特别是 Windows 系统更加适合部署 Kafka。虽然这个结论可能你不感到意外,但其中具体的原因你也一定要了解。主要是在下面这三个方面上,Linux 的表现更胜一筹。
- I/O 模型的使用: Kafka 客户端底层使用了 Java的 selector,selector 在 Linux 上的实现机制是 epoll,而在 Windows 平台上的实现机制
是 select。因此在这一点上将 Kafka 部署在 Linux 上是有优势的,因为能够获得更高效的I/O 性能。 - 数据网络传输效率: 在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的快速数据传输特性。
- 社区支持度: Windows 平台上部署 Kafka 只适合于个人测试或用于功能验证,千万不要应用于生产环境。
2️⃣ 磁盘:
Kafka 大量使用磁盘不假,可它使用的方式多是顺序读写操作,一定程度上规避了机械磁盘最大的劣势,即随机读写操作慢。从这一点上来说,使用 SSD 似乎并没有太大的性能优势,毕竟从性价比上来说,机械磁盘物美价廉,而它因易损坏而造成的可靠性差等缺陷,又由 Kafka 在软件层面提供机制来保证,故使用普通机械磁盘是很划算的。
需不需要使用RAID【[磁盘阵列](RAID(独立磁盘冗余阵列)简介 - 菜鸟-传奇 - 博客园 (cnblogs.com))】?
RAID的两个主要优势是:提供冗余的磁盘存储空间,提供负载均衡。这两个优势对于Kafka而言,Kafka自身实现了冗余机制来提供高可用,另一方面Kafka也通过分区的概念在软件层面自行实现了负载均衡。当然不考虑性价比的情况下,RAID可以搭建。
3️⃣ 磁盘容量:
Kafka 集群到底需要多大的存储空间?这是一个非常经典的规划问题。Kafka 需要将消息保存在底层的磁盘上,这些消息默认会被保存一段时间然后自动被删除。虽然这段时间是可以配置的,但你应该如何结合自身业务场景和存储需求来规划 Kafka 集群的存储容量呢?
在规划磁盘容量时你需要考虑下面这几个元素:
- 新增消息数
- 消息留存时间
- 平均消息大小
- 备份数
- 是否启用压缩
例如:你所在公司有个业务每天需要向Kafka 集群发送 1 亿条消息,每条消息保存两份以防止数据丢失,另外消息默认保存两周时间。现在假设消息的平均大小是 1KB。
数据实际需要总空间:
1 亿 * 1KB * 2 / 1000 / 1000 = 200GB。
预留10%空间,保存两周
220GB * 14 = 3TB
Kafka支持数据压缩,假设比例0.75:
0.75 * 3 = 2.25TB
4️⃣ 带宽:
对于 Kafka 这种通过网络大量进行数据传输的框架而言,带宽特别容易成为瓶颈。
与其说是带宽资源的规划,其实真正要规划的是所需的 Kafka 服务器的数量。假设你公司的机房环境是千兆网络,即 1Gbps,现在你有个业务,其业务目标或 SLA 是在 1 小时内处理 1TB 的业务数据。那么问题来了,你到底需要多少台 Kafka 服务器来完成这个业务呢?
根据实际使用经验,超过 70% 的阈值就有网络丢包的可能性了,故 70% 的设定是一个比较合理的值,也就是说单台 Kafka 服务器最多也就能使用大约 700Mb 的带宽资源。这只是它能使用的最大带宽资源,你不能让 Kafka 服务器常规性使用这么多资源,故通常要再额外预留出 2/3 的资源,即单台服务器使用带宽 700Mb / 3 ≈ 240Mbps。需要提示的是,这里的 2/3 其实是相当保守的,你可以结合你自己机器的使用情况酌情减少此值。
好了,有了 240Mbps,我们就可以计算 1 小时内处理 1TB 数据所需的服务器数量了。根据这个目标,我们每秒需要处理 2336Mb 的数据(1024*1024/3600*8
),除以 240,约等于 10 台服务器。如果消息还需要额外复制两份,那么总的服务器台数还要乘以 3,即 30 台。
计算机网络相关知识:带宽资源一般用Mbps而不是MBps衡量
集群参数配置
Kafka架构和工作流程
基本概念
整体架构图:
生产者:Producer
。消息生产者,就是向Kafka broker发消息的客户端。
消费者: Consumer
。消息消费者,向Kafka borker取消息的客户端。
消费者组:Consumer Group (CG)
。由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
消息:Record
。这里的消息就是指 Kafka 处理的主要对象。
主题:Topic
。主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务;可以理解为一个队列,生产者和消费者面向的都是一个topic。
分区:Partition
。一个有序不变的消息序列。每个主题下可以有多个分区。(为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列;)
批次:为了提高效率, 消息会分批次
写入 Kafka,批次就代指的是一组消息。
消息位移:Offset
。表示分区中每条消息的位置信息,是一个单调递增且不变的值。
Broker: 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的follower。
总结图示:
工作流程
topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,**该log文件中存储的就是producer生产的数据。**Producer生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。
文件存储(持久化)机制
磁盘存储位置可以在config/server.properties中的log.dirs中配置;
进入到目录下:
打开.log文件,可以看到之前发送的消息:
(抱歉,我的终端工具似乎有问题,乱码了,fuck是因为后续测试了消息脏话过滤器)
文件存储机制原理:
由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制。
将每个partition分为多个segment。每个segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,first这个topic有三个分区,则其对应的文件夹为first-0,first-1,first-2。
Segment 被译为段,将 Partition 进一步细分为若干个 segment,每个 segment 文件的大小相等。
index和log文件以当前segment的第一条消息的offset命名。
# 几个index和log文件例子:
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
-
“.index”文件存储大量的索引信息
-
“.log”文件存储大量的数据
-
索引文件中的元数据指向对应数据文件中message的物理偏移地址。
下图为index文件和log文件的结构示意图:
Kafka默认消息保留时间是7天,若想更改可以到server.properties中修改 log.retention.hours属性
Kafka生产者细节
分区策略
1️⃣: 分区的原因:
(1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
(2)可以提高并发,因为可以以Partition为单位读写了。
2️⃣ 分区的原则:
我们需要将producer发送的数据封装成一个ProducerRecord对象。
(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。
数据可靠性保障-ACK
为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。
收到ACK的情况:
没收到ACK的情况:
何时发送ACK??
副本同步策略
方案 | 优点 | 缺点 |
---|---|---|
半数以上完成同步,就发送ack | 延迟低 | 选举新的leader时,容忍n台节点的故障,需要2n+1个副本 |
全部完成同步,才发送ack | 选举新的leader时,容忍n台节点的故障,需要n+1个副本 | 延迟高 |
Kafka选择了第二种方案,原因如下:
1.同样为了容忍n台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
2.虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小。
ISR
采用第二种方案之后,设想以下情景:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。这个问题怎么解决呢?
Leader维护了一个动态的in-sync replica set (ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。
ACK应答机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。
所以Kafka为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。
ack参数配置:
-
0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据;
-
1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
-
-1(all):producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。
0,1两种配置容易理解,-1的情况如下图所示:
offset细节 LEO、 HW
LEO:指的是每个副本最大的offset;
HW:指的是消费者能见到的最大的offset,ISR队列中最小的LEO。
故障处理细节
(1)follower故障
follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。
(2)leader故障
leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
Exactily Once
翻译:【精确一次】
- 将服务器的ACK级别设置为**-1**,可以保证Producer到Server之间不会丢失数据,即At Least Once语义。
- 将服务器的ACK级别设置为0,可以保证生产者每条消息只会被发送一次,即At Most Once语义。
但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即Exactly Once语义
在0.11版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。
0.11版本的Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指Producer不论向Server发送多少次重复数据,Server端都只会持久化一条。幂等性结合At Least Once语义,就构成了Kafka的Exactly Once语义。即:
At Least Once + 幂等性 = Exactly Once
要启用幂等性,只需要将Producer的参数中enable.idompotence
设置为true
即可。
Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一Partition的消息会附带Sequence Number。而Broker端会对<PID, Partition, SeqNumber>
做缓存,当具有相同主键的消息提交时,Broker只会持久化一条。
但是PID重启就会变化,同时不同的Partition也具有不同主键,所以幂等性无法保证跨分区跨会话的Exactly Once。
拓展: 如果想要实现跨分区跨会话上的消息无重复该怎么做呢?—> 事务
Kafka消费者细节
消费方式
- consumer采用pull(拉)模式从broker中读取数据。
- push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。
push 模式中broker的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。
pull模式则可以根据consumer的消费能力以适当的速率消费消息。
**pull模式不足之处:**如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,这段时长即为timeout。
轮询
我们知道,Kafka 是支持订阅/发布模式的,生产者发送数据给 Kafka Broker,那么消费者是如何知道生产者发送了数据呢?其实生产者产生的数据消费者是不知道的,KafkaConsumer 采用轮询的方式定期去 Kafka Broker 中进行数据的检索,如果有数据就用来消费,如果没有就再继续轮询等待。
轮询源码:
try
while (true)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> record : records)
int updateCount = 1;
if (map.containsKey(record.value()))
updateCount = (int) map.get(record.value() + 1);
map.put(record.value(), updateCount);
finally
consumer.close();
- 这是一个无限循环。消费者实际上是一个长期运行的应用程序,它通过轮询的方式向 Kafka 请求数据。
- 第三行代码非常重要,Kafka 必须定期循环请求数据,否则就会认为该 Consumer 已经挂了,会触发重平衡,它的分区会移交给群组中的其它消费者。**传给
poll()
方法的是一个超时时间,**用java.time.Duration
类来表示,如果该参数被设置为 0 ,poll() 方法会立刻返回,否则就会在指定的毫秒数内一直等待 broker 返回数据。 - poll() 方法会返回一个记录列表。每条记录都包含了记录所属主题的信息,记录所在分区的信息、记录在分区中的偏移量,以及记录的键值对。我们一般会遍历这个列表,逐条处理每条记录。
- 在退出应用程序之前使用
close()
方法关闭消费者。网络连接和 socket 也会随之关闭,并立即触发一次重平衡,而不是等待群组协调器发现它不再发送心跳并认定它已经死亡。
分区分配策略
一个consumer group中有多个consumer,一个 topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费。
Kafka有两种分配策略,RoundRobin,Range。
RoundRobin:
Range:
offset的维护
由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
Kafka 0.9版本之前,consumer默认将offset保存在Zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为**__consumer_offsets**。
consumer.properties中:
#这个参数用于是否把内部topic的信息(例如offset)暴露给cosumer,如果设置为true,就只能通过订阅的方式来获取内部topic的数据。
exclude.internal.topics=false
消费者重平衡
情景说明:
最初是一个消费者订阅一个主题并消费其全部分区的消息,后来有一个消费者加入群组,随后又有更多的消费者加入群组,而新加入的消费者实例
分摊
了最初消费者的部分消息,这种把分区的所有权通过一个消费者转到其他消费者的行为称为重平衡
重平衡非常重要,它为消费者群组带来了高可用性
和 伸缩性
,我们可以放心的添加消费者或移除消费者,不过在正常情况下我们并不希望发生这样的行为。在重平衡期间,消费者无法读取消息,造成整个消费者组在重平衡的期间都不可用。另外,当分区被重新分配给另一个消费者时,消息当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
消费者通过向组织协调者
(Kafka Broker)发送心跳来维护自己是消费者组的一员并确认其拥有的分区。对于不同不的消费群体来说,其组织协调者可以是不同的。只要消费者定期发送心跳,就会认为消费者是存活的并处理其分区中的消息。当消费者检索记录或者提交它所消费的记录时就会发送心跳。
如果过了一段时间 Kafka 停止发送心跳了,会话(Session)就会过期,组织协调者就会认为这个 Consumer 已经死亡,就会触发一次重平衡。如果消费者宕机并且停止发送消息,组织协调者会等待几秒钟,确认它死亡了才会触发重平衡。在这段时间里,死亡的消费者将不处理任何消息。在清理消费者时,消费者将通知协调者它要离开群组,组织协调者会触发一次重平衡,尽量降低处理停顿。
总结:
- 重平衡会影响我们程序的吞吐量
- 排查重平衡发生的原因,避免Consumer端异常情况下的重平衡
- 通过设定Consumer端的参数,或者进行JVM调优避免不必要的重平衡
拓展资料:极客时间-Kafka核心技术实战:25|消费者组重平衡全流程解析
Kafka高效读写数据
顺序写磁盘
Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械结构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
零复制(零拷贝)技术
从字面上理解就是数据不需要多次拷贝,系统性能大幅度提升。其实,不仅在kafka中,Java NIO,netty,rocketMQ等框架中也都用到了零拷贝。
缓存I/O又被称作标准I/O,是大多少文件系统默认I/O。为了减少读盘的次数,同时也为了保护系统本身的安全,缓存I/O在一定程度上分离了内核空间和用户空间。但也因此,数据在传输过程中需要在用户空间和内核空间之间进行多次的拷贝,这些数据拷贝操作所带来的CPU以及内存开销是非常大的。
下图示例列出了一次缓存IO读和写需要经过的步骤:
对于缓存I/O,一个读操作有3次数据拷贝,一次写操作又会有3次的数据拷贝。
读操作:磁盘->内核缓存区->用户空间缓存区->应用程序内存。
写操作:应用程序内存->用户空间缓存区->Socket缓存区->网络。
直接I/O
直接IO就是指没有用户级的缓存区,但是内核缓存区还是有的。这样就减少一次从内核缓冲区到用户程序缓存的数据拷贝。如下图所示:
内存映射文件
首先,映射的意思就是建立一种一一对应关系,是指硬盘上文件的位置与进程逻辑地址的对应关系。这种对应关系纯属是逻辑上的概念,物理上是不存在的。在内存映射的过程中,并没有实际的数据拷贝,文件没有被载入内存,只是逻辑上被放入了内存,内存中实际只是一个逻辑地址。
如上图所示,数据不再经过应用程序内存,直接从内核缓存区到socket缓冲区。
零拷贝
零拷贝连内核缓存区到socket缓存区也省了,底层的网卡驱动程序直接读取内核缓存区的数据并发送到网络。在整个过程中,只发生了2次数据拷贝。一次是从磁盘到内核缓存区,另一次是从内核缓存区到网络。既然有发生数据拷贝,为什么还叫“零拷贝”,那是因为所说的零拷贝是指数据在内存中没有发生数据拷贝。
Kafka的Controller
Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。
Controller的管理工作都是依赖于Zookeeper的。
下面是partition 的 leader 选举过程:
如果leader节点挂掉了:
Kafka事务
Kafka从0.11版本开始引入了事务支持。事务可以保证Kafka在Exactly Once语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。
Producer事务
为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。
为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。
Consumer事务
上述事务机制主要是从Producer方面考虑,对于Consumer而言,事务的保证就会相对较弱,尤其时无法保证Commit的信息被精确消费。这是由于Consumer可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。
Kafka的压缩机制
Kafka 的消息分为两层:消息集合 和 消息。一个消息集合中包含若干条日志项,而日志项才是真正封装消息的地方。Kafka 底层的消息日志由一系列消息集合日志项组成。Kafka 通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入
操作。
在 Kafka 中,压缩会发生在两个地方:Kafka Producer 和 Kafka Consumer,为什么启用压缩?说白了就是消息太大,需要变小一点
来使消息发的更快一些。
Kafka Producer 中使用 compression.type
来开启压缩
private Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.1.9:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
ProducerRecord<String,String> record =
new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");
上面代码表明该 Producer 的压缩算法使用的是 GZIP
有压缩必有解压缩,Producer 使用压缩算法压缩消息后并发送给服务器后,由 Consumer 消费者进行解压缩,因为采用的何种压缩算法是随着 key、value 一起发送过去的,所以消费者知道采用何种压缩算法。
Kafka API
Producer API
消息发送流程
Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator【记录累计器,充当一个队列】。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。
消息发送的流程图:
相关参数:
- batch.size: 只有数据积累到batch.size之后,sender才会发送数据
- linger.ms: 如果数据迟迟未到达batch.size,sender等待linger.time 之后就会发送数据。
异步发送API
-
导入依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency>
-
需要的类:
- KafkaProducer:需要创建一个生产者对象,用来发送数据 - ProducerConfig:获取所需的一系列配置参数 - ProducerRecord:每条数据都要封装成一个ProducerRecord对象
-
API
不带回调函数的API:
public class CustomProducer public static void main(String[] args) throws ExecutionException, InterruptedException Properties props = new Properties(); //kafka集群,broker-list props.put("bootstrap.servers","kafka集群地址"); props.put("acks", "all"); //重试次数 props.put(以上是关于Kafka工作流程的主要内容,如果未能解决你的问题,请参考以下文章