Kafka快速入门——Kafka架构
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka快速入门——Kafka架构相关的知识,希望对你有一定的参考价值。
Kafka快速入门(二)——Kafka架构一、Kafka架构简介
1、Kafka架构简介
2、Record
Record即Kafka消息,是Kafka处理的主要对象。
3、Topic
Topic是承载Kafka消息数据的逻辑容器,用于区分具体的业务,但在物理上,不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存在一个或多个Broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存储在何处。
4、Partition
Topic被分割为一个或多个Partition,Partition是一个物理概念,对应一个或若干个目录。Partition内部的消息是有序的,Partition间的消息是无序的。
Kafka Broker配置文件中的num.partitions参数用于指定Topic的Partition数量,在创建Topic时也可以通过partitions参数指定Partition数量。
5、Broker
Kafka集群包含一个或多个服务器,每个服务器节点称为一个Broker。
Broker存储Topic的数据。如果某Topic有N个Partition,集群有N个Broker,那么每个Broker存储该Topic的一个Partition。
如果某Topic有N个Partition,集群有(N+M)个Broker,那么其中有N个Broker各存储Topic的一个Partition,剩下的M个Broker不存储Topic的Partition数据。
如果某Topic有N个Partition,集群中Broker数目少于N个,那么每个Broker存储Topic的一个或多个Partition。实际生产环境中应尽量避免,否则容易导致Kafka集群数据不均衡。
6、Producer
Producer(生产者)是消息的发布者,生产者负责选择将消息数据分配给Topic中的某个分区,即生产者生产的每一条消息,会被写入到某一个Partition。
7、Consumer
Consumer(消费者)从Broker中消费消息,一个Consumer可以消费多个Topic的消息,也可以消费同一个Topic中的多个Partition中的消息,Partition允许多个Consumer同时消费,提高Kafka Broker的吞吐量。
8、Consumer Group
Consumer Group是Kafka提供的可扩展且具有容错性的消费者机制,多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。Consumer Group内可以有多个消费者,共享一个公共的ID,即Group ID,Consumer Group内的所有消费者协调在一起来消费订阅Topic的所有分区。
Kafka保证同一个Consumer Group中只有一个Consumer会消费某条消息,Kafka保证的是稳定状态下每一个Consumer 实例只会消费某一个或多个特定的Partition,而某个Partition的数据只会被某一个特定的Consumer实例所消费。
两台Broker组成的Kafka群集,包含四个属于两个Consumer Group的分区(P0-P3)。Consumer Group A有两个消费者实例,Consumer Group B有四个消费者实例。
Consumers使用Consumer Group的名字来标识自己,并且每个发布到Topic的消息(Record)都会被传递到每个Consumer Group中的一个消费者实例,消费者实例可以在单独的进程中或者在不同的机器中。
如果所有消费者实例都划分到一个Consumer Group中,那么消息将会轮流被Consumer Group中的消费者消费,即单播;如果所有的消费者实例都在不同的Consumer Group中,那么每一条消息都会被所有的消费者消费,即广播。
Consumer Group包含Consumer的数量可以预先在配置文件中配置。多个Consumer可以组成一个Consumer Group,Partition中的每一条消息只能被一个Consumer Group中的一个Consumer进行消费,其它Consumer不能消费同一个Topic中同一个Partition的数据,不同Consumer Group的Consumer可以消费同一个Topic的同一个Partition的数据。
9、Replica
Replica(分区副本)是一个分区的备份,是为了防止消息丢失而创建的分区备份。Kafka中同一条消息能够被拷贝到多个地方以提供数据冗余。副本分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
10、Partition Leader
每个Partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责消息读写的Partition,所有读写操作只能发生于Leader分区上。
11、Partition Follower
所有Follower都需要从Leader同步消息,Follower与Leader始终保持消息同步。Leader与Follower的关系是主备关系,而非主从关系。
12、Zookeeper
Apache Zookeeper是一个分布式配置和同步服务,是Apache Kafka的一个关键依赖,是Kafka Broker和Consumer之间的协调接口。Kafka Broker通过Zookeeper集群共享信息,Kafka Broker在Zookeeper中存储基本元数据,例如关于主题、代理、消费者偏移(队列读取器)等的信息。Zookeeper负责维护和协调Kafka Broker以及Broker Controller的选举。
Kafka 0.9版本前,offset由Zookeeper负责管理。
二、Kafka分区机制
1、分区机制简介
分区机制指的是将每个Topic划分成多个分区(Partition),每个分区是一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中。
对每个?Topic,Kafka集群维护了一个分区的记录(log),分区 (Partition) 都是一个有序的、不可变的数据序列,消息数据被不断的添加到序列的尾部。分区中的每一条消息数据都被赋予了一个连续的数字ID,即偏移量 (offset) ,用于唯一标识分区中的每条消息数据。
分区的作用就是提供负载均衡的能力,实现系统的高伸缩性(Scalability)。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区进行的,每个节点的机器都能独立地执行各自分区的读写请求处理,同时可以通过添加新的节点机器来增加整体系统的吞吐量。
2、消息位移
Offset(消息位移)是消息在分区内的偏移量。每条消息都有一个当前Partition下唯一的64字节的消息位移,是相对于当前分区第一条消息的偏移量。
3、Log Segment机制
Kafka使用消息日志(Log)来保存数据,日志是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,因此避免了缓慢的随机I/O操作,改为性能较好的顺序I/O写操作。Kafka通过日志段(Log Segment)机制定期地删除消息以回收磁盘。Kafka日志文件分为多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka会自动切分出一个新的日志段,并将旧的日志段封存。Kafka在后台会定期地检查旧的日志段是否能够被删除,从而实现回收磁盘空间的目的。
Kafka将消息数据根据Partition进行存储,Partition分为若干Segment,每个Segment的大小相等。
Segment由index file?和?data file组成,后缀为".index"和".log",分别表示为Segment索引文件、数据文件,每一个Segment存储着多条信息。
Segment文件的生命周期由Broker配置参数决定,默认24x7小时后删除。
依据Kafka消息在分区的全局offset,可以使用二分查找算法查找到相应的Segment的.index索引文件和.log数据文件; 根据索引文件找到消息在分区的逻辑偏移量和物理地址偏移量,并取出消息数据。
4、分区策略
分区策略是决定生产者将消息发送到哪个分区的算法。Kafka提供默认的分区策略,同时支持自定义分区策略。如果要自定义分区策略,需要显式地配置生产者端的参数partitioner.class。编写生产者程序时,可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口(partition()和close()),通常只需要实现最重要的partition方法。
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前Kafka集群共有多少主题、多少Broker等)。设置partitioner.class参数为自己实现类的Full Qualified Name,生产者程序就会按照自定义分区策略的代码逻辑对消息进行分区。
5、轮询策略
轮询策略(Round-robin),即顺序分配策略。如果一个Topic有3个分区,则第1条消息被发送到分区0,第2条被发送到分区1,第3条被发送到分区2,以此类推。当生产第4条消息时又会重新轮询将其分配到分区0。
轮询策略是Kafka Java生产者API默认提供的分区策略。如果未指定partitioner.class参数,那么生产者程序会按照轮询的方式在Topic的所有分区间均匀地存储消息。轮询策略有非常优秀的负载均衡表现,能保证消息最大限度地被平均分配到所有分区上。
6、随机策略
随机策略(Randomness)是将消息随机地放置到任意一个分区上。
如果要实现随机策略版的partition方法,Java版如下:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
先计算出Topic的总分区数,然后随机地返回一个小于分区数的正整数。随机策略本质上是力求将数据均匀地分散到各个分区,但实际表现要逊于轮询策略,如果追求数据的均匀分布,推荐使用轮询策略。
7、按消息键保序策略
Kafka允许为每条消息定义消息键,简称为Key。Key可以是一个有着明确业务含义的字符串,如客户代码、部门编号或是业务ID等,也可以用来表征消息元数据。一旦消息被定义了Key,就可以保证同一个Key的所有消息都进入到相同的分区中。
实现分区策略的partition方法只需要两行代码即可:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
Kafka 默认分区策略同时实现了两种策略:如果指定Key,那么默认实现按消息键保序策略;如果没有指定Key,则使用轮询策略。
8、基于地理位置的分区策略
基于地理位置的分区策略通常只针对大规模的Kafka集群,特别是跨城市、跨国家甚至跨大洲的集群。假设天猫计划为每个新注册用户提供一份注册礼品,比如欧美的用户注册天猫时可以免费得到一台iphone SE手机,而中国的新注册用户可以得到一台华为P40 Pro。为了实现相应的注册业务逻辑,只需要创建一个双分区的主题,然后再创建两个消费者程序分别处理欧美和中国用户的注册用户逻辑即可,同时必须把不同地理位置的用户注册的消息发送到不同机房中,因为处理注册消息的消费者程序只可能在某一个机房中启动着。基于地理位置的分区策略可以根据Broker的IP地址实现定制化的分区策略。
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isChina(p.leader().host())).map(PartitionInfo::partition).findAny().get();
可以从所有分区中找出Leader副本在中国的所有分区,然后随机挑选一个进行消息发送。
9、消息保序
为了实现消息保序,可以将Topic设置成单分区,单分区的Topic的所有的消息都只在一个分区内读写,保证全局的顺序性,但会丧失Kafka多分区带来的高吞吐量和负载均衡的性能优势。
实现消息保序的另一种方式是按消息键保序策略。通过对具体业务进行分析,提取出需要保序的消息的逻辑主体并建立消息标志位ID,
对标志位设定专门的分区策略,保证同一标志位的所有消息都发送到同一分区,既可以保证分区内的消息顺序,也可以享受到多分区带来的搞吞吐量。
消息重试只是简单将消息重新发送到原来的分区,不会重新选择分区。
10、消息生产过程
生产者生产消息,将消息发送给Broker并形成可供消费者消费的消息数据的过程如下:
(1)Producer先从Zookeeper中找到Partition的Leader
(2)Producer将消息发送给分区的Leader。
(3)Leader将消息接入本地的Log,并通知ISR的Followers。
(4)ISR中的Followers从Leader中pull消息,写入本地Log后向Leader发送ACK。
(5)Leader收到所有ISR中的Followers的ACK后,增加HW并向Producer发送ACK,表示消息写入成功。
11、消息路由策略
在通过API方式发布消息时,生产者是以Record为消息进行发布的。Record中包含key与value,value才是消息本身,而key用于路由消息所要存放Partition。消息要写入到哪个Partition并不是随机的,而是由路由策略决定。
(1)如果指定了Partition,则直接写入到指定的Partition。
(2)如果没有指定Partition但指定了key,则通过对key的hash值与Partition数量取模,结果就是要选出的Partition索引。
(3)如果Partition和key都未指定,则使用轮询算法选出一个Partition。
(4)增加分区时,Partition内的消息不会重新进行分配,随着数据继续写入,新分区才会参与再平衡。
三、Kafka Consumer Group
1、Consumer Group简介
传统消息队列模型的消息一旦被消费,就会从队列中被删除,而且只能被一个Consumer消费,因此伸缩性(scalability)很差;发布/订阅模型允许消息被多个Consumer消费,但其伸缩性也不高,因为每个订阅者都必须要订阅Topic的所有分区。
Consumer Group(消费者组)是Kafka提供的可扩展且具有容错性的消费者机制,可以避免传统消息队列模型、发布/订阅模型的缺陷。
当Consumer Group订阅多个Topic后,组内的每个Consumer实例不要求一定要订阅Topic的所有分区,只会消费部分分区中的消息;而Consumer Group间彼此独立,互不影响,能够订阅相同的一组Topic而互不干涉。再加上Broker端的消息留存机制,Kafka的Consumer Group避免了伸缩性差的问题,同时实现了传统消息引擎系统的两大模型:如果所有Consumer实例都属于同一个Group,是消息队列模型;如果所有Consumer实例分别属于不同的Group,是发布/订阅模型。理想情况下,Consumer 实例的数量应该等于Consumer Group订阅主题的分区总数。
Consumer Group可以有多个消费者或消费者实例(Consumer Instance),共享一个公共的ID(Group ID)。Consumer Group组内的所有消费者协调在一起来消费订阅Topic(Subscribed Topics)的所有分区(Partition),每个分区只能由同一个消费者组内的一个Consumer实例来消费。
2、Consumer Group特性
(1)Consumer Group可以有一个或多个Consumer实例,Consumer实例可以是一个单独的进程,也可以是同一进程下的线程。
(2)Group ID是一个字符串,在一个Kafka集群中标识唯一的一个Consumer Group。
(3)Consumer Group的所有Consumer实例订阅的Topic的某个分区只能分配给组内的某个Consumer实例消费,但可以被其它Consumer Group消费。
3、消费者组位移
Consumer Group的位移是一组 KV 对,Key是分区,V是Consumer消费Key分区的最新位移。
旧版本的Consumer Group把位移保存在ZooKeeper中。ZooKeeper 是一个分布式的协调服务框架,Kafka重度依赖ZooKeeper实现各种各样的协调管理,将位移保存在ZooKeeper的优点是减少了Kafka Broker端的状态保存开销。但ZooKeeper并不适合进行频繁的写更新,而Consumer Group的位移更新是一个非常频繁的操作,大吞吐量的写操作会极大地拖慢ZooKeeper集群的性能,因此Kafka 0.9版本的 Consumer Group中重新设计了Consumer Group的位移管理方式,将位移保存在Kafka内置Topic(__consumer_offsets
)中。
4、Rebalance
Rebalance本质上是一种协议,规定了一个Consumer Group下的所有Consumer如何达成一致,来分配订阅Topic的每个分区。如某个Group下有20个Consumer实例,订阅了一个具有100个分区的Topic。正常情况下,Kafka平均会为每个Consumer分配5个分区,分区的分配的过程就叫Rebalance。Consumer Group触发Rebalance的条件有 3 个:
(1)组成员数发生变更。如有新的Consumer实例加入组或者离开组,或有Consumer实例崩溃从组中删除。
(2)订阅Topic数发生变更。Consumer Group可以使用正则表达式的方式订阅主题,在Consumer Group运行过程中,如果新创建一个满足正则匹配条件的Topic,那么Consumer Group就会触发Rebalance。
(3)订阅Topic的分区数发生变更。Kafka当前只能允许增加一个Topic的分区数。当分区数增加时,就会触发订阅Topic的所有Consumer Group开启Rebalance。
Rebalance发生时,Consumer Group下所有的Consumer实例都会协调在一起共同参与。Kafka默认提供了3种Topic分区的分配策略,每个Consumer实例都能够得到较为平均的分区数。
Rebalance的缺点如下:
(1)Rebalance过程对Consumer Group消费过程有极大的影响。在Rebalance过程中,所有Consumer实例都会停止消费,等待Rebalance完成。
(2)Rebalance过程中所有Consumer实例共同参与,全部重新分配所有分区,更高效的做法是尽量减少分配方案的变动。
(3)Rebalance过程太慢。
5、Coordinator
Coordinator一般指的是运行在每个Broker上的Group Coordinator进程,用于管理Consumer Group中的各个成员,主要用于offset位移管理和Rebalance。一个Coordinator可以同时管理多个消费者组。
kafka引入Coordinator有其历史背景,原来consumer信息依赖于Zookeeper存储,当代理或消费者发生变化时,引发消费者平衡,此时消费者之间是互不透明的,每个消费者和Zookeeper单独通信,容易造成羊群效应和脑裂问题。
为了解决这些问题,kafka引入了Coordinator。服务端引入组协调器(Group Coordinator),消费者端引入消费者协调器(Consumer Coordinator)。每个Broker启动的时候,都会创建Group Coordinator实例,管理部分消费组(集群负载均衡)和组下每个消费者消费的偏移量(offset)。每个Consumer实例化时,同时实例化一个Consumer Coordinator对象,负责同一个消费组下各个消费者和服务端组协调器之前的通信。
四、Kafka Rebalance机制
1、Rebalance简介
当消费者组中的数量发生变化,或者Topic中的Partition数量发生了变化时,Partition的所有权会在消费者间转移,即Partition会重新分配,分配过程称为Rebalance。
Rebalance能够给消费者组及Broker带来高性能、高可用性和伸缩,但在Rebalance期间消费者是无法读取消息的,即整Broker集群有小一段时间是不可用的。因此要避免不必要的Rebalance。
Rebalance是让一个Consumer Group下所有的Consumer实例就如何消费订阅主题的所有分区达成共识的过程。Rebalance过程中,所有Consumer实例共同参与,在Coordinator组件的帮助下,完成订阅主题分区的分配。Rebalance过程中,所有Consumer实例都不能消费任何消息,因此对Consumer的TPS影响很大。
在Kafka中,Coordinator负责为Consumer Group 执行Rebalance以及提供位移管理和组成员管理等。Consumer端应用程序在提交位移时,向Coordinator所在的Broker提交位移。当Consumer应用启动时,向Coordinator所在的Broker发送各种请求,然后由Coordinator负责执行消费者组的注册、成员管理记录等元数据管理操作。
所有Broker在启动时,都会创建和开启相应的Coordinator组件,Consumer Group确定为其服务的Coordinator在哪台Broker上的算法有2 个步骤:
(1)确定由位移主题的哪个分区来保存Consumer Group数据:partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount)。
(2)找出分区Leader副本所在的Broker,即为对应Coordinator。
2、StickyAssignor粘性分区分配策略
Kafka每次Rebalance时,所有Consumer实例会共同参与重新分配所有分区,效率不高,因此Kafka 0.11.0.0版本推出了StickyAssignor(有粘性的分区分配策略)。有粘性分区分配是指每次Rebalance时会尽可能地保留原来的分区分配方案,尽量实现分区分配的最小变动。
3、Rebalance触发条件
触发Rebalance的情况有三种:Consumer Group的Consumer成员数量发生变化;订阅主题数量发生变化;订阅主题的分区数发生变化。
如果Consumer Group下的Consumer实例数量发生变化,就一定会引发Rebalance,也是Rebalance发生的最常见的原因。
当启动一个配置有相同group.id值的Consumer程序时,就向Group添加一个新的Consumer实例。此时,Coordinator会接纳新实例,将其加入到组中,并重新分配分区。增加Consumer实例的操作都是计划内的,可能出于增加TPS或提高伸缩性的需要。
当Consumer Group完成Rebalance后,每个Consumer实例都会定期地向Coordinator发送心跳请求。如果某个Consumer实例不能及时地发送心跳请求,Coordinator会认为Consumer已经死,从而将其从Group中移除,然后开启新一轮Rebalance。
Consumer端session.timeout.ms参数用于指定Consumer的心跳超时,默认值10秒,如果Coordinator在 10 秒内没有收到Group下某Consumer实例的心跳,会认为Consumer实例已经挂了。
Consumer的heartbeat.interval.ms参数允许控制发送心跳请求频率,值越小,Consumer实例发送心跳请求的频率就越高,频繁地发送心跳请求会额外消耗带宽资源,但能够更加快速地知晓当前是否开启Rebalance。Coordinator通知各个Consumer实例开启Rebalance的方法就是将REBALANCE_NEEDED标志封装进心跳请求的响应体中。
Consumer端max.poll.interval.ms参数用于控制Consumer实际消费能力对Rebalance的影响,限定Consumer端应用程序两次调用poll方法的最大时间间隔,默认值是5分钟,表示Consumer程序如果在5 分钟内无法消费完poll方法返回的消息,Consumer会主动发起离开组的请求,Coordinator会开启新一轮Rebalance。
有多种原因会导致产生非必要的Rebalance:
(1)未能及时发送心跳,导致Consumer被踢出Group而引发。因此,需要仔细地设置session.timeout.ms和heartbeat.interval.ms的值。生产环境中推荐设置session.timeout.ms = 6s,heartbeat.interval.ms = 2s。将session.timeout.ms设置成6s主要是为了让Coordinator能够更快地定位已经挂掉的Consumer。
(2)Consumer消费时间过长导致的。max.poll.interval.ms参数值的设置需要为业务处理逻辑留下充足的时间。
(3)生产环境中,Consumer端如果出现频繁的Full GC导致的长时间停顿,也会引发Rebalance。
五、Kafka内置Topic
1、位移主题简介
__consumer_offsets
是Kafka的内置Topic,即位移主题(Offsets Topic)。旧版本Consumer的位移管理是依托于Apache ZooKeeper,会自动或手动地将位移数据提交到ZooKeeper中保存,当Consumer重启后,会自动从ZooKeeper中读取位移数据,从而在上次消费截止的地方继续消费。因此,Kafka Broker不需要保存位移数据,减少了Broker端需要持有的状态空间,因而有利于实现高伸缩性。但ZooKeeper不适合高吞吐量的写操作,因此,Kafka 0.9版本Consumer中正式推出全新的位移管理机制,将Consumer的位移数据作为普通的Kafka消息,提交到__consumer_offsets
中。
2、位移主题的消息格式
__consumer_offsets
位移主题的消息格式是Kafka自己定义的,用户不能修改,即不能随意地向__consumer_offsets
主题写消息,因为如果写入的消息不满足Kafka规定的格式,那Kafka内部无法成功解析,就会造成Broker崩溃。Kafka Consumer的位移提交API用于提交位移,即向__consumer_offsets
位移主题写消息。__consumer_offsets
位移主题的消息格式如下:
(1)位移消息
位移消息的Key和Value分别表示消息的键值和消息体。
Key中保存3部分内容:<Group ID,主题名,分区号>
。
(2)Consumer Group注册消息
Consumer Group注册消息用于注册Consumer Group。
(3)tombstone消息
tombstone消息,即墓碑消息,也称delete mark,用于删除Group过期位移或是删除Group的消息,墓碑消息的消息体是null。
如果某个Consumer Group下的所有Consumer实例都停止,而且其位移数据都已被删除时,Kafka会向位移主题的对应分区写入 tombstone消息,表明要彻底删除Group的信息。
3、位移主题创建
当Kafka集群中的第一个Consumer程序启动时,Kafka会自动创建位移主题。位移主题是普通的Kafka主题,其分区数由Broker端参数offsets.topic.num.partitions参数的值指定,默认值是 50,因此Kafka会自动创建一个50分区的位移主题。
位移主题的副本数由Broker端参数offsets.topic.replication.factor指定,默认值是3。
如果位移主题是Kafka自动创建的,则分区数是50,副本数是3。可以选择手动创建位移主题,在Kafka集群尚未启动任何Consumer前,使用Kafka API创建位移主题。
4、位移提交
Kafka Consumer提交位移的方式有两种:自动提交位移和手动提交位移。Consumer端参数enable.auto.commit用于控制位移提交方式。
enable.auto.commit参数值为true,表示自动提交,Consumer在后台定期提交位移,提交间隔由一个专属的参数auto.commit.interval.ms控制。对于自动提交位移,如果只要Consumer一直启动着,Consumer会定期的向位移主题写入消息。
enable.auto.commit参数值为false,表示手动提交,Consumer应用需要进行位移提交。Kafka Consumer API提供了位移提交的方法consumer.commitSync。当调用位移提交方法时,Kafka会向位移主题写入相应的消息。
5、位移消息删除
位移主题中的某类消息只需要保存最新的一条即可,其它过期消息可以删除,否则位移主题的消息会越来越多,撑爆磁盘。Kafka使用Compaction对位移主题的过期消息进行删除。
上图,位移为0、2和3的消息的Key都是K1。Compact后,分区只需要保存最新的消息(位移为3的消息)。Kafka提供专门的后台线程Log Cleaner定期地巡检待Compact的主题,看看是否存在满足条件的可删除数据。生产环境中如果出现过位移主题无限膨胀占用过多磁盘空间的问题,需要检查Log Cleaner线程的状态。
六、Consumer Offset
1、消费位移简介
Consumer Offset是消费位移,记录Consumer要消费的下一条消息的位移,用于表征消费者消费进度,每个消费者都有自己的消费者位移。
Consumer需要向Kafka汇报自己的位移数据,即提交位移(Committing Offsets)。Consumer能够同时消费多个分区的数据,因此位移的提交是在分区粒度上进行的,即Consumer需要为消费的每个分区提交各自的位移数据。提交位移主要是为了表征Consumer的消费进度,当Consumer发生故障重启后,就能够从Kafka中读取原来提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。
对位移提交的管理直接影响Consumer所能提供的消息语义保障。
Kafka Consumer API提供了多种提交位移的方法。基于用户角度,位移提交分为自动提交和手动提交;基于Consumer角度,位移提交分为同步提交和异步提交。
2、offset commit
Consumer从Broker中取一批消息写入buffer进行消费,在规定的时间内消费完消息后,会自动将其消费消息的offset提交给Broker,以记录下哪些消息是消费过的。当然,若在时限内没有消费完毕,其是不会提交offset的。
3、自动提交位移
自动提交是指Kafka Consumer在后台定期自动提交位移;手动提交是指要自己提交位移。Consumer端参数enable.auto.commit用于控制提交位移的方式,默认值为true,表示自动提交,即Java Consumer 默认自动提交位移。Consumer端参数auto.commit.interval.ms用于控制自动提交位移的间隔,默认值是5秒,表明Kafka Consumer每5秒会自动提交一次位移。
enable.auto.commit参数设置为false表示手动提交位移,Kafka Consumer不会自动提交位移,需要调用相应的API手动提交位移。KafkaConsumer#commitSync()方法会提交KafkaConsumer#poll()返回的最新位移,是一个同步操作,会一直等待直到位移被成功提交才会返回;如果提交过程中出现异常,会将异常信息抛出。commitSync()使用示例如下:
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
try {
consumer.commitSync();
} catch (CommitFailedException e) {
handle(e); // 处理提交失败异常
}
}
需要在处理完poll()方法返回的所有消息后调用consumer.commitSync()方法。如果过早提交位移,就可能会出现消费数据丢失的情况。对于自动提交位移,Kafka Consumer会保证在开始调用poll方法时,提交上次poll返回的所有消息。因此,poll方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此能保证不出现消费丢失的情况,但自动提交位移存在可能会出现重复消费的缺陷。在默认情况下,Consumer每 5秒自动提交一次位移,如果提交位移后3秒发生Rebalance操作,在Rebalance后,所有Consumer从上一次提交的位移处继续消费,但位移已经是3秒前的位移数据,因此在Rebalance发生前3秒消费的所有数据都要重新再消费一次。虽然能够通过减少auto.commit.interval.ms的值来提高提交频率,但只能缩小重复消费的时间窗口,不可能完全消除。
4、手动提交位移
手动提交位移则更加灵活,完全能够把控位移提交的时机和频率,但在调用commitSync()时,Consumer程序会处于阻塞状态,直到远端的Broker返回提交结果。在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的TPS。虽然可以选择拉长提交间隔,但会导致Consumer的提交频率下降,在下次Consumer重启回来后,会有更多的消息被重新消费。
因此,Kafka为手动提交位移提供了一个异步API方法:KafkaConsumer#commitAsync()。调用commitAsync() 后,会立即返回,不会阻塞,因此不会影响Consumer应用的TPS。同时,Kafka Consumer针对异步提交位移接口提供了回调函数(callback),供实现提交后的逻辑,比如记录日志或处理异常等。commitAsync()使用示例如下:
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
consumer.commitAsync((offsets, exception) -> {
if (exception != null)
handle(exception);
});
}
commitAsync在提交失败时不会自动重试,不能替代commitSync,因为commitAsync是异步操作,倘若提交失败后自动重试,那么重试时提交的位移值可能早已经过期。因此,针对手动提交位移,需要将commitSync和commitAsync组合使用才能到达最理想的效果,可以利用commitSync的自动重试来规避瞬时错误(如网络瞬时抖动,Broker端GC等),使Consumer程序不会一直处于阻塞状态,不影响TPS。commitSync和commitAsync组合使用示例如下:
try {
while(true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
process(records); // 处理消息
commitAysnc(); // 使用异步提交规避阻塞
}
} catch(Exception e) {
handle(e); // 处理异常
} finally {
try {
consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
} finally {
consumer.close();
}
}
上述代码对于常规性、阶段性的手动提交,调用commitAsync()避免程序阻塞;而在Consumer要关闭前,调用commitSync()方法执行同步阻塞式的位移提交,以确保Consumer关闭前能够保存正确的位移数据。因此,既实现了异步无阻塞式的位移管理,也确保了Consumer位移的正确性。
commitAsync()和commitSync()提交的位移是poll方法返回的所有消息的位移,如果想更加细粒度化地提交位移,如提交poll返回的部分消息的位移,或是在消费的中间进行位移提交,需要新的Kafka Consumer API接口。为了帮助实现更精细化的位移管理功能,Kafka Consumer API还针对同步/异步提交提供了一组更为方便的方法,commitSync(Map<TopicPartition, OffsetAndMetadata>)
和commitAsync(Map<TopicPartition, OffsetAndMetadata>)
,参数是Map对象,键就是TopicPartition,即消费的分区,而值是一个OffsetAndMetadata对象,保存的主要是位移数据。示例代码如下:
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
process(record); // 处理消息
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1);
if(count % 100 == 0)
consumer.commitAsync(offsets, null); // 回调处理逻辑是null
count++;
}
}
创建Map对象,用于保存Consumer消费处理过程中要提交的分区位移,开始逐条处理消息,并构造要提交的位移值。要提交的位移是下一条消息的位移,因此构造OffsetAndMetadata对象时,使用当前消息位移加1。与调用无参的commitAsync不同,带Map对象参数的commitAsync进行细粒度的位移提交。
5、CommitFailedException
CommitFailedException表示Consumer客户端在提交位移时出现不可恢复错误或异常。如果异常是可恢复的瞬时错误,提交位移的API可以避免,很多提交位移的API方法支持自动错误重试。
CommitFailedException异常通常发生在手动提交位移时,即用户显式调用KafkaConsumer.commitSync()方法时。
当消息处理的总时间超过预设的max.poll.interval.ms参数值时,Kafka Consumer端会抛出CommitFailedException异常。CommitFailedException异常复现代码如下:
…
Properties props = new Properties();
…
props.put("max.poll.interval.ms", 5000);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
// 使用Thread.sleep模拟真实的消息处理逻辑
Thread.sleep(6000L);
consumer.commitSync();
}
使用KafkaConsumer.subscribe方法随意订阅一个Topic,设置Consumer端参数max.poll.interval.ms=5秒,最后在循环调用 KafkaConsumer.poll 方法间,插入Thread.sleep(6000)和手动提交位移,成功复现异常。
避免CommitFailedException异常:
(1)缩短单条消息处理的时间。
(2)增加Consumer端允许下游系统消费一批消息的最大时长。如果消费逻辑不能简化,那么应该提高Consumer端参数max.poll.interval.ms(Kafka 0.10.1.0版本引入)值。
(3)减少下游系统一次性消费的消息总数。减少Consumer端参数 max.poll.records的值,默认值是500条,因此调用一次Kafka Consumer.poll方法,最多返回500条消息。
(4)下游系统使用多线程来加速消费。使用Kafka Consumer消费数据是单线程,当消费速度无法匹配Kafka Consumer消息返回的速度时,会抛出CommitFailedException异常。如果多线程进行消费,可以灵活地控制线程数量,随时调整消费承载能力,但实现较为复杂。主流的大数据流处理框架使用多线程消费方案,如Apache Flink在集成 Kafka 时就是创建了多个Kafka Consumer Thread线程,自行处理多线程间的数据消费。
Kafka Java Consumer端还提供了一个名为Standalone Consumer的独立消费者,每个Standalone Consumer实例都是独立工作的,彼此之间毫无联系。独立消费者的位移提交机制和Consumer Group是一样的,比如独立消费者也需要指定group.id参数才能提交位移。
如果应用中同时出现具有相同group.id值的Consumer Group程序和Standalone Consumer程序,那么当独立消费者程序手动提交位移时,Kafka就会立即抛出CommitFailedException异常,因为Kafka无法识别具有相同group.id的消费者实例,返回一个错误,表明不是消费者组内合法的成员。
七、Kafka副本机制
1、副本机制简介
副本机制(Replication)是指分布式系统在多台网络互联的机器上保存有相同的数据拷贝。副本机制的优点如下:
(1)提供数据冗余。即使系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性。
(2)提供高伸缩性。支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量。
(3)改善数据局部性。允许将数据放入与用户地理位置相近的地方,从而降低系统延时。
2、Kafka副本简介
Kafka副本(Replica)本质是一个只能追加写消息的提交日志。根据Kafka副本机制的定义,同一个分区下的所有副本保存有相同的消息序列,并分散保存在不同的Broker上,从而能够对抗部分Broker宕机带来的数据不可用。生产环境中,每台Broker都可能保存有各个主题下不同分区的不同副本,因此,单个Broker上存有成百上千个副本是正常的。
3台Broker的Kafka集群上,主题1分区0的3个副本分散在3台Broker上,其它主题分区的副本分散在不同的Broker上,从而实现数据冗余。
3、Kafka副本机制简介
Kafka采用基于领导者(Leader-based)的副本机制。
(1)在Kafka中,副本分为领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为Leader副本,其余副本自动称为Follower副本。
(2)Kafka副本机制比其它分布式系统要更严格。Kafka中Follower副本不对外提供服务,任何一个Follower副本都不能响应消费者和生产者的读写请求,所有的请求都必须由Leader副本来处理,即所有的读写请求都必须发往Leader副本所在的Broker进行处理。Follower副本不处理客户端请求,唯一的任务是从Leader副本异步拉取消息,并写入到自己的提交日志中,从而实现与Leader副本的同步。
(3)当Leader副本挂掉时,Kafka依托于ZooKeeper提供的监控功能能够实时感知到,并立即开启新一轮的Leader选举,从Follower副本中选一个作为新的Leader。原Leader副本重启后,只能作为Follower副本加入到集群中。
4、Kafka副本机制优点
Kafka副本机制的Follower副本不对外提供服务,因此Kafka不能提供读操作横向扩展以及改善局部性,但Kafka副本机制的优点如下:
(1)方便实现Read-your-writes。Read-your-writes是当使用生产者API向Kafka成功写入消息后,马上使用消费者API去读取刚才生产的消息。如果允许Follower副本对外提供服务,由于副本同步是异步的,因此有可能出现Follower副本还没有从Leader副本拉取到最新的消息,从而使得客户端看不到最新写入的消息。
(2)方便实现单调读(Monotonic Reads)一致性。对于一个消费者用户,在多次消费消息时,不会看到某条消息一会儿存在一会儿不存在。如果允许Follower副本提供读服务,那么假设当前有2个Follower副本 F1 和 F2,异步地拉取Leader副本数据。倘若F1拉取了Leader 的最新消息而F2还未及时拉取,那么,此时如果有一个消费者先从F1读取消息后又从F2拉取消息,可能会出现第一次消费时看到的最新消息在第二次消费时不见。
5、ISR简介
Kafka引入了In-sync Replicas(副本同步列表)。ISR中的副本都是与Leader同步的副本,不在ISR中的Follower副本是与Leader不同步的。Leader副本天然就在 ISR 中,但ISR不只是Follower副本集合,是一个动态调整的集合,由Leader负责维护。
AR(Assigned Replicas)即已分配的副本列表,是指某个Partition的所有副本。
OSR,Out of-Sync Replicas, 即非同步的副本列表。
AR = ISR + OSR
Follower副本要进入ISR需要满足一定条件:
图中Leader副本当前写入10条消息,Follower1副本同步了其中的6条消息,而Follower2副本只同步了其中的3条消息。但Kafka判断 Follower是否与Leader同步的标准,并不是看Follower副本与Leader副本相差的消息数,而是Broker端replica.lag.time.max.ms参数值。replica.lag.time.max.ms参数表示Follower副本能够落后Leader副本的最长时间间隔,默认值是10秒。只要一个Follower副本落后Leader 副本的时间不连续超过10秒,Kafka认为Follower副本与Leader是同步的,即使此时Follower副本中保存的消息明显少于Leader副本中的消息。Follower副本唯一的工作就是不断地从Leader副本拉取消息,然后写入到自己的提交日志中。如果同步过程的速度持续慢于Leader副本的消息写入速度,在replica.lag.time.max.ms时间后,Follower副本会被认为是与Leader副本不同步的,因此不能再放入ISR中。Kafka会自动收缩ISR集合,将Follower副本踢出ISR。
6、Leader选举
ISR是可以动态调整的,如果ISR为空,则表明Leader副本已经挂掉,因此Kafka需要重新选举一个新的Leader。
Kafka把所有不在ISR中的存活副本都称为非同步副本。通常,非同步副本落后Leader副本太多,因此,如果选择非同步副本作为新Leader,可能会出现数据丢失。Kafka中,如果ISR为空,从非同步副本选举Leader的过程称为Unclean领导者选举(Unclean Leader Election)。
Broker端参数unclean.leader.election.enable控制是否允许Unclean领导者选举。开启Unclean领导者选举可能会造成数据丢失,但会使得分区Leader副本一直存在,不会停止对外提供服务,提高可用性。禁止Unclean领导者选举可以维护数据的一致性,避免消息丢失,但牺牲了高可用性。
当Leader宕机后,Broker Controller会从ISR中挑选一个Follower成为新的Leader。如果ISR中没有其它副本,可以通过unclean.leader.election.enable的值来设置Leader选举范围。
false:必须等到ISR列表中所有的副本都活过来才进行新的选举,可靠性有保证,但可用性低。
true:在ISR列表中没有副本的情况下,可以选择任意一个没有宕机的Broker作为新的Leader,可用性高,但可靠性没有保证。
以上是关于Kafka快速入门——Kafka架构的主要内容,如果未能解决你的问题,请参考以下文章
大数据技术之KafkaKafka概述Kafka快速入门Kafka架构深入
大数据技术之KafkaKafka概述Kafka快速入门Kafka架构深入
大数据技术之KafkaKafka概述Kafka快速入门Kafka架构深入