Kafka核心组件之控制器和协调器

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka核心组件之控制器和协调器相关的知识,希望对你有一定的参考价值。

参考技术A

[TOC]

我们已经知道Kafka的集群由n个的broker所组成,每个broker就是一个kafka的实例或者称之为kafka的服务。其实控制器也是一个broker,控制器也叫leader broker。

他除了具有一般broker的功能外,还负责分区leader的选取,也就是负责选举partition的leader replica。

kafka每个broker启动的时候,都会实例化一个KafkaController,并将broker的id注册到zookeeper,集群在启动过程中,通过选举机制选举出其中一个broker作为leader,也就是前面所说的控制器。

包括集群启动在内,有三种情况触发控制器选举:

1、集群启动

2、控制器所在代理发生故障

3、zookeeper心跳感知,控制器与自己的session过期

按照惯例,先看图。我们根据下图来讲解集群启动时,控制器选举过程。

假设此集群有三个broker,同时启动。

(一)3个broker从zookeeper获取/controller临时节点信息。/controller存储的是选举出来的leader信息。此举是为了确认是否已经存在leader。

(二)如果还没有选举出leader,那么此节点是不存在的,返回-1。如果返回的不是-1,而是leader的json数据,那么说明已经有leader存在,选举结束。

(三)三个broker发现返回-1,了解到目前没有leader,于是均会触发向临时节点/controller写入自己的信息。最先写入的就会成为leader。

(四)假设broker 0的速度最快,他先写入了/controller节点,那么他就成为了leader。而broker1、broker2很不幸,因为晚了一步,他们在写/controller的过程中会抛出ZkNodeExistsException,也就是zk告诉他们,此节点已经存在了。

经过以上四步,broker 0成功写入/controller节点,其它broker写入失败了,所以broker 0成功当选leader。

此外zk中还有controller_epoch节点,存储了leader的变更次数,初始值为0,以后leader每变一次,该值+1。所有向控制器发起的请求,都会携带此值。如果控制器和自己内存中比较,请求值小,说明kafka集群已经发生了新的选举,此请求过期,此请求无效。如果请求值大于控制器内存的值,说明已经有新的控制器当选了,自己已经退位,请求无效。kafka通过controller_epoch保证集群控制器的唯一性及操作的一致性。

由此可见,Kafka控制器选举就是看谁先争抢到/controller节点写入自身信息。

控制器的初始化,其实是初始化控制器所用到的组件及监听器,准备元数据。

前面提到过每个broker都会实例化并启动一个KafkaController。KafkaController和他的组件关系,以及各个组件的介绍如下图:

图中箭头为组件层级关系,组件下面还会再初始化其他组件。可见控制器内部还是有些复杂的,主要有以下组件:

1、ControllerContext,此对象存储了控制器工作需要的所有上下文信息,包括存活的代理、所有主题及分区分配方案、每个分区的AR、leader、ISR等信息。

2、一系列的listener,通过对zookeeper的监听,触发相应的操作,黄色的框的均为listener

3、分区和副本状态机,管理分区和副本。

4、当前代理选举器ZookeeperLeaderElector,此选举器有上位和退位的相关回调方法。

5、分区leader选举器,PartitionLeaderSelector

6、主题删除管理器,TopicDeletetionManager

7、leader向broker批量通信的ControllerBrokerRequestBatch。缓存状态机处理后产生的request,然后统一发送出去。

8、控制器平衡操作的KafkaScheduler,仅在broker作为leader时有效。

Kafka集群的一些重要信息都记录在ZK中,比如集群的所有代理节点、主题的所有分区、分区的副本信息(副本集、主副本、同步的副本集)。每个broker都有一个控制器,为了管理整个集群Kafka选利用zk选举模式,为整个集群选举一个“中央控制器”或”主控制器“,控制器其实就是一个broker节点,除了一般broker功能外,还具有分区首领选举功能。中央控制器管理所有节点的信息,并通过向ZK注册各种监听事件来管理整个集群节点、分区的leader的选举、再平衡等问题。外部事件会更新ZK的数据,ZK中的数据一旦发生变化,控制器都要做不同的响应处理。

故障转移其实就是leader所在broker发生故障,leader转移为其他的broker。转移的过程就是重新选举leader的过程。

重新选举leader后,需要为该broker注册相应权限,调用的是ZookeeperLeaderElector的onControllerFailover()方法。在这个方法中初始化和启动了一系列的组件来完成leader的各种操作。具体如下,其实和控制器初始化有很大的相似度。

1、注册分区管理的相关监听器

2、注册主题管理的相关监听

3、注册代理变化监听器

4、重新初始化ControllerContext,

5、启动控制器和其他代理之间通信的ControllerChannelManager

6、创建用于删除主题的TopicDeletionManager对象,并启动。

7、启动分区状态机和副本状态机

8、轮询每个主题,添加监听分区变化的PartitionModificationsListener

9、如果设置了分区平衡定时操作,那么创建分区平衡的定时任务,默认300秒检查并执行。

除了这些组件的启动外,onControllerFailover方法中还做了如下操作:

1、/controller_epoch值+1,并且更新到ControllerContext

2、检查是否出发分区重分配,并做相关操作

3、检查需要将优先副本选为leader,并做相关操作

4、向kafka集群所有代理发送更新元数据的请求。

下面来看leader权限被取消时,调用的方法onControllerResignation

1、该方法中注销了控制器的权限。取消在zookeeper中对于分区、副本感知的相应监听器的监听。

2、关闭启动的各个组件

3、最后把ControllerContext中记录控制器版本的数值清零,并设置当前broker为RunnignAsBroker,变为普通的broker。

通过对控制器启动过程的学习,我们应该已经对kafka工作的原理有了了解, 核心是监听zookeeper的相关节点,节点变化时触发相应的操作

有新的broker加入集群时,称为代理上线。反之,当broker关闭,推出集群时,称为代理下线。

代理上线:

1、新代理启动时向/brokers/ids写数据

2、BrokerChangeListener监听到变化。对新上线节点调用controllerChannelManager.addBroker(),完成新上线代理网络层初始化

3、调用KafkaController.onBrokerStartup()处理

3.5恢复因新代理上线暂停的删除主题操作线程

代理下线:

1、查找下线节点集合

2、轮询下线节点,调用controllerChannelManager.removeBroker(),关闭每个下线节点网络连接。清空下线节点消息队列,关闭下线节点request请求

3、轮询下线节点,调用KafkaController.onBrokerFailure处理

4、向集群全部存活代理发送updateMetadataRequest请求

顾名思义,协调器负责协调工作。本节所讲的协调器,是用来协调消费者工作分配的。简单点说,就是消费者启动后,到可以正常消费前,这个阶段的初始化工作。消费者能够正常运转起来,全有赖于协调器。

主要的协调器有如下两个:

1、消费者协调器(ConsumerCoordinator)

2、组协调器(GroupCoordinator)

kafka引入协调器有其历史过程,原来consumer信息依赖于zookeeper存储,当代理或消费者发生变化时,引发消费者平衡,此时消费者之间是互不透明的,每个消费者和zookeeper单独通信,容易造成羊群效应和脑裂问题。

为了解决这些问题,kafka引入了协调器。服务端引入组协调器(GroupCoordinator),消费者端引入消费者协调器(ConsumerCoordinator)。每个broker启动的时候,都会创建GroupCoordinator实例,管理部分消费组(集群负载均衡)和组下每个消费者消费的偏移量(offset)。每个consumer实例化时,同时实例化一个ConsumerCoordinator对象,负责同一个消费组下各个消费者和服务端组协调器之前的通信。如下图:

消费者协调器,可以看作是消费者做操作的代理类(其实并不是),消费者很多操作通过消费者协调器进行处理。

消费者协调器主要负责如下工作:

1、更新消费者缓存的MetaData

2、向组协调器申请加入组

3、消费者加入组后的相应处理

4、请求离开消费组

5、向组协调器提交偏移量

6、通过心跳,保持组协调器的连接感知。

7、被组协调器选为leader的消费者的协调器,负责消费者分区分配。分配结果发送给组协调器。

8、非leader的消费者,通过消费者协调器和组协调器同步分配结果。

消费者协调器主要依赖的组件和说明见下图:

可以看到这些组件和消费者协调器担负的工作是可以对照上的。

组协调器负责处理消费者协调器发过来的各种请求。它主要提供如下功能:

组协调器在broker启动的时候实例化,每个组协调器负责一部分消费组的管理。它主要依赖的组件见下图:

这些组件也是和组协调器的功能能够对应上的。具体内容不在详述。

下图展示了消费者启动选取leader、入组的过程。

消费者入组的过程,很好的展示了消费者协调器和组协调器之间是如何配合工作的。leader consumer会承担分区分配的工作,这样kafka集群的压力会小很多。同组的consumer通过组协调器保持同步。消费者和分区的对应关系持久化在kafka内部主题。

消费者消费时,会在本地维护消费到的位置(offset),就是偏移量,这样下次消费才知道从哪里开始消费。如果整个环境没有变化,这样做就足够了。但一旦消费者平衡操作或者分区变化后,消费者不再对应原来的分区,而每个消费者的offset也没有同步到服务器,这样就无法接着前任的工作继续进行了。

因此只有把消费偏移量定期发送到服务器,由GroupCoordinator集中式管理,分区重分配后,各个消费者从GroupCoordinator读取自己对应分区的offset,在新的分区上继续前任的工作。

下图展示了不提交offset到服务端的问题:

开始时,consumer 0消费partition 0 和1,后来由于新的consumer 2入组,分区重新进行了分配。consumer 0不再消费partition2,而由consumer 2来消费partition 2,但由于consumer之间是不能通讯的,所有consumer2并不知道从哪里开始自己的消费。

因此consumer需要定期提交自己消费的offset到服务端,这样在重分区操作后,每个consumer都能在服务端查到分配给自己的partition所消费到的offset,继续消费。

由于kafka有高可用和横向扩展的特性,当有新的分区出现或者新的消费入组后,需要重新分配消费者对应的分区,所以如果偏移量提交的有问题,会重复消费或者丢消息。偏移量提交的时机和方式要格外注意!!

1、自动提交偏移量

设置 enable.auto.commit为true,设定好周期,默认5s。消费者每次调用轮询消息的poll() 方法时,会检查是否超过了5s没有提交偏移量,如果是,提交上一次轮询返回的偏移量。

这样做很方便,但是会带来重复消费的问题。假如最近一次偏移量提交3s后,触发了再均衡,服务器端存储的还是上次提交的偏移量,那么再均衡结束后,新的消费者会从最后一次提交的偏移量开始拉取消息,此3s内消费的消息会被重复消费。

2、手动提交偏移量

设置 enable.auto.commit为false。程序中手动调用commitSync()提交偏移量,此时提交的是poll方法返回的最新的偏移量。

commitSync()是同步提交偏移量,主程序会一直阻塞,偏移量提交成功后才往下运行。这样会限制程序的吞吐量。如果降低提交频次,又很容易发生重复消费。

这里我们可以使用commitAsync()异步提交偏移量。只管提交,而不会等待broker返回提交结果

commitSync只要没有发生不可恢复错误,会进行重试,直到成功。而commitAsync不会进行重试,失败就是失败了。commitAsync不重试,是因为重试提交时,可能已经有其它更大偏移量已经提交成功了,如果此时重试提交成功,那么更小的偏移量会覆盖大的偏移量。那么如果此时发生再均衡,新的消费者将会重复消费消息。

Apache Kafka核心组件和流程-协调器(消费者和组协调器)-设计-原理(入门教程轻松学)


作者:稀有气体
来源:CSDN
原文:https://blog.csdn.net/liyiming2017/article/details/82805479
版权声明:本文为博主原创文章,转载请附上博文链接!

本入门教程,涵盖Kafka核心内容,通过实例和大量图表,帮助学习者理解,任何问题欢迎留言。

目录:

本章是学习kafka的核心章节,涵盖内容比较多,在理解上有一定的难度,需要反复阅读理解,才能参透Kafka的设计思想。

1、Kafka集群结构

在第一章我给出过一个消息系统通用的结构图,也就是下图:

实际上kafka的结构图是有些区别的,现在我们看下面的图:

producer和consumer想必大家都很熟悉,一个生产消息,一个消费掉消息。这里就不再做太多解释。

此图和第一张图可以看到有几个区别:

1、多了zookeeper集群,通过前几章的学习我们已经知道kafka是配合zookeeper进行工作的。

2、kafka集群中可以看到有若干个Broker,其中一个broker是leader,其他的broker是follower

3、consumer外面包裹了一层Consumer group。

我们先讲解一下Broker和consumer group的概念,以及Topic。

 

Broker

一个Borker就是Kafka集群中的一个实例,或者说是一个服务单元。连接到同一个zookeeper的多个broker实例组成kafka的集群。在若干个broker中会有一个broker是leader,其余的broker为follower。leader在集群启动时候选举出来,负责和外部的通讯。当leader死掉的时候,follower们会再次通过选举,选择出新的leader,确保集群的正常工作。

 

Consumer Group

Kafka和其它消息系统有一个不一样的设计,在consumer之上加了一层group。同一个group的consumer可以并行消费同一个topic的消息,但是同group的consumer,不会重复消费。这就好比多个consumer组成了一个团队,一起干活,当然干活的速度就上来了。group中的consumer是如何配合协调的,其实和topic的分区相关联,后面我们会详细论述。

如果同一个topic需要被多次消费,可以通过设立多个consumer group来实现。每个group分别消费,互不影响。

通过本节学习,我们从全局的层面了解了kafka的结构,接下来我们会深入到kafka内部,来看看它是怎么工作的。

 

Topic

kafka中消息订阅和发送都是基于某个topic。比如有个topic叫做NBA赛事信息,那么producer会把NBA赛事信息的消息发送到此topic下面。所有订阅此topic的consumer将会拉取到此topic下的消息。Topic就像一个特定主题的收件箱,producer往里丢,consumer取走。

 

2、Kafka核心概念简介

kafka采用分区(Partition)的方式,使得消费者能够做到并行消费,从而大大提高了自己的吞吐能力。同时为了实现高可用,每个分区又有若干份副本(Replica),这样在某个broker挂掉的情况下,数据不会丢失。

接下来我们详细分析kafka是如何基于Partition和Replica工作的。

 

分区(Partition)

大多数消息系统,同一个topic下的消息,存储在一个队列。分区的概念就是把这个队列划分为若干个小队列,每一个小队列就是一个分区,如下图:

这样做的好处是什么呢?其实从上图已经可以看出来。无分区时,一个topic只有一个消费者在消费这个消息队列。采用分区后,如果有两个分区,最多两个消费者同时消费,消费的速度肯定会更快。如果觉得不够快,可以加到四个分区,让四个消费者并行消费。分区的设计大大的提升了kafka的吞吐量!!

我们再结合下图继续讲解Partition。

此图包含如下几个知识点:

1、一个partition只能被同组的一个consumer消费(图中只会有一个箭头指向一个partition)

2、同一个组里的一个consumer可以消费多个partition(图中第一个consumer消费Partition 0和3)

3、消费效率最高的情况是partition和consumer数量相同。这样确保每个consumer专职负责一个partition。

4、consumer数量不能大于partition数量。由于第一点的限制,当consumer多余partition时,就会有consumer闲置。

5、consumer group可以认为是一个订阅者的集群,其中的每个consumer负责自己所消费的分区

 

为了加深理解,我举个吃苹果的例子。

问题:有一篮子苹果,你如何把这一篮子苹果尽可能快的吃完?

办法一:

我一个人,一个一个苹果吃,如下图。这样显然很慢,我吃完一个才能拿下一个。

办法二:

我再找两个人来一块吃,第一个人拿走一个去吃,然后第二个人拿一个去吃,接着第三个人拿一个去吃,如此循环。速度肯定快了,但是三个人还是会排队等待。三个人排队时间可能很短,但是如果叫了100个人帮忙吃呢?会有大量时间消耗在排队上。

办法三:

我还是找两个人来一块吃,但我把苹果提前分到三个盘子里,每人分一个盘子,自己吃自己的,这样不但能三个人同时吃苹果,还无须排队。速度显然是最快的。

办法三正是kafka所采用的设计方式,盘子就是partition,每个人就是一个consumer,每个苹果就是一条message。办法三每个盘子中苹果的消费是有序的,而办法二的消费是完全无序的。

相信通过这个例子你一定能充分理解partition的概念,以及为什么kafka会如此设计。

关于partition暂时说到这里,接下来介绍副本。

 

副本(Replica)

提到副本,肯定就会想到正本。副本是正本的拷贝。在kafka中,正本和副本都称之为副本(Repalica),但存在leader和follower之分。活跃的称之为leader,其他的是follower。

每个分区的数据都会有多份副本,以此来保证Kafka的高可用。

Topic、partition、replica的关系如下图:

topic下会划分多个partition,每个partition都有自己的replica,其中只有一个是leader replica,其余的是follower replica。

消息进来的时候会先存入leader replica,然后从leader replica复制到follower replica。只有复制全部完成时,consumer才可以消费此条消息。这是为了确保意外发生时,数据可以恢复。consumer的消费也是从leader replica读取的。

由此可见,leader replica做了大量的工作。所以如果不同partition的leader replica在kafka集群的broker上分布不均匀,就会造成负载不均衡。

kafka通过轮询算法保证leader replica是均匀分布在多个broker上。如下图。

可以看到每个partition的leader replica均匀的分布在三个broker上,follower replica也是均匀分布的。

关于Replica,有如下知识点:

1、Replica均匀分配在Broker上,同一个partition的replica不会在同一个borker上

2、同一个partition的Replica数量不能多于broker数量。多个replica为了数据安全,一台server存多个replica没有意义。server挂掉,上面的副本都要挂掉。

3、分区的leader replica均衡分布在broker上。此时集群的负载是均衡的。这就叫做分区平衡

分区平衡是个很重要的概念,接下来我们就来讲解分区平衡。

 

分区平衡

在讲分区平衡前,先讲几个概念:

1、AR: assigned replicas,已分配的副本。每个partition都有自己的AR列表,里面存储着这个partition最初分配的所有replica。注意AR列表不会变化,除非增加分区。

2、PR(优先replica):AR列表中的第一个replica就是优先replica,而且永远是优先replica。最初,优先replica和leader replica是同一个replica。

3、ISR:in sync replicas,同步副本。每个partition都有自己的ISR列表。ISR是会根据同步情况动态变化的。

最初ISR列表和AR列表是一致的,但由于某个节点死掉,或者某个节点的follower replica落后leader replica太多,那么该节点就会被从ISR列表中移除。此时,ISR和AR就不再一致

接下来我们通过一个例子来理解分区平衡。

1、根据以上信息,一个拥有3个replica的partition,最初是下图的样子。

可以看到AR和ISR保持一致,并且初始时刻,优先副本和leader副本都指向replica 0.

2、接下来,replica 0所在的机器下线了,那么情况会变成如下图所示:

可以看到replica 0已经从ISR中移除掉了。同时,由于重新选举,leader副本变成了replica 1,而优先副本还是replica 0。优先副本是不会改变的。

由于最初时,leader副本在broker均匀分布,分区是平衡的。但此时,由于此partition的leader副本换成了另外一个,所以此时分区平衡已经被破坏。

3、replica 0所在的机器修复了,又重新上线,情况如下图:

可以看到replica 0重新回到ISR列表中,不过此时他没能恢复leader的身份。只能作为follower当一名小弟。

此时分区依旧是不平衡的。那是否意味着分区永远都会不平衡下去呢?不是的。

4、kafka会定时触发分区平衡操作,也可以主动触发分区平衡。这就是所谓的分区平衡操作,操作完后如下图。

可以看到此时leader副本通过选举,会重新变回来replica 0,因为replica 0是优先副本,其实优先的含义就是选择leader时被优先选择。这样整个分区又回到了初始状态,而初始时,leader副本是均匀分布的。此时已经分区平衡了。

由此可见,分区平衡操作就是使leader副本和优先副本保持一致的操作。可以把优先副本理解为分区的平衡状态位,平衡操作就是让leader副本归位。

 

Partition的读和写

通过之前的学习,我们知道topic下划分了多个partition,消息的生产和消费最终都是发生在partition之上。下图是一个三个partition的topic的读写示意。

我们先看右边的producer,可以看到写的时候,采用round-robin算法,轮询往每个partition写入。

而在消费者端,每个consumer都维护一个offset值,指向的是它所消费到的消息坐标。

我们先看group A的三个consumer,他们分别独立消费不同的三个partition。每个consumer维护了自己的offset。

再结group B,可以看到两个group是并行消费整个topic,同一条消息会被不同group消费到。

 

此处有如下知识点:

1、每个partition都是有序的不可变的。

2、Kafka可以保证partition的消费顺序,但不能保证topic消费顺序。

3、无论消费与否,保留周期默认两天(可配置)。

4、每个consumer维护的唯一元数据是offset,代表消费的位置,一般线性向后移动。

5、consumer也可以重置offset到之前的位置,可以以任何顺序消费,不一定线性后移。

 

回顾

本章是理解kafka设计的核心,通过本章学习你应该理解如下知识点:

  1. producer
  2. consumer
  3. consumer group
  4. broker
  5. 分区(partition)
  6. 副本(replica)
  7. 分区平衡
  8. 消息读写

如果对上面提到的知识点还有不清晰的地方,请再复习,或者找其它学习资料进行学习。

下一步:开始kafka核心组件和流程-控制器的学习

以上是关于Kafka核心组件之控制器和协调器的主要内容,如果未能解决你的问题,请参考以下文章

Apache Kafka核心组件和流程-协调器(消费者和组协调器)-设计-原理(入门教程轻松学)

Apache Kafka核心组件和流程-协调器(消费者和组协调器)-设计-原理(入门教程轻松学)

Apache Kafka核心组件和流程-协调器(消费者和组协调器)-设计-原理(入门教程轻松学)

Apache Kafka 核心组件和流程-控制器-设计-原理(入门教程轻松学)

Apache Kafka 核心组件和流程-控制器-设计-原理(入门教程轻松学)

一文理解 Kafka 的 Controller 领导选举!