(2021年4月20日)kafka详解

Posted Mr. Dreamer Z

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(2021年4月20日)kafka详解相关的知识,希望对你有一定的参考价值。

kafka详解

kafka是一个分布式的发布/订阅的消息队列

1.kafka的核心组件


Producer:消息生产者,产生的消息会被发送到某个topic。主动将消息发给broker。
Consumer:消息消费者,消费的消息内容来自于某个topic。主动向topic拉取消息(因为消费水平不同,不能指定推给consumer)。
Topic:消息根据topic进行归类,topic本质上是一个目录,将同一主题消息归类到同一个目录。
Broker:每台kafka服务器节点就是一个broker,一个broker可以有多个topic。
Zookeeper:zookeeper集群不属于kafka内的组件,但kafka依赖zookeeper集群保存meta信息。以及leader的选举和follower的同步

2.kafka数据处理步骤

1.producer产生消息,发送到broker中。
2.leader状态的broker接收消息,写入到相应的topic。
3.leader状态的broker接收完毕以后,传给follow状态的broker作为副本备份。
4.consumer消费broker中的消息。

3.kafka名词解释和工作方式

Producer
消息生产者,就是像broker发送消息的客户端

Consumer
消息消费者,就是像broker拉去消息的客户端

Topic
本质上是一个目录

Consumer Group(CG)
消费者组,这个是kafka用来实现topic消息的广播(发送给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partition只会把消息发给该CG中的一个consumer。如果需要实现广播,只需要让每个consumer有一个独立的CG就可以。要实现单播只要所有的consumer在同一个CG中即可。

Broker
一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。

Partition
为了实现扩展性,一个非常大的topic可以分布到多个broker(服务器)上,一个topic可以分为多个partition。每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只需要安一个partition中的顺序将消息发给consumer,不保证一个topic整体的顺序(多个partition之间)。

4.kafka和topic的关系

kafka中消息是以topic进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。
topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。producer生产的数据会被不断追加到该log文件的末尾,且每一条数据都有它自己的offset。消费组中的每个消费者,都会实时记录自己消费到哪个offset,以便出错恢复时,从上次的位置继续消费。

由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment对应两个文件——".index"文件和".log"文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如:first这个topic有三个分区,则对应的文件夹名称为first-0,first-1,first-2。

index和log文件以当前segment的第一条消息的offset命名。

“.index"文件存储大量的索引信息,”.log"文件存储大量的数据

5.Consumer和topic的关系

kafka只支持topic

每个group中可以有多个consumer,每个consumer属于一个consumer group。通常情况下,一个group中会包含多个consumer,这个不仅可以提高topic中消息的并发消费能力,而且还能提高“故障容错”性,如果group中的某个consumer失效,那么其消费的partition将会有其他consumer自动接管。

对于topic中的一条特定的消息,只会被订阅此topic中的某一个group中的其中一个consumer消费,此消息不会发送给一个group中的多个consumer;

在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻);一个topic中的每个partition,只会被一个group中的一个consumer消费,但是一个consumer可以同时消费多个partition中的消息。

kafka只能保证一个partition中的消息被某个consumer消费时是顺序的;事实上,从topic角度来说,当有多个partition时,消费仍然不是全局有序的。

注意:同一topic中partition的Leader和Follower不会在同一个broker中。


消费组和分区的关系





如部分上图所示,可以看出,一个主题中的分区必须都被消费。但是一个topic中的分区最多只能被消费者群组中的一个消费者消费

6.kafka消息的发布

Producer客户端负责消息的发布

kafka集群中的任何一个broker都可以向producer提供metadata信息,这些metadata中包含“集中存活的servers列表”、“partitions leader列表”等信息;

当producer获取到metadata信息之后,producer将会和topic下所有的partition leader保持socket连接;

消息由producer直接通过scoket发送到broker,中间不会经过任何"路由层"。事实上,消息被路由到哪个partition上由producer客户端决定,比如采用"随机",“轮训”,"key-hash"等。

如果一个topic中有个多个partition,那么在producer端实现"消息均衡分发"是必要的。

在producer端的配置文件中,开发者可以指定partition的路由方式。

producer消息发送的应答机制
设置发送数据是否需要服务端的反馈,有三个值0,1,-1
0:producer不会等待broker发送ack,producer发送之后就不管了;
1:当leader接收到消息之后,等待leader落盘成功后,发送ack;
-1:等待leader和follower都落盘成功后,发送ack;此方式不会丢失消息,但是可能会导致消费者重复消费;

request.required.acks=0

这里说明一下,很多面试都会问到避免消息丢失和重复消费。
先说消息丢失,这个通常是在生产者方面丢消息给broker。此时ack设置为-1,会让消息落盘到leader和follower。之后相应ack。
再来说下重复消费:
生产者方面
重复消费其实在生产者方面就会出现,比如出现异常但是offset没有及时修改之类的。此时我们可以在producer方引入幂等性操作,幂等性操作提供了PID(producerID)和sequenceNum ID(决定topic和partition)
ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。

enable.idempotence=true

消费者方面:
手动提交偏移量(可以解决,但是不建议)
如果在生产者方面没有做幂等操作,那么就需要在消费时创建一个表来进行重复排查。

7.Conusmer的负载均衡

当一个group中,有consumer加入或者离开时,会触发partition均衡,均衡的最终目的是提高topic的并发消费能力,步骤:
1.加入topic1,具有partition:P0,P1,P2,P3
2.加入groupA,有如下consumer:C0,C1
3.首先根据partition索引号对partition排序:P0,P1,P2,P3
4.根据consumer.id排序:C0,C1
5.计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
6.然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]

8.副本数据同步

8.1 同步策略


第二种情况,当ack=-1时,如果所有leader和follower都落盘之后再返回ack。那么会出现该问题:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那么leader就要一直等待下去。那么我们该怎么办呢?
Leader维护一个动态的in-sync relica set(ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。如果follower长时间未向leader同步数据,则将follower踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会ISR中选举新的leader。
我们可以想想为什么要用时间来来进行判断呢,而不能用leader和follower同步之后相差的条数来进行判定踢出的条件呢?这样不是更方便吗?
试想一下,如果由条数作为条件。那么如果follower向leader拉取条数时条数拉取的较少,而导致follower同步的时候就达到了leader和follower的阈值条件,导致ISR直接丢掉。

ISR : 速率和leader相差低于10秒的follower的集合
OSR : 速率和leader相差大于10秒的follower/集合
AR : 所有分区的follower,AR=ISR+OSR

8.2 故障处理细节


follower故障:
follower发生故障之后会被踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分全部截取,从HW开始向leader进行同步。等待follower的LEO大于等于该partition的HW,即follower追上leader之后,就可以重新加入ISR了。
注意:截取该ISRHW后面部分,丢掉(为了避免错误)。由于之前已经读取了HW,所以只需要向leader的HW开始同步即可。

leader故障:
leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。
注意:截取HW后面部分,丢掉。然后从新的leader的HW开始同步。

LEO:指的是每个副本最大的offset。
HW:指的是消费者能见到的最大的offset,ISR中最小的LEO

9.zookeeper在kafka中的作用

kafka集群中有一个broker会被选举为controller,负责管理集群broker上下线,所有topic的分区副本分配和leader选举等工作。
而Controller的管理工作都是依赖于zookeeper。

10.消费发送方式

1.异步发送(默认)
2.同步发送
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Future对象的特点,我们也可以实现同步发送的效果,只需要在调用Future对象的get方法即可。

11.消息提交方式

11.1 自动提交

enable.auto.commit:true
注意,如果为false,说明我们不需要由kafka自动进行提交偏移量。因此auto.commit.interval.ms(延时提交)也会失效。

11.2 手动提交

手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是:commitSync阻塞当前线程,一直到提交成功,并且失败会重试;而commitSync则没有重试机制,故有可能提交失败。
其实无论自动提交或者手动提交(同步提交还是异步提交)offset,都可能造成数据的漏消费或者重复消费。先提交offset后消费,有可能造成数据的漏消费;而先消费后提交offset,有可能会造成数据重复消费。
所以,一般比较手动提交保险的办法就是使用异步+同步的方式进行提交。

12.kafka为什么这么快

1.partition 并行处理
2.顺序写磁盘,充分利用磁盘特性
3.利用了现代操作系统分页存储 Page Cache 来利用内存提高 I/O 效率
4.采用了零拷贝技术
5.Producer 生产的数据持久化到 broker,采用 mmap 文件映射,实现顺序的快速写入
6.Customer 从 broker 读取数据,采用 sendfile,将磁盘文件读到 OS 内核缓冲区后,转到 NIO buffer进行网络发送,减少 CPU 消耗

对应面试题

以上是关于(2021年4月20日)kafka详解的主要内容,如果未能解决你的问题,请参考以下文章

2021 认证杯数学建模 第二阶段 比赛通知-赛题思路

caseStudy-20180913-Kafka进程挂掉&解决办法

以太坊2021还会跌吗

kafkakafka Kafka分区leader迁移

一文详解自动驾驶的运行设计域(ODD)| 自动驾驶系列

您的应用可能会在 2015 年 4 月 30 日之后中断