大数据之kafka Broker的工作流程

Posted 柳小葱

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据之kafka Broker的工作流程相关的知识,希望对你有一定的参考价值。

🐳在前面的章节中,我们学习了kafka的生产者的原理和数据一致性保证,今天我们来学习Broker的原理,对以往内容感兴趣的小伙伴可以参考👇:

🌟这一章节,主要是介绍kafka Broker的相关原理,希望大家能够了解broker是如何存储数据的,如何保证数据的存储安全,如何保证数据的有效性等。话不多说,让我们开始今日份的学习吧😄。

本文目录

1. zookeeper中的kafka信息

zookeeper中存储的kafka信息如下图,东西太多,我们只需要记住其中几个比较重要的部分。

  1. 文件目录 /kafka/brokers/ids :[0,1,2] 该目录下记录的是哪些服务器(集群)在工作
  2. 文件目录/kafka/brokers/topics/first/partitions/0/state:“leader”:1 ,“isr”:[1,0,2] 该目录下记录的是每一个主题下的分区的leader信息和信息。这里记录的是0号分区的leader信息,根据分区不同记录不同的leader(上图可见)
  3. 文件目录/kafka/consumers:存储消费者的offset信息,但是0.9版本之后存储在kafka主题中。
  4. 文件目录/kafka/controller :“brokerid”:0 当leader出现故障,辅助我们选取新的leader

2. zookeeper与kafka的交互

这一部分我们来介绍zookeeper与kafka是如何运作的,我们来逐一介绍,如下图:

  1. 启动broker集群,每个集群都会在zookeeper的ids中进行注册,如图,3个broker,[0,1,2]
  2. 每个broker集群中的controller抢占zookeeper中的controller,用来责管理集群broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。图中是broker0的controller辅助判断leader(注意:每一个broker节点都有对应的controller)
  3. 选举出来的controller来监听ids中的在线集群[0,1,2]的变化。
  4. controller根据规则:在isr中存活为前提,按照AR(kafka分区中所有副本的统称)中排在前面的优先。例如 ar[1,0,2], isr [1,0,2],那么leader就会按照1,0,2的顺序轮询,满足条件的为1,则broker1就成为0号分区的leader。
  5. 将选出的leader信息和isr信息发送给对应的zookeeper。
  6. 其他controller从zookeeper中同步相关信息。
  7. 生产者发布信息给leader之后,follower主动同步leader的信息,主要是通过log的方式进行存储(真实存储是一个一个的segment文件 1G大小 和.index的索引文件)
  8. leader出现故障,选举出来的controller可以监控到ids的变化,拉取leader和isr信息,然后根据规则重选选取leader节点。

3. kafka 副本

3.1 副本的概念

  • Kafka 中副本分为:leader 和 follower。kafka 生产者只会把数据发往 leader,然后 follower 找 leader 进行同步数据,消费数据也是找的leader。
  • Kafka 副本作用:备份来提高数据可靠性。默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
  • Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)其中 A R = I S R + O S R AR=ISR+OSR AR=ISR+OSR
  • ISR就是前面我们提到的leader和它同步follower的集合,如果follower长时间未向leader发送通信请求或同步数据,那么该follower将被踢出ISR。
  • OSR是指 Follower 与 Leader 副本同步时,延迟过多的副本(被ISR踢出的副本)

3.2 Follower故障处理流程

下图是follower出现故障时,kafka是处理的步骤,在解答之前,我们需要知道两个概念LEO和HW,LEO是指最新的offset+1的位置,而HW是所有leader以及和它同步的follower中最小的LEO。(这两个参数,主要是用来判断所有副本消息同步的情况。)


在每一个集群上都有HW和LEO,而消费者消费的,是HW的前一个offset,如果follower出现故障,则处理流程如下:

  1. 出现故障的follower会被提出ISR,图中是broker2出错,会被提出去。
  2. 在这期间其他的leader和follower会正常接收数据。
  3. 如果这期间broker2的follower恢复了,那么该follower会读取上次出错时的HW,将HW之后的数据都丢弃(因为它认为,我出错之后的数据都是未验证过的),接着会在该HW的位置向leader重新进行同步数据。
  4. 等到等该follower的LEO大于等于该partition的HW,即follower追上leader之后,就可以重新加入ISR了。

3.3 Leader故障处理流程

leader出现故障后,处理的流程和上面follower挂掉后的流程差不多。也是根据HW和LEO来判断所有副本的情况。


具体的处理流程如下:

  1. leader发生故障后会从ISR中去除,然后选一个新的leader,这里我们选的是broker1.
  2. 为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。(这一部分是follower将自己的数据与新的leader进行对齐)

注:这里会出现一种情况,就是我的新leader才处理到4,但是出现故障的leader数据已经收到了7,这就导致5,6,7的数据容易丢失。所以leader出现故障只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

3.4 分区副本的分配

如果 kafka 服务器只有 4 个节点,那么设置 某个主题的分区数大于服务器台数,在 kafka底层如何分配存储副本呢?

这里的意思是leader的位置是一个一个按照顺序轮流下去,但是follower与leader之间的服务器间隔是从1、2、3依次轮流下去的,这样的目的是为了使分副本均匀地分布在集群上。

3.5 副本处理的生产经验

  • 对于每一台服务器配置不一样的情况,我们可以手动进行副本的分区策略,将配置较好的服务器多负担一点副本。


如图,broker1和broker2的配置较好,可以将副本都设置存储在这两台服务器上。

  • kafka本身会将leader partition均匀分配在各个节点上,但是由于broker的宕机,新的follower会被选成leader,即使宕机的broker恢复后也是follower,leader集中在几台服务器上,导致少数几台broker的读写请求压力过高,造成负载不均衡。


如图,因为broker2和broker3宕机了,leader都分配在broker1和broker2上面,这时我们可以设置自动 Leader Partition 平衡,有3种方式,一种是设置自动平衡,一种是设置每个broker的不平衡率达到多少时触发分区平衡,第三种是没经过300s触发分区平衡的检查。

  • 对于重要的主题,我们可以通过增加该主题副本的数量进行来加强数据安全的保障。

4. kafka文件的存储

4.1 文件的存储原理

这一部分我们主要是来了解broker集群存储生产者生产的数据原理,大致原理如图:

  1. topic是逻辑上的概念,其实真实存储都将一个主题的文件进行分区存储。
  2. 每一个partition文件对应一个log文件(log文件也是概念),真实情况是partition被分为一个一个的segment的文件,这个文件大小为1g左右。
  3. segment文件中存储的是.log文件(存储真实数据),.index文件(存储数据文件的索引),.timeindex文件(主要记录文件的时间,方便后续周期性删除。),其他文件。
  4. index文件和log文件都是以当前segment的第一条消息的offerset命名的

注意:Producer生产的数据会被不断追加到该log文件末端,不可删除和修改,一直是追加。


这里和大家解释一下index文件和log文件是如何存储数据的?

  1. index中只存offer的相对位置,而且是每4kb的log文件才存一条索引数据,这种方式叫做稀疏索引。
  2. 相对索引的含义是相对于自己的文件名(文件名是第一条索引)的位置,如上图,相对索引为65,那么65+自己文件名522,就是真实的offset的位置587。
  3. 然后根据index计算出来的结果去log中寻找对应的数据即可。

4.2 文件的清除原理

Kafka中默认的日志保存时间为 7 天,也可以设置保持几分钟、几毫秒,但是一般都是保持7天,那么哪些参数时间超过7天的数据该如何处理呢?

Kafka 中提供了2种日志清理策略: delete 和 compact

  1. delete :将过期数据删除,这里又有2种删除方式,一种是按照时间删除,一种是按照大小删除,所谓时间删除,是将超过保留时间的数据删除,这里的删除单位是segment,时间是以 segment 中所有记录中的最大时间戳作为该文件时间戳。只看最大时间, 还有一种是按照大小,超过多少大小的日志后,将前面的日志删除(按照大小删除不常用)
  2. compact日志压缩:这里的压缩是指将消息队列中相同key的数据只保留最新的数据,而不是使用某种方式将数据进行压缩。这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。

5. kafka高效读写数据的原理

这一部分是面试核心问题

  1. kafka采用的是分布式集群,可以采用分区技术,并行度高,处理速度快。
  2. 读数据采用稀疏索引,可以快速定位要消费的数据(4.1的部分)
  3. 顺序读写磁盘,Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写,比随机写硬盘速度快。
  4. 零拷贝技术+页缓存:Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用走应用层,传输效率高。(可以直接理解,Broker不对数据进行处理,只有生产者和消费者会对数据进行处理,broker只管存储,数据传输过程步骤较少)

6. 个人总结

本章节,一口气介绍完了kafka的broker原理,也就是生产者生产后的数据是如何存储在kafka集群上的,从zookeeper与kafka的交互、kafka的副本机制和文件存储的原理方面对kafka进行来详细介绍,对于我个人来说,让我了解到了mq的各个部分设计思想,让我对offset有了更深的理解,希望能够帮助到大家。

7. 参考资料

-《尚硅谷大数据技术之 Kafka》
-《kafka权威指南》

以上是关于大数据之kafka Broker的工作流程的主要内容,如果未能解决你的问题,请参考以下文章

大数据之kafka Broker的工作流程

大数据-kafka学习——Kafka Broker

大数据之Kafka:Kafka之选举机制

Kafka 之Broker工作流程节点服役和退役

大数据日志传输之Kafka实战教程

Hadoop生态架构之kafka