Kafka放弃Zookeeper后如何持存储主题与消费组呢?

Posted 中间件兴趣圈

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka放弃Zookeeper后如何持存储主题与消费组呢?相关的知识,希望对你有一定的参考价值。


由于笔者公司目前使用的kafka版本是2.2.1,故当下关于kafka的内核研究目前主要是基于该版本,当然该专栏还会继续关注Kafka3.0。

我在使用kafka时发现客户端可以不依赖Zookeeper的情况下完成消息发送、消息消费,众所周知早期的Kafka,所有的元信息(topic、消费组、集群)等信息都存储在Zookeeper中,原先的消息发送客户端、消息消费客户端都需要依赖Zookeeper。

温馨提示:Kafka逐步开启了去zookeeper化,到kafka2.8之前实现了消息发送者、消息消费者的去zookeeper化,从2.8版之后broker也支持去zookeeper。

那kafka2.2.1版本中,主题的路由信息、消费组信息分别是存储在什么地方呢?消息发送端、消息消费端是如何感知的呢?

温馨提示:如果大家对Kafka有基本的了解,不防停留片刻,稍作思考。

1、主题元数据存储在Zookeeper中

进入到Kafka Broker连接的Zookeeper集群,我们不难发现在 /namespace/brokers/topics节点下存在该集群中所有的主题信息,展开某一个具体的主题,如下图所示:

关于主题的元信息,其实主要包括如下信息:

  • 分区数量 每一个具体topic下会有一个partitions节点,该节点下的每一个子节点代表一个分区。
  • 分区状态信息 每一个分区的的状态由叶子节点 /namespace/brokers/topics/topicName/parttions/partNO/state表示,存储的内容如下:
  • controller_epoch 控制器当前的选举版本。
  • leader 该分区的Leader所在的Broker节点ID。
  • version 当前的存储格式版本,默认为1。
  • leader_epoch 分区Leader的选举版本。
  • isr 分区的ISR集合。
  • 主题的路由信息是存储在Zookeeper中,那为什么客户端只需要Broker的地址,就可以获取到主题的路由信息呢?

    1.1 主题路由寻址

    查找路由信息在Kafka2.1版本中是发送ApiKeys.METADATA请求,该请求的响应逻辑定义在Broker中,那客户端是如何对Broker进行路由,Broker中的路由信息又是从何而来呢?

    消息发送者首次发送METADATA定位Broker机制:首次发送请求会从KafkaProducer的bootstrap.servers中设置的broker列表中选择当前最空闲的Broker,后续能感知所有的Broker。

    消息消费者发送METADATA定位Broker机制:发送到当前消费组的组协调所在的Broker。

    根据查阅KafkaApis的handleTopicMetadataRequest方法,进行一些ACL校验后进入其核心方法:

    关键点:

  • MetadataCache中获取topic到路由信息。
  • 如果MetadataCache中不存在指定topic的路由信息,如果Broker允许自动创建主题(auto.create.topics.enable),默认为true,则自动创建该主题的信息,并将主题信息写入到zookeeper,具体操作:
  • 在/brokers/topics节点下创建子节点,子节点名称为topic的名称。
  • 根据当前kafka分区的机架信息,分区数、副本数,broker节点数,进行分配,主要尽量将主分区不放在同一个机架、存储在主题的节点信息中,例如"version":1,"partitions":"4":[2,0,1],"5":[0,1,2],"1":[2,1,0],"0":[1,0,2],"2":[0,2,1],"3":[1,2,0],其中key为分区名称,值为副本所在的brokerId,其中排在第一位是倾向性Leader,主题中存储的值是静态数据,具体还会触发选举,选举算法会参考这个分配。
  • 控制器还会注册调用registerPartitionModificationsHandlers方法,监听主题信息的变化,从而触发后续流程,启动分区的真正创建(各个分区的Leader选举等)。
  • ⚠️温馨提示:Kafka开启自动创建主题,分区数量取自kafka broker中的num.partitions参数,默认为1,副本因子则取决于default.replication.factor参数,默认为1。

    1.2 路由信息同步机制

    MetadataCache,元信息缓存,那这里的数据又是从何而来呢?MetadataCache中路由信息的更新调用链如下图所示:

    Kafka的KafkaController(后续统称控制器)首先会听/brokers/topics/topicName节点内容的变化,一旦有新主题创建或主题信息变更,topic变更事件就会触发,此时TopicChange的process方法会调用,最终调用updatePartitionReplicaAssignment,也就是一旦主题的信息发生变更,控制器会向所有Broker节点发送ApiKeys.UPDATE_METADATA,各个Broker在到该请求后,会更新各个Broker中的内存缓存,供消息发送者查找topic路由信息。

    即Kafka2.2版本中,topic的元信息存储在Zookeeper中,同时Kafka Controller会监听zookeeper中相关节点,从而感知信息变更,从而将路由信息通过RPC发送到集群内所有的Broker中,故每一个Broker的内存中都存储一份相同的路由信息。

    ⚠️Kafka2.8版本开始尝试去Zookeeper化。

    思考题:⚠️为什么各个Broker不都监听zookeeper,从而感知topic变化,更新本地内存呢?欢迎各位留言讨论或私信dingwpmz,共同交流。

    2、消费组存储在位点主题中

    在较低版本中,启动Kafka消费组需要指定zookeeper集群的地址,因为在低版本中消费组的元信息存储在zookeeper中,具体路径为/consumers,但后续版本中消费端的启动已经不需指定zookeeper,而是指定broker的地址列表即可,那这个时候,消费组的信息是存储在哪呢?

    在前面介绍Kafka故障解决相关的文章中我们常常看到消费组组协调器,内部持有一个消费组元数据管理器GroupMetadataManager,相关的代码截图如下所示:

    在GroupMetadataManager对象中持有一个Map结构的缓存,其键为消费组的名称,值为GroupMetadata对象,内部记录消费组的状态,消费组的成员列表,位点信息。

    内存的特点:访问高效,但随着Broker进程的退出而丢失,消费组存储在内存中显然不行,但又不在zookeeper中,那消费组的定义信息存储在什么地方呢?

    2.1消费组元信息存储

    消费组的定义信息存储在系统主题__consumer_offsets中,什么,这个主题不是用来存储消费位点的吗?

    原来__consumer_offsets不仅存储消费组的位点信息,还存储消费组的元信息,具体代码入口:GroupMetadataManager#storeGroup,部分代码截图如下所示:

    即消费组元信息当成一条消息写入到__consumer_offsets,一条消费组元信息存储的value值,由GroupMetadataManager的groupMetadataValue方法定义,具体代码如下:

    随着Kafka的不断演化,存储格式进行了多次修改,对应的版本如下:

  • V0:Kafka 0.10级以下版本
  • V1:大于 0.10,低于等于2.1版本。
  • V2:2.2版本及以后
  • 消费组元信息存储的格式为Json,具体存储的内容:

  • protocol_type 协议版本,取自AbstractCoordinator的抽象方法protocolType(),消费组的固定为:consumer。

  • generation 消费组元信息的版本号,每发生一次消费组重平衡,该值会加一。

  • protocol 协议内容,存储消费组的队列负载算法,在构建消费者时可通过partition.assignment.strategy参数传递,可以传递多个,消费组具体的负载算法会选择每一个消费者都支持的协议进行队列负载,默认的负载算法为RangeAssignor。

  • leader 当前消费组的Leader,通常为第一个加入该消费组的消费者。

  • current_state_timestamp 最新状态变更的时间戳,该值是从V2版本开始引入。

  • members 消费组的成员信息,每一个成员信息存储的信息如下:

  • member_id 成员id,客户端id(clientId) + uuid。

  • client_id 客户端ID。

  • client_host 客户端ip地址。

  • rebalance_timeout 重平衡时间,默认为300000,5分钟。

  • session_timeout 会话超时时间,默认为10s。

  • subscription 元信息,取自AbstractCoordinator的抽象方法metadata(),消费组的实现类为ConsumerCoordinator,主要是遍历负载算法,每一个负载算法根据订阅信息计算元信息。

  • assignment

    各个消费者的队列负载情况。

  • ⚠️温馨提示:GroupMetadataManager的storeGroup方法的调用时间是在消费组进行重平衡时,具体是重平衡第二阶段(SYNC_GROUP)与完成重平衡。

    2.2加载消息组元信息

    消费组元信息是存储在 __consumer_offsets主题中,在什么时候会从该主题中加载到内存中呢?

    在__consumer_offsets的分区发生Leader选举时会触发将对应分区中的数据加载到内存,具体的处理入口在KafkaApis的handleLeaderAndIsrRequest方法,简易调用链如下图所示:

    3、总结

    本文主要介绍了Kafka 主题与消费组的持久化机制,在Kafka2.8版本开始,官方逐步去除对Zookeeper的依赖,那kafka3.x之后,又会是如何存储消费组、主题的信息呢?大家可以尝试思考后,笔者也将在该专栏的后续文章中加以介绍,敬请期待。

    ()

    PDF15JavaBAT


    10IT



     RocketMQ 

    []

    实战Kafka ACL机制

    1.概述

      在Kafka0.9版本之前,Kafka集群时没有安全机制的。Kafka Client应用可以通过连接Zookeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。来获取存储在Zookeeper中的Kafka元数据信息。拿到Kafka Broker地址后,连接到Kafka集群,就可以操作集群上的所有主题了。由于没有权限控制,集群核心的业务主题时存在风险的。

    2.内容

    2.2 身份认证

      Kafka的认证范围包含如下:

    • Client与Broker之间
    • Broker与Broker之间
    • Broker与Zookeeper之间

      当前Kafka系统支持多种认证机制,如SSL、SASL(Kerberos、PLAIN、SCRAM)。

    2.3  SSL认证流程

      在Kafka系统中,SSL协议作为认证机制默认是禁止的,如果需要使用,可以手动启动SSL机制。安装和配置SSL协议的步骤,如下所示:

    1. 在每个Broker中Create一个Tmp密钥库
    2. 创建CA
    3. 给证书签名
    4. 配置Server和Client

      执行脚本如下所示:

    #! /bin/bash
    
    # 1.Create rsa
    keytool -keystore server.keystore.jks -alias dn1 -validity 365 -genkey -keyalg RSA
    # 2.Create CA
    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
    # 3.Import client
    keytool -keystore client.truststore.jks -alias CAROOT -import -file ca-cert
    # 4.Import server
    keytool -keystore server.truststore.jks -alias CAROOT -import -file ca-cert
    # 5.Export
    keytool -keystore server.keystore.jks -alias dn1 -certreq -file cert-file
    # 6.Signed
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:123456
    # 7.Import ca-cert
    keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
    # 8.Import cert-signed
    keytool -keystore server.keystore.jks -alias dn1 -import -file cert-signed

    2.4 SASL认证流程

      在Kafka系统中,SASL机制包含三种,它们分别是Kerberos、PLAIN、SCRAM。以PLAIN认证为示例,下面给大家介绍PLAIN认证流程。

    2.4.1  配置Server

      首先,在$KAFKA_HOME/config目录中新建一个文件,名为kafka_server_jaas.conf,配置内容如下:

    KafkaServer {
       org.apache.kafka.common.security.plain.PlainLoginModule required
       username="smartloli"
       password="smartloli-secret"
       user_admin="smartloli-secret";
    };
    
    Client {
       org.apache.kafka.common.security.plain.PlainLoginModule required
       username="smartloli"
       password="smartloli-secret";
    };

      然后在Kafka启动脚本(kafka-server-start.sh)中添加配置文件路径,设置内容如下:

    [[email protected] bin]$ vi kafka-server-start.sh
    
    # Add jaas file
    export KAFKA_OPTS="-Djava.security.auth.login.config=/data/soft/new/kafka/config/kafka_server_jaas.conf"
    

      接下来,配置server.properties文件,内容如下:

    # Set ip & port
    listeners=SASL_PLAINTEXT://dn1:9092
    advertised.listeners=SASL_PLAINTEXT://dn1:9092
    # Set protocol
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.enabled.mechanisms=PLAIN
    sasl.mechanism.inter.broker.protocol=PLAIN
    
    # Add acl
    allow.everyone.if.no.acl.found=true
    auto.create.topics.enable=false
    delete.topic.enable=true
    advertised.host.name=dn1
    super.users=User:admin
    
    # Add class
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

    2.4.2 配置Client

      当Kafka Server端配置启用了SASL/PLAIN,那么Client连接的时候需要配置认证信息,Client配置一个kafka_client_jaas.conf文件,内容如下:

    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="admin"
      password="admin-secret";
    };

      然后,在producer.properties和consumer.properties文件中设置认证协议,内容如下:

    security.protocol=SASL_PLAINTEXT 
    sasl.mechanism=PLAIN

      最后,在kafka-console-producer.sh脚本和kafka-console-producer.sh脚本中添加JAAS文件的路径,内容如下:

    # For example: kafka-console-producer.sh
    [email protected] bin]$ vi kafka-console-producer.sh
    
    # Add jaas file
    export KAFKA_OPTS="-Djava.security.auth.login.config=/data/soft/new/kafka/config/kafka_client_jaas.conf"
    

    2.5 ACL操作

      在配置好SASL后,启动Zookeeper集群和Kafka集群之后,就可以使用kafka-acls.sh脚本来操作ACL机制。

      (1)查看:在kafka-acls.sh脚本中传入list参数来查看ACL授权新

    [[email protected] bin]$ kafka-acls.sh --list --authorizer-properties zookeeper.connect=dn1:2181

      (2)创建:创建待授权主题之前,在kafka-acls.sh脚本中指定JAAS文件路径,然后在执行创建操作

    [[email protected] bin]$ kafka-topics.sh --create --zookeeper dn1:2181 --replication-factor 1 --partitions 1 --topic kafka_acl_topic

      (3)生产者授权:对生产者执行授权操作

    [[email protected] ~]$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=dn1:2181 --add --allow-principalUser:producer --operation Write --topic kafka_acl_topic

      (4)消费者授权:对生产者执行授权后,通过消费者来进行验证

    [[email protected] ~]$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=dn1:2181 --add --allow-principalUser:consumer --operation Read --topic kafka_acl_topic

      (5)删除:通过remove参数来回收相关权限

    [[email protected] bin]$ kafka-acls.sh --authorizer-properties zookeeper.connect=dn1:2181 --remove --allow-principal User:producer --operation Write --topic kafka_acl_topic3

    3.总结

      在处理一些核心的业务数据时,Kafka的ACL机制还是非常重要的,对核心业务主题进行权限管控,能够避免不必要的风险。

    4.结束语

      这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

    以上是关于Kafka放弃Zookeeper后如何持存储主题与消费组呢?的主要内容,如果未能解决你的问题,请参考以下文章

    kakfa从入门到放弃: 相关概念,幂等性&事务

    kakfa从入门到放弃: 相关概念,幂等性&事务

    kakfa从入门到放弃: 相关概念,幂等性&事务

    Apache Kafka 删除 Apache ZooKeeper 的依赖

    使用 - -zookeeper 标志列出所有 Kafka 0.10 主题,而无需访问 Zookeeper

    “kafka.zookeeper.ZooKeeperClientTimeoutException:等待连接超时”仅在列出主题期间