kafka 消息分发机制分区和副本机制
Posted J.King
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka 消息分发机制分区和副本机制相关的知识,希望对你有一定的参考价值。
一、消息分发机制
1.1 kafka 消息分发策略
消息是 kafka 中最基本的数据单元,在 kafka 中,一条消息由key、value两部分构成,在发送一条消息 时,我们可以指定这个key,那么 producer 会根据 key 和partition 机制来判断当前这条消息应该发送并 存储到哪个 partition 中。我们可以根据需要进行扩展 producer 的 partition 机制。
代码示例可移步:kafka 应用实战的第五点自定义分区(Partitioner)
1.2、消息默认的分发机制
默认情况下,kafka采用的是 hash 取模的分区算法。如果 Key 为 null,则会随机分配一个分区。这个随机是在这个参数metadata.max.age.ms
的时间范围内随机选择一个。对于这个时间段内,如果key为 null,则只会发送到唯一的分区。这个值默认情况下是10分钟更新一次。
关于 Metadata,简单理解就是 Topic/Partition 和 broker 的映射关系,每一个 topic 的每一个 partition,需要知道对应的 broker 列表是什么,leader 是谁、follower 是谁。这些信息都是存储在 Metadata 这个类里面。
1.3 消费端如何消费指定的分区
通过下面的代码,就可以消费指定该topic下的0号分区。其他分区的数据就无法接收
//消费指定分区的时候,不需要再订阅
//kafkaConsumer.subscribe(Collections.singletonList(topic));
//消费指定的分区
TopicPartition topicPartition=new TopicPartition(topic,0);
kafkaConsumer.assign(Arrays.asList(topicPartition));
二、消息的消费原理
2.1 kafka消息消费原理演示
在实际生产过程中,每个 topic 都会有多个 partitions,多个 partitions 的好处在于,一方面能够对 broker 上的数据进行分片有效减少了消息的容量从而提升io性能。另外一方面,为了提高消费端的消费能力,一般会通过多个 consumer 去消费同一个 topic ,也就是消费端的负载均衡机制,也就是我们接下来要了解的,在多个 partition 以及多个 consumer 的情况下,消费者是如何消费消息的。
同时,kafka 存在 consumer group 的概念,也就是 group.id 一样的 consumer,这些 consumer 属于一个 consumer group,组内的所有消费者协调在一起来消费订阅主题的所有分区。当然每一个分区只能由同一个消费组内的 consumer 来消费。
那么同一个 consumer group 里面的 consumer 是怎么去分配该消费哪个分区里的数据的呢?
3个 partiton 对应3个 consumer
对于上面这个图来说,这3个消费者会分别消费 test 这个 topic 的3个分区(p0,p1, p2),也就是每个 consumer 消费一 个 partition
3个 partiton 对应2个 consumer
对于上面这个图来说,可能会出现的情况是 consumer1 消费 p0,p1,consumer2 消费 p2。也可能是反过来consumer2 消费2个partition,具体根据分区策略来定。
3个 partition 对应4个或以上 consumer
对于上面这个图来说,仍然只有3个consumer对应3个partition,consumer4无法消费消息。
consumer和partition的数量建议
- 如果 consumer 比 partition 多,是浪费,因为 kafka 的设计是在一个 partition 上是不允许并发的, 所以 consume r数不要大于 partition 数
- 如果 consumer 比 partition 少,一个 consumer 会对应于多个 partitions,这里主要合理分配 consumer 数和 partition 数,否则会导致 partition 里面的数据被取的不均匀。最好 partiton 数目是 consumer数目的整数倍,所以 partition 数目很重要,比如取24,就很容易设定 consumer 数目
- 如果 consumer 从多个 partition 读到数据,不保证数据间的顺序性,kafka 只保证在一个 partition 上数据是有序的,但多个 partition,根据你读的顺序会有不同
- 增减 consumer,broker,partition 会导致 rebalance,所以 rebalance 后 consumer 对应的 partition会发生变化
2.2 分区分配策略
2.2.1 什么是分区分配策略
通过前面的案例演示,我们应该能猜到,同一个 group 中的消费者对于一个 topic 中的多个 partition,存在一定的分区分配策略。
在 kafka 中,存在三种分区分配策略:
- Range(默认)
- RoundRobin(轮询)
- StickyAssignor(粘性)
在消费端中的 ConsumerConfig 中,通过这个属性来指定分区分配策略
public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
2.2.2 分区分配策略触发时机
当出现以下几种情况时,kafka 会进行一次分区分配操作,也就是 kafka consumer 的 rebalance
- 同一个consumer group内新增了消费者
- 消费者离开当前所属的 consumer group,比如主动停机或者宕机
- topic 新增了分区(也就是分区数量发生了变化)
kafka consuemr 的 rebalance 机制规定了一个 consumer group下的所有 consumer 如何达成一致来分配订阅 topic 的每个分区。
2.2.3 RangeAssignor(范围分区)
Range策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
假设n = 分区数/消费者数量
m= 分区数%消费者数量
那么前m个消费者每个分配n+l个分区,后面的(消费者数量-m)个消费者每个分配n个分区
假设我们有10个分区,3个消费者,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将 会是C1-0, C2-0, C3-0。然后将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个 分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
在我们的例子里面,我们有10个分 区,3个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区 结果看起来是这样的:
- C1-0 将消费 0, 1, 2, 3 分区
- C2-0 将消费 4, 5, 6 分区
- C3-0 将消费 7, 8, 9 分区
假如我们有11个分区,那么最后分区分配的结果看起来是这样的:
- C1-0 将消费 0, 1, 2, 3 分区
- C2-0 将消费 4, 5, 6, 7 分区
- C3-0 将消费 8, 9, 10 分区
假如我们有2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来是这样的:
- C1-0 将消费 T1主题的 0, 1, 2, 3 分区以及 T2主题的 0, 1, 2, 3分区
- C2-0 将消费 T1主题的 4, 5, 6 分区以及 T2主题的 4, 5, 6分区
- C3-0 将消费 T1主题的 7, 8, 9 分区以及 T2主题的 7, 8, 9分区 可以看出,
C1-0 消费者线程比其他消费者线程多消费了2个分区,这就是 Range strategy 的一个很明显的弊端
2.2.4 RoundRobinAssignor(轮询分区)
轮询分区策略是把所有 partition 和所有 consumer 线程都列出来,然后按照 hashcode 进行排序。最后通过轮询算法分配 partition 给消费线程。如果所有consumer 实例的订阅是相同的,那么 partition 会均匀 分布。
在我们的例子里面,假如按照 hashCode 排序完的 topic-partitions 组依次为 T1-5, T1-3, T1-0, T1-8, T1- 2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为 C1-0, C1-1, C2-0, C2-1,
最后分区分配的结果 为:
- C1-0 将消费 T1-5, T1-2, T1-6 分区;
- C1-1 将消费 T1-3, T1-1, T1-9 分区;
- C2-0 将消费 T1-0, T1-4 分区;
- C2-1 将消费 T1-8, T1-7 分区;
使用轮询分区策略必须满足两个条件:
- 每个主题的消费者实例具有相同数量的流
- 每个消费者订阅的主题必须是相同的
2.2.5 StrickyAssignor 分配策略
kafka在0.11.x版本支持了StrickyAssignor, 翻译过来叫粘滞策略,它主要有两个目的
- 分区的分配尽可能的均匀
- 分区的分配尽可能和上次分配保持相同
当两者发生冲突时, 第 一 个目标优先于第二个目标。 鉴于这两个目标, StickyAssignor 分配策略的具体实现要比 RangeAssignor 和 RoundRobinAssignor 这两种分配策略要复杂得多,假设我们有这样一个 场景
假设消费组有3个消费者:C0,C1,C2,它们分别订阅了4个Topic(t0,t1,t2,t3),并且每个主题有两个分 区(p0,p1),也就是说,整个消费组订阅了8个分区:tOpO 、 tOpl 、 tlpO 、 tlpl 、 t2p0 、 t2pl 、t3p0 、 t3pl
那么最终的分配场景结果为 :
- CO: tOpO、tlpl 、 t3p0
- Cl: tOpl、t2p0 、 t3pl
- C2: tlpO、t2pl
这种分配方式有点类似于轮询策略,但实际上并不是,因为假设这个时候,C1这个消费者挂了,就势必会造成 重新分区(reblance),如果是轮询,那么结果应该是:
- CO: tOpO、tlpO、t2p0、t3p0
- C2: tOpl、tlpl、t2pl、t3pl
然后,strickyAssignor 它是一种粘滞策略,所以它会满足分区的分配尽可能和上次分配保持相同
,所以分配结果应该是:
- CO: tOpO、tlpl 、 t3p0、t2p0
- C2: tlpO、t2pl、tOpl、t3pl
也就是说,C0和C2保留了上一次是的分配结果,并且把原来C1的分区分配给了C0和C2。 这种策略的好处是使得分区发生变化时,由于分区的粘性
,减少了不必要的分区移动
2.3 如何保存消费端的消费位置
2.3.1 什么是offset
每个 topic 可以划分多个分区(每个Topic至少有一个分 区),同一 topic 下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka 通过 offset 保证消息在分区内的顺 序,offset 的顺序不跨分区,即 kafka 只保证在同一个分区内的消息是有序的; 对于应用层的消费来 说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个offset。
2.3.2 offset在哪里维护?
在 kafka 中,提供了一个 consumer_offsets_* 的一个 topic,把 offset 信息写入到这个 topic 中。 consumer_offsets ——按保存了每个 consumer group 某一时刻提交的 offset 信息。
__consumer_offsets 默认有50个分区。
可以根据以下计算公式计算保存位置:
//默认情况下 groupMetadataTopicPartitionCount 有50个分区
Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount;
获取保存位置后,执行如下命令,可以查看当前 consumer_goup 中的 offset 位移提交的信息
kafka-console-consumer.sh --topic __consumer_offsets --partition 15 --bootstrap-server 192.168.10.150:9092 --formatter
'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'
从输出结果中,我们就可以看到 tes t这个 topic 的 offset 的位移日志
三、分区的副本机制
我们已经知道 Kafka 的每个 topi c都可以分为多个 Partition ,并且多个 partition 会均匀分布在集群的各个 节点下。虽然这种方式能够有效的对数据进行分片,但是对于每个 partition 来说,都是单点的,当其中 一个 partition 不可用的时候,那么这部分消息就没办法消费。所以 kafka 为了提高 partition 的可靠性而提供了副本的概念(Replica),通过副本机制来实现冗余备份。
每个分区可以有多个副本,并且在副本集合中会存在一个 leader 的副本,所有的读写请求都是由 leader 副本来进行处理。剩余的其他副本都做为 follower副本,follower 副本会从leader副本同步消息日志。 这个有点类似 zookeeper 中 leader 和 follower 的概念,但是具体的时间方式还是有比较大的差异。所以 我们可以认为,副本集会存在一主多从的关系。
一般情况下,同一个分区的多个副本会被均匀分配到集群中的不同 broker上,当 leade r副本所在的 broker出现故障后,可以重新选举新的 leade 副本继续对外提供服务。通过这样的副本机制来提高 kafka 集群的可用性。
3.1 创建一个带副本机制的topic
通过下面的命令去创建带2个副本的topic
sh kafka-topics.sh --create --zookeeper 192.168.10.150:2181 --replication-factor 3 --partitions 3 --topic secondTopic
然后我们可以在 /tmp/kafka-log 路径(kafka的server.properties中配置)下看到对应 topic 的副本信息了。
针对 secondTopic 这个 topic 的3个分区对应的3个副本用下图展示
leader 表示当前分区的 leader 是那个 broker-id。
broker0 中的 secondTopic_0 是 leader,broker1和broker2中的 secondTopic_0 是 follower 副本。
需要注意的是:
- kafka 集群中的一个 broker 中最多只能有一个副本
- leader 副本所在的 broker 节点的分区叫 leader 节点
- follower 副本所在的 broker 节点的分区叫 follower 节点
3.2 副本的leader选举
Kafka提供了数据复制算法保证,如果leader副本所在的broker节点宕机或者出现故障,或者分区的 leader节点发生故障; 那么,kafka必须要保证从follower副本中选择一个新的leader副本。
那么kafka是如何实现选举的呢? 要了解leader选举,我们需要了解几个概念 Kafka分区下有可能有很多个副本(replica)用于实现冗余,从而进一步实现高可用。
副本根据角色的不同可分为3类:
- leader副本:响应 clients 端读写请求的副本
- follower副本:被动地备份 leader 副本中的数据,不能响应 clients 端读写请求。
- ISR副本:包含了 leader 副本和所有与 leader 副本保持同步的 follower 副本——如何判定是否与leader同步。后面会提到每个Kafka副本对象都有两个重要的属性:LEO 和 HW。注意是所有的副本,而不只是 leader 副本。
- LEO:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下 一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。
- HW:即上面提到的水位值。对于同一个副本对象而言,其 HW 值不会大于 LEO 值。小于等于 HW 值的所有消息都被认为是
已备份
的(replicated)。同理,leader 副本和 followe r副本的 HW 更新是有区别的
3.3 副本协同机制
消息的读写操作都只会由leader节点来接收和处理。follower副本只负责同步数据以及当 leader副本所在的broker挂了以后,会从follower副本中选取新的leader。
写请求首先由 Leader 副本处理,之后 follower 副本会从 leader 上拉取写入的消息,这个过程会有一定的 延迟,导致 follower 副本中保存的消息略少于 leader 副本,但是只要没有超出阈值都可以容忍。但是如果一个 follower 副本出现异常,比如宕机、网络断开等原因长时间没有同步到消息,那这个时候, leader 就会把它踢出去。kafka 通过 ISR 集合来维护一个分区副本信息。
一个新 leader 被选举并被接受客户端的消息成功写入。Kafka 确保从同步副本列表中选举一个副本为 leader;leader负责维护和跟踪ISR(in-Sync replicas , 副本同步队列)中所有 follower 滞后的状态。当 producer 发送一条消息到 broker 后,leader 写入消息并复制到所有 follower。消息提交之后才被成功复制到所有的同步副本。
3.4 ISR
ISR表示目前可用且消息量与leader相差不多的副本集合,这是整个副本集合的一个子集。
ISR集合中的副本必须满足两个条件:
- 副本所在节点必须维持着与 zookeeper 的连接
- 副本最后一条消息的 offset 与 leader 副本的最后一条消息的 offset 之间的差值不能超过指定的阈值 (replica.lag.time.max.ms)
- replica.lag.time.max.ms:如果该 follower 在此时间间隔内一直没有追上过 leade r的所有消息,则该 follower 就会被剔除 isr 列表
ISR 数据保存在 Zookeeper 的/brokers/topics//partitions//state
节点中
follower 副本把 leader 副本 LEO之 前的日志全部同步完成时,则认为 follower 副本已经追赶上了 leader 副本,这个时候会更新这个副本的lastCaughtUpTimeMs
标识,kafk 副本管理器会启动一个副本过期检查的定时任务,这个任务会定期检查当前时间与副本的 lastCaughtUpTimeMs
的差值是否大于参数replica.lag.time.max.ms
的值,如果大于,则会把这个副本踢出ISR
集合
保存在 Zookeeper 的/brokers/topics//partitions//state
节点中
follower 副本把 leader 副本 LEO之 前的日志全部同步完成时,则认为 follower 副本已经追赶上了 leader 副本,这个时候会更新这个副本的lastCaughtUpTimeMs
标识,kafk 副本管理器会启动一个副本过期检查的定时任务,这个任务会定期检查当前时间与副本的 lastCaughtUpTimeMs
的差值是否大于参数replica.lag.time.max.ms
的值,如果大于,则会把这个副本踢出ISR
集合
以上是关于kafka 消息分发机制分区和副本机制的主要内容,如果未能解决你的问题,请参考以下文章