(6)消息的消费原理

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(6)消息的消费原理相关的知识,希望对你有一定的参考价值。

参考技术A 在实际生产过程中,每个 topic 都会有多个 partitions,多个 partitions 的好处在于,一方面能够对 broker 上的数据进行分片有效减少了消息的容量从而提升 io 性能。

另外一方面,为了提高消费端的消费能力,一般会通过多个consumer 去消费同一个 topic ,也就是消费端的负载均衡机制,也就是我们接下来要了解的,在多个 partition 以及多个 consumer 的情况下,消费者是如何消费消息的
kafka 存在 consumer group的 概 念 , 也 就是 group.id 一 样 的 consumer ,这些consumer 属于一个 consumer group,组内的所有消费者协调在一起来消费订阅主题的所有分区。当然每一个分区只能由同一个消费组内的 consumer 来消费,那么同一个consumer group 里面的 consumer 是怎么去分配该消费
哪个分区里的数据的呢?

如下图所示,3 个分区,3 个消费者,那么哪个消费者消分哪个分区

对于上面这个图来说,这 3 个消费者会分别消费 test 这个topic 的 3 个分区,也就是每个 consumer 消费一个partition

在 kafka 中,存在三种分区分配策略,一种是 Range(默认)、 另 一 种 还 是 RoundRobin ( 轮 询 )还有一种就是StickyAssignor。 通 过partition.assignment.strategy 这个参数来设置

范围分区可以参考这篇博客: https://blog.csdn.net/u013256816/article/details/81123600

可以参考博客: https://blog.csdn.net/u013256816/article/details/81123625

当出现以下几种情况时,kafka 会进行一次分区分配操作,也就是 kafka consumer 的 rebalance

kafka consuemr 的 rebalance 机制规定了一个 consumer group 下的所有 consumer 如何达成一致来分配订阅 topic的每个分区。而具体如何执行分区策略,就是前面提到过的三种内置的分区策略。而 kafka 对于分配策略这块,提供了可插拔的实现方式, 也就是说,除了这三种之外,我们还可以创建自己的分配机制

Kafka 提供了一个角色:coordinator 来执行对于 consumer group 的管理,Kafka 提供了一个角色:coordinator 来执行对于 consumer group 的管理,当 consumer group 的第一个 consumer 启动的时候,它会去和 kafka server 确定谁是它们组的 coordinator。之后该 group 内的所有成员都会和该coordinator 进行协调通信

consumer group 如何确定自己的coordinator是谁呢,消费者向kafka集群中的任意一个broker发送一个GroupCoordinatorRequest 请求,服务端会返回一个负载最小的broker 节点的id,并将该broker 设置为coordinator

在 rebalance 之前,需要保证 coordinator 是已经确定好了的,整个 rebalance 的过程分为两个步骤,Joing 和 Syncjoin: 表示加入到 consumer group 中,在这一步中,所有的成员都会向 coordinator 发送 joinGroup 的请求。一旦所有成员都发送了 joinGroup 请求,那么 coordinator 会
选择一个 consumer 担任 leader 角色,并把组成员信息和订阅信息发送消费者

protocol_metadata: 序列化后的消费者的订阅信息
leader_id: 消费组中的消费者,coordinator 会选择一个座位 leader,对应的就是 member_id
member_metadata 对应消费者的订阅信息
members:consumer group 中全部的消费者的订阅信息
generation_id:年代信息,类似于之前讲解 zookeeper 的时候的 epoch 是一样的,对于每一轮 rebalance ,generation_id 都会递增。主要用来保护 consumer group。隔离无效的 offset 提交。也就是上一轮的 consumer 成员无法提交 offset 到新的 consumer group 中。

完成分区分配之后,就进入了 Synchronizing Group State阶段,主要逻辑是向 GroupCoordinator 发 送SyncGroupRequest 请求,并且处理 SyncGroupResponse响应,简单来说,就是 leader 将消费者对应的 partition 分配方案同步给 consumer group 中的所有 consumer

每个消费者都会向 coordinator 发送 syncgroup 请求,不过只有 leader 节点会发送分配方案,其他消费者只是打打酱油而已。当 leader 把方案发给 coordinator 以后,coordinator 会把结果设置到 SyncGroupResponse 中。这样所有成员都知道自己应该消费哪个分区

consumer group 的分区分配方案是在客户端执行的!Kafka 将这个权利下放给客户端主要是因为这样做可以有更好的灵活性

每个 topic可以划分多个分区(每个 Topic 至少有一个分区),同一topic 下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka 通过 offset 保证消息在分区内的顺序,offset 的顺序不跨分区,即 kafka 只保证
在同一个分区内的消息是有序的;对于应用层的消费来说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个 offset。那么 offset 保存在哪里?

在 kafka 中,提供了一个_ consumer_offsets * 的一个topic , 把 offset 信 息 写 入 到 这 个 topic 中 。__consumer_offsets——按保存了每个 consumer group,某一时刻提交的 offset 信息。
__consumer_offsets 默认有50 个分区。
根 据 前 面 我 们 演 示 的 案 例 , 我 们 设 置 了 一 个KafkaConsumerDemo 的 groupid。首先我们需要找到这个 consumer_group 保存在哪个分区中
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerDemo");

Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 由 于 默 认 情 况 下
groupMetadataTopicPartitionCoun有 50 个分区, KafkaConsumerDemo的hashcode取50的模计算得到的结果为:35, 意味着当前的 consumer_group 的位移信息保存在__consumer_offsets 的第 35 个分区

sh kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 35 --broker-list
192.168.11.153:9092,192.168.11.154:9092,192.168.11.157:9092 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"

从输出结果中,我们就可以看到 test 这个 topic 的 offset的位移

以上是关于(6)消息的消费原理的主要内容,如果未能解决你的问题,请参考以下文章

P8架构师参透Kafka:设计原理消息存储消息消费原理等等

Kafka消息的偏移量和顺序消费原理

消息队列-kafka消费者原理

ActiveMQ---ActiveMQ原理分析之消息消费

RocketMQ消息存储原理

kafka——消费者原理解析