Kafka的分区机制

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka的分区机制相关的知识,希望对你有一定的参考价值。

参考技术A Kafka在⼀定数量的服务器上对主题分区进⾏复制。
当集群中的⼀个broker宕机后系统可以⾃动故障转移到其他可⽤的副本上,不会造成数据丢失。
--replication-factor 3 1leader+2follower

Follower分区像普通的Kafka消费者⼀样,消费来⾃Leader分区的消息,并将其持久化到⾃⼰的⽇志中。
允许Follower对⽇志条⽬拉取进⾏批处理。
同步节点定义:

下图中
分区P1的Leader是0,ISR是0和1
分区P2的Leader是2,ISR是1和2
分区P3的Leader是1,ISR是0,1,2。

⽣产者和消费者的请求都由Leader副本来处理。Follower副本只负责消费Leader副本的数据和Leader保持同步。
对于P1,如果0宕机会发⽣什么?
Leader副本和Follower副本之间的关系并不是固定不变的,在Leader所在的broker发⽣故障的时候,就需要进⾏
分区的Leader副本和Follower副本之间的切换,需要选举Leader副本。
如何选举?
如果某个分区所在的服务器除了问题,不可⽤,kafka会从该分区的其他的副本中选择⼀个作为新的Leader。之后
所有的读写就会转移到这个新的Leader上。现在的问题是应当选择哪个作为新的Leader。
只有那些跟Leader保持同步的Follower才应该被选作新的Leader。
Kafka会在Zookeeper上针对每个Topic维护⼀个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是
⼀些分区的副本。
只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的⽣产者。
如果这个集合有增减,kafka会更新zookeeper上的记录。
如果某个分区的Leader不可⽤,Kafka就会从ISR集合中选择⼀个副本作为新的Leader。
显然通过ISR,kafka需要的冗余度较低,可以容忍的失败数⽐较⾼。
假设某个topic有N+1个副本,kafka可以容忍N个服务器不可⽤。
为什么不⽤少数服从多数的⽅法
少数服从多数是⼀种⽐较常⻅的⼀致性算发和Leader选举法。
它的含义是只有超过半数的副本同步了,系统才会认为数据已同步;
选择Leader时也是从超过半数的同步的副本中选择。
这种算法需要较⾼的冗余度,跟Kafka⽐起来,浪费资源。
譬如只允许⼀台机器失败,需要有三个副本;⽽如果只容忍两台机器失败,则需要五个副本。
⽽kafka的ISR集合⽅法,分别只需要两个和三个副本。
如果所有的ISR副本都失败了怎么办?
此时有两种⽅法可选,

向已经部署好的Kafka集群⾥⾯添加机器,我们需要从已经部署好的Kafka节点中复制相应的配置⽂件,然后把⾥
⾯的broker id修改成全局唯⼀的,最后启动这个节点即可将它加⼊到现有Kafka集群中。
问题:新添加的Kafka节点并不会⾃动地分配数据,⽆法分担集群的负载,除⾮我们新建⼀个topic。
需要⼿动将部分分区移到新添加的Kafka节点上,Kafka内部提供了相关的⼯具来重新分布某个topic的分区。
在重新分布topic分区之前,我们先来看看现在topic的各个分区的分布位置:

在node11搭建Kafka:
拷⻉JDK并安装

此处不需要zookeeper,切记!!!

让配置⽣效:
. /etc/profile
拷⻉node1上安装的Kafka

修改node11上Kafka的配置:

启动Kafka:

注意观察node11上节点启动的时候的ClusterId,看和zookeeper节点上的ClusterId是否⼀致,如果是,证明node11和node1在同⼀个集群中。
node11启动的Cluster ID:

zookeeper节点上的Cluster ID:

然后使⽤ kafka-reassign-partitions.sh ⼯具⽣成reassign plan

Proposed partition reassignment configuration下⾯⽣成的就是将分区重新分布到broker 1上的结果。我们将这些内容保存到名为result.json⽂件⾥⾯(⽂件名不重要,⽂件格式也不⼀定要以json为结尾,只要保证内容是json即可),然后执⾏这些reassign plan:

执⾏计划:

这样Kafka就在执⾏reassign plan,我们可以校验reassign plan是否执⾏完成:

查看主题的细节:

分区的分布的确和操作之前不⼀样了,broker 1上已经有分区分布上去了。使⽤ kafka-reassign�partitions.sh ⼯具⽣成的reassign plan只是⼀个建议,⽅便⼤家⽽已。其实我们⾃⼰完全可以编辑⼀个reassignplan,然后执⾏它,如下:

将上⾯的json数据⽂件保存到my-topics-to-execute.json⽂件中,然后也是执⾏它:

等这个reassign plan执⾏完,我们再来看看分区的分布:

我们可以在新建主题的时候,⼿动指定主题各个Leader分区以及Follower分区的分配情况,即什么分区副本在哪
个broker节点上。
随着系统的运⾏,broker的宕机重启,会引发Leader分区和Follower分区的⻆⾊转换,最后可能Leader⼤部分都
集中在少数⼏台broker上,由于Leader负责客户端的读写操作,此时集中Leader分区的少数⼏台服务器的⽹络I/O,
CPU,以及内存都会很紧张。
Leader和Follower的⻆⾊转换会引起Leader副本在集群中分布的不均衡,此时我们需要⼀种⼿段,让Leader的分
布重新恢复到⼀个均衡的状态。
执⾏脚本:

上述脚本执⾏的结果是:创建了主题tp_demo_03,有三个分区,每个分区两个副本,Leader副本在列表中第⼀个指定的brokerId上,Follower副本在随后指定的brokerId上。

然后模拟broker0宕机的情况:

是否有⼀种⽅式,可以让Kafka⾃动帮我们进⾏修改?改为初始的副本分配?
此时,⽤到了Kafka提供的⾃动再均衡脚本: kafka-preferred-replica-election.sh
先看介绍:

该⼯具会让每个分区的Leader副本分配在合适的位置,让Leader分区和Follower分区在服务器之间均衡分配。
如果该脚本仅指定zookeeper地址,则会对集群中所有的主题进⾏操作,⾃动再平衡。
具体操作:

执⾏操作:

查看操作的结果:

恢复到最初的分配情况。
之所以是这样的分配,是因为我们在创建主题的时候:

在逗号分割的每个数值对中排在前⾯的是Leader分区,后⾯的是副本分区。那么所谓的preferred replica,就是排在前⾯的数字就是Leader副本应该在的brokerId。

实际项目中,我们可能由于主题的副本因子设置的问题,需要重新设置副本因子。
或者由于集群的扩展,需要重新设置副本因子。
topic⼀旦使用又不能轻易删除重建,因此动态增加副本因子就成为最终的选择。

说明:kafka 1.0版本配置⽂件默认没有default.replication.factor=x, 因此如果创建topic时,不指定–replication-factor 想, 默认副本因⼦为1. 我们可以在⾃⼰的 server.properties 中配置上常⽤的副本因⼦,省去⼿动调整。例如设置default.replication.factor=3, 详细内容可参考官⽅⽂档 https://kafka.apache.org/documentation/#rep

原因分析:
假设我们有2个kafka broker分别broker0,broker1。

2.查看主题细节

3.修改副本因⼦:错误

在Kafka中,每个Topic会包含多个分区,默认情况下⼀个分区只能被⼀个消费组下⾯的⼀个消费者消费,这⾥就产⽣了分区分配的问题。Kafka中提供了多重分区分配算法(PartitionAssignor)的实现:RangeAssignor、RoundRobinAssignor、StickyAssignor。

PartitionAssignor接⼝⽤于⽤户定义实现分区分配算法,以实现Consumer之间的分区分配。
消费组的成员订阅它们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker。协调者选择其中的⼀个消费者来执⾏这个消费组的分区分配并将分配结果转发给消费组内所有的消费者。Kafka默认采⽤RangeAssignor的分配算法。
RangeAssignor对每个Topic进⾏独⽴的分区分配。对于每⼀个Topic,⾸先对分区按照分区ID进⾏数值排序,然后订阅这个Topic的消费组的消费者再进⾏字典排序,之后尽量均衡的将分区分配给消费者。这⾥只能是尽量均衡,因为分区数可能⽆法被消费者数量整除,那么有⼀些消费者就会多分配到⼀些分区。

⼤致算法如下:

RangeAssignor策略的原理是按照消费者总数和分区总数进⾏整除运算来获得⼀个跨度,然后将分区按照跨度进⾏平均分配,以保证分区尽可能均匀地分配给所有的消费者。对于每⼀个Topic,RangeAssignor策略会将消费组内所有订阅这个Topic的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配⼀个分区。
这种分配⽅式明显的⼀个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来越严重,⽐如上图中4个分区3个消费者的场景,C0会多分配⼀个分区。如果此时再订阅⼀个分区数为4的Topic,那么C0⼜会⽐C1、C2多分配⼀个分区,这样C0总共就⽐C1、C2多分配两个分区了,⽽且随着Topic的增加,这个情况会越来越严重。
字典序靠前的消费组中的消费者⽐较“贪婪”。

RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进⾏排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进⾏排序分配的)。如果消费组内,消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与⼀些Topic的分配。

相对于RangeAssignor,在订阅多个Topic的情况下,RoundRobinAssignor的⽅式能消费者之间尽量均衡的分配到分区(分配到的分区数的差值不会超过1——RangeAssignor的分配策略可能随着订阅的Topic越来越多,差值越来越⼤)。
对于消费组内消费者订阅Topic不⼀致的情况:假设有两个个消费者分别为C0和C1,有2个Topic T1、T2,分别拥有3和2个分区,并且C0订阅T1和T2,C1订阅T2,那么RoundRobinAssignor的分配结果如下:

动机
尽管RoundRobinAssignor已经在RangeAssignor上做了⼀些优化来更均衡的分配分区,但是在⼀些情况下依旧会
产⽣严重的分配偏差,⽐如消费组中订阅的Topic列表不相同的情况下。
更核⼼的问题是⽆论是RangeAssignor,还是RoundRobinAssignor,当前的分区分配算法都没有考虑上⼀次的分
配结果。显然,在执⾏⼀次新的分配之前,如果能考虑到上⼀次分配的结果,尽量少的调整分区分配的变动,显然是
能节省很多开销的。
⽬标
从字⾯意义上看,Sticky是“粘性的”,可以理解为分配结果是带“粘性的”:

⾃定义的分配策略必须要实现org.apache.kafka.clients.consumer.internals.PartitionAssignor接⼝。
PartitionAssignor接⼝的定义如下:

PartitionAssignor接⼝中定义了两个内部类:Subscription和Assignment。
Subscription类⽤来表示消费者的订阅信息,类中有两个属性:topics和userData,分别表示消费者所订阅topic
列表和⽤户⾃定义信息。PartitionAssignor接⼝通过subscription()⽅法来设置消费者⾃身相关的Subscription信息,
注意到此⽅法中只有⼀个参数topics,与Subscription类中的topics的相互呼应,但是并没有有关userData的参数体
现。为了增强⽤户对分配结果的控制,可以在subscription()⽅法内部添加⼀些影响分配的⽤户⾃定义信息赋予
userData,⽐如:权重、ip地址、host或者机架(rack)等等。
再来说⼀下Assignment类,它是⽤来表示分配结果信息的,类中也有两个属性:partitions和userData,分别表
示所分配到的分区集合和⽤户⾃定义的数据。可以通过PartitionAssignor接⼝中的onAssignment()⽅法是在每个消费
者收到消费组leader分配结果时的回调函数,例如在StickyAssignor策略中就是通过这个⽅法保存当前的分配⽅案,以
备在下次消费组再平衡(rebalance)时可以提供分配参考依据。
接⼝中的name()⽅法⽤来提供分配策略的名称,对于Kafka提供的3种分配策略⽽⾔,RangeAssignor对应的
protocol_name为“range”,RoundRobinAssignor对应的protocol_name为“roundrobin”,StickyAssignor对应的
protocol_name为“sticky”,所以⾃定义的分配策略中要注意命名的时候不要与已存在的分配策略发⽣冲突。这个命名
⽤来标识分配策略的名称,在后⾯所描述的加⼊消费组以及选举消费组leader的时候会有涉及。
真正的分区分配⽅案的实现是在assign()⽅法中,⽅法中的参数metadata表示集群的元数据信息,⽽
subscriptions表示消费组内各个消费者成员的订阅信息,最终⽅法返回各个消费者的分配信息。
Kafka中还提供了⼀个抽象类org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor,它可以
简化PartitionAssignor接⼝的实现,对assign()⽅法进⾏了实现,其中会将Subscription中的userData信息去掉后,在
进⾏分配。Kafka提供的3种分配策略都是继承⾃这个抽象类。如果开发⼈员在⾃定义分区分配策略时需要使⽤
userData信息来控制分区分配的结果,那么就不能直接继承AbstractPartitionAssignor这个抽象类,⽽需要直接实现
PartitionAssignor接⼝。

在使⽤时,消费者客户端需要添加相应的Properties参数,示例如下:

Kafka核心技术与实战——09 | 生产者消息分区机制原理剖析

  • 如何将这么大的数据量均匀地分配到 Kafka 的各个 Broker 上,就成为一个非常重要的问题
  • 为什么分区?
    • Kafka 有主题(Topic)的概念,它是承载真实数据的逻辑容器
      • 而在主题之下还分为若干个分区,也就是说 Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息
      • 主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份
    • 其实分区的作用就是提供负载均衡的能力
      • 或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)
      • 不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。
      • 并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量
    • 除了提供负载均衡这种最核心的功能之外,利用分区也可以实现其他一些业务级别的需求,比如实现业务级别的消息顺序的问题
  • 都有哪些分区策略?
    • 所谓分区策略是决定生产者将消息发送到哪个分区的算法。
      • Kafka 为我们提供了默认的分区策略,同时它也支持你自定义分区策略
    • 如果要自定义分区策略,你需要显式地配置生产者端的参数partitioner.class
      • 在编写生产者程序时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口
      • 这个接口也很简单,只定义了两个方法:partition()和close(),通常你只需要实现最重要的 partition 方法
  • 轮询策略
    • 也称 Round-robin 策略,即顺序分配
    • 这就是所谓的轮询策略。轮询策略是 Kafka Java 生产者 API 默认提供的分区策略。如果你未指定partitioner.class参数,那么你的生产者程序会按照轮询的方式在主题的所有分区间均匀地“码放”消息。
    • 轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一
    • 技术图片
  • 随机策略
    • 也称 Randomness 策略。所谓随机就是我们随意地将消息放置到任意一个分区上
    • 如果要实现随机策略版的 partition 方法,很简单,只需要两行代码即可:
      • List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
      • return ThreadLocalRandom.current().nextInt(partitions.size());
    • 先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。
    • 本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以如果追求数据的均匀分布,还是使用轮询策略比较好。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。
    • 技术图片
  • 按消息键保序策略
    • 也称 Key-ordering 策略
    • Kafka 允许为每条消息定义消息键,简称为 Key。这个 Key 的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务 ID 等
    • 实现这个策略的 partition 方法同样简单,只需要下面两行代码即可:
      • List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
      • return Math.abs(key.hashCode()) % partitions.size();
    • 前面提到的 Kafka 默认分区策略实际上同时实现了两种策略:如果指定了 Key,那么默认实现按消息键保序策略;如果没有指定 Key,则使用轮询策略
    • 有因果关系的消息
      • 对此标志位设定专门的分区策略,保证同一标志位的所有消息都发送到同一分区
      • 这样既可以保证分区内的消息顺序,也可以享受到多分区带来的性能红利
      • 这种基于个别字段的分区策略本质上就是按消息键保序的思想,其实更加合适的做法是把标志位数据提取出来统一放到 Key 中,这样更加符合 Kafka 的设计思想
      • 经过改造之后,这个企业的消息处理吞吐量一下提升了 40 多倍
      • 从这个案例你也可以看到自定制分区策略的效果可见一斑
      • 技术图片
  • 其他分区策略
    • 基于地理位置的分区策略
    • 根据 Broker 所在的 IP 地址实现定制化的分区策略
  • 小结
    • 切记分区是实现负载均衡以及高吞吐量的关键
    • 故在生产者这一端就要仔细盘算合适的分区策略,避免造成消息数据的“倾斜”,使得某些分区成为性能瓶颈,这样极易引发下游数据消费的性能下降
    • 发现kafka同一个topic是无法保证数据的顺序性的,但是同一个partition中的数据是有顺序的
    • 要保证全局顺序。后来发现其实使用key+多分区也可以实现。反正保证同一批因果依赖的消息分到一个分区就可以

以上是关于Kafka的分区机制的主要内容,如果未能解决你的问题,请参考以下文章

kafka存储结构以及Log清理机制

Kafka的存储机制以及可靠性

Kafka核心技术与实战——09 | 生产者消息分区机制原理剖析

Kafka内核中的分布式机制实现

kafka-2-生产者分区机制/消息压缩/无丢失配置介绍

Kafka的存储机制以及可靠性