聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR

Posted 老周聊架构

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR相关的知识,希望对你有一定的参考价值。

四、ApiKeys.FIND_COORDINATOR

我们前面的文章说过,与消费组相关的两个组件,一个是消费者客户端的 ConsumerCoordinator,一个是 Kafka Broker 服务端的 GroupCoordinator。ConsumerCoordinator 负责与 GroupCoordinator 通信,Broker 启动的时候,都会启动一个 GroupCoordinator 实例,而一个集群中,会有多个 Broker,那么如何确定一个新的 Consumer 加入 Consumer Group 后,到底和哪个 Broker 上的 GroupCoordinator 进行交互呢?

这个问题就就交给服务端的 ApiKeys.FIND_COORDINATOR 命令来处理。

4.1 客户端源码分析


coordinator 即获取到的 group 节点对象,client.isUnavailable(coordinator) 是在与 group 建立连接,每次判断 coordinator 不为空且 client 与 group 连接失败,则将 coordinator 置空,为什么会这样呢?很有可能是请求到 group 的信息之后发现该节点已下线或者不可用,此时服务端很有可能也在进行选举,所以我们需要将 coordinator 清空,待服务端选举完成后再次通信。


如果通信一次发现该 GroupCoordinator 的信息还未获取到则继续重试,直到超时,这里的超时时间即为 poll 时传入的超时时间,这个时间设置贯穿了整个 consume 的运行代码。


我们来看看是如何寻找负载最小节点的,首先就是取随机数,防止每次都从第一个节点连接,如果判断没有在途的 request 则直接返回该节点,否则取在途 request 最小的节点,如果该节点不存在,则依次取连接的节点、需要重试的节点,如果找到不为 null 的节点则返回该节点,否则返回 null。

4.2 FindCoordinatorRequest 请求报文




key_type 有两种枚举,一种是 GROUP,另一种是 TRANSACTION,如果 type 为 GROUP 的话那 key 就是 groupId,反之是 transactionId。

4.3 服务端源码分析

直接看到 FIND_COORDINATOR 命令调用的方法 kafka.server.KafkaApis#handleFindCoordinatorRequest


kafka.coordinator.group.GroupMetadataManager#partitionFor

def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount

// 记录 offsets topic 的分区数量,这个字段会调用 getGroupMetadataTopicPartitionCount() 进行初始化,默认 50。
private val groupMetadataTopicPartitionCount = getGroupMetadataTopicPartitionCount

private def getGroupMetadataTopicPartitionCount: Int = 
	zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicNumPartitions)


val DefaultOffsetsTopicNumPartitions = 50

__consumer_offsets 这个内部 Topic,专门用来存储 Consumer Group 消费的情况,内置 Topic 初始化时由 offsets.topic.num.partitions 参数来决定分区数,默认值是 50。相同 Consumer Group 的 offset 最终会保存在其中一个分区中,而保存在哪个分区就由上面这段代码来决定,可以看到逻辑很简单,就是取 groupId 的 hashCode,然后对总的分区数取模。

举个例子,假设一个 GroupId 计算出来的 hashCode 是 8,之后取模 50 得到 8。那么 partition-8 的 leader 所在的 broker 就是我们要找的那个节点。这个 Consumer Group 后面都会直接在 partition-8 分区保存位点。

kafka.server.KafkaApis#getOrCreateInternalTopic

  • 首先从当前 node 的元数据缓存中拿到对应 topic 的数据,如果没有,则创建。
  • 注意:kafka 创建 topic 是需要时间的,而这里的实现方式是往 zk 中写入数据触发创建 topic 流程,是一种异步方式,往 zk 中写入数据之后会返回一个 error,LEADER_NOT_AVAILABLE,待创建 topic 的流程走完,并同步各个节点 metaData 之后,最后从 metaData 中取到该节点信息 findCoordinatorRequest 才会成功返回。

4.4 FindCoordinatorResponse 响应报文


4.5 小结

总体分析下来,主要流程如下图所示:

  • 寻找最小负载节点信息
  • 向最小负载节点发送 FindCoordinatorRequest
  • 最小负载节点处理该请求
    • 首先找到该 groupId 对应的分区
    • 通过内存中缓存的 metaData 获取该分区的信息,如果不存在则创建 topic。
    • 返回查找到的分区 leader 信息
  • 最小负载节点向 client 响应 FindCoordinatorResponse

以上是关于聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR的主要内容,如果未能解决你的问题,请参考以下文章

聊聊 Kafka:协调者 GroupCoordinator 源码剖析之实例化与启动 GroupCoordinator

聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR

聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR

聊聊 Kafka:协调者 GroupCoordinator 源码剖析之 FIND_COORDINATOR

什么是Kafka消费组协调器

什么是Kafka消费组协调器