Kafka 3.0 Source Code Notes: Kafka Server Startup and Request Handling


Preface

In Kafka 3.0 源码笔记(1)-Kafka 服务端的网络通信架构 I covered the components that make up Kafka 3.0. Building on that, this article falls into three parts, which also reflect the main runtime sequence:

  1. The Kafka server startup flow
  2. How the Kafka server handles new connections
  3. The Kafka server request handling flow

Source Code Analysis

1. Kafka Server Startup Flow

  1. Kafka server startup begins at Kafka.scala#main(). The main steps are:

    1. Call Kafka.scala#getPropsFromArgs() to load the configuration file named in the startup arguments into memory
    2. Call Kafka.scala#buildServer() to create the Kafka server instance object
    3. Call the instance's Server.scala#startup() interface method to start the server
    def main(args: Array[String]): Unit = {
      try {
        val serverProps = getPropsFromArgs(args)
        val server = buildServer(serverProps)

        try {
          if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
            new LoggingSignalHandler().register()
        } catch {
          case e: ReflectiveOperationException =>
            warn("Failed to register optional signal handler that logs a message when the process is terminated " +
              s"by a signal. Reason for registration failure is: $e", e)
        }

        // attach shutdown handler to catch terminating signals as well as normal termination
        Exit.addShutdownHook("kafka-shutdown-hook", {
          try server.shutdown()
          catch {
            case _: Throwable =>
              fatal("Halting Kafka.")
              // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
              Exit.halt(1)
          }
        })

        try server.startup()
        catch {
          case _: Throwable =>
            // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
            fatal("Exiting Kafka.")
            Exit.exit(1)
        }

        server.awaitShutdown()
      }
      catch {
        case e: Throwable =>
          fatal("Exiting Kafka due to fatal exception", e)
          Exit.exit(1)
      }
      Exit.exit(0)
    }
    
  2. The core of Kafka.scala#getPropsFromArgs() is a call to Utils#loadProps() that loads the specified configuration file. The logic is straightforward, so it is not analyzed in depth here; a small sketch of the --override precedence follows the code below

    def getPropsFromArgs(args: Array[String]): Properties = {
      val optionParser = new OptionParser(false)
      val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
        .withRequiredArg()
        .ofType(classOf[String])
      // This is just to make the parameter show up in the help output, we are not actually using this due the
      // fact that this class ignores the first parameter which is interpreted as positional and mandatory
      // but would not be mandatory if --version is specified
      // This is a bit of an ugly crutch till we get a chance to rework the entire command line parsing
      optionParser.accepts("version", "Print version information and exit.")

      if (args.length == 0 || args.contains("--help")) {
        CommandLineUtils.printUsageAndDie(optionParser,
          "USAGE: java [options] %s server.properties [--override property=value]*".format(this.getClass.getCanonicalName.split('$').head))
      }

      if (args.contains("--version")) {
        CommandLineUtils.printVersionAndDie()
      }

      val props = Utils.loadProps(args(0))

      if (args.length > 1) {
        val options = optionParser.parse(args.slice(1, args.length): _*)

        if (options.nonOptionArguments().size() > 0) {
          CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
        }

        props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
      }
      props
    }
    
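    For intuition about the --override option handled above, the following minimal sketch (plain java.util.Properties, not Kafka's Utils or CommandLineUtils) shows the precedence rule: values from the properties file are loaded first, and anything supplied via --override replaces them. The values are illustrative only.

    import java.util.Properties

    object OverridePrecedenceSketch {
      def main(args: Array[String]): Unit = {
        // Pretend these values came from Utils.loadProps("server.properties")
        val props = new Properties()
        props.put("broker.id", "0")
        props.put("log.dirs", "/tmp/kafka-logs")

        // Pretend this came from parsing: --override broker.id=1
        val overrides = Map("broker.id" -> "1")
        overrides.foreach { case (k, v) => props.put(k, v) }

        println(props.getProperty("broker.id")) // prints 1: the command-line override wins
        println(props.getProperty("log.dirs"))  // prints /tmp/kafka-logs: the file value is kept
      }
    }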
  3. Kafka.scala#buildServer() creates the server instance object and is a key step. Points to note:

    1. KafkaConfig.scala#fromProps() converts the configuration loaded into memory into a KafkaConfig object
    2. KafkaConfig.scala#requiresZookeeper() determines the Kafka server's startup mode. The decision rests solely on whether the process.roles configuration is present: if it is, the server starts in KRaft mode, which removes the ZooKeeper dependency; otherwise it starts in the legacy ZooKeeper-based mode (see the sketch after the code below)
    3. This article is based on Kafka 3.0, where KRaft support is already fairly stable, so the analysis uses KRaft mode as the example; here a KafkaRaftServer object is created
      private def buildServer(props: Properties): Server = {
        val config = KafkaConfig.fromProps(props, false)
        if (config.requiresZookeeper) {
          new KafkaServer(
            config,
            Time.SYSTEM,
            threadNamePrefix = None,
            enableForwarding = false
          )
        } else {
          new KafkaRaftServer(
            config,
            Time.SYSTEM,
            threadNamePrefix = None
          )
        }
      }
    
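    As promised above, here is a minimal sketch of the mode decision, assuming only that process.roles is the deciding property. The real check lives in KafkaConfig and also validates the configured role values, so this is illustrative rather than Kafka's actual code.

    import java.util.Properties

    object StartupModeSketch {
      def requiresZookeeper(props: Properties): Boolean =
        !props.containsKey("process.roles") // no roles configured => legacy ZooKeeper mode

      def main(args: Array[String]): Unit = {
        val kraftProps = new Properties()
        kraftProps.put("process.roles", "broker,controller")
        println(requiresZookeeper(kraftProps))       // false => a KafkaRaftServer would be built
        println(requiresZookeeper(new Properties())) // true  => a KafkaServer (ZooKeeper mode) would be built
      }
    }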
  4. Scala syntax differs from Java in several ways; for example, a Scala class's primary constructor is part of the class declaration itself, so constructing a KafkaRaftServer also triggers the creation of a number of key member objects. The ones directly relevant to this article are:

    1. broker: a BrokerServer object, created only when the node's process.roles configuration includes the broker role; it handles data-plane requests such as producing and consuming messages
    2. controller: a ControllerServer object, created only when the node's process.roles configuration includes the controller role; it handles metadata requests such as topic creation and deletion
    class KafkaRaftServer(
      config: KafkaConfig,
      time: Time,
      threadNamePrefix: Option[String]
    ) extends Server with Logging {

      KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
      KafkaYammerMetrics.INSTANCE.configure(config.originals)

      private val (metaProps, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)

      private val metrics = Server.initializeMetrics(
        config,
        time,
        metaProps.clusterId
      )

      private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
        RaftConfig.parseVoterConnections(config.quorumVoters))

      private val raftManager = new KafkaRaftManager[ApiMessageAndVersion](
        metaProps,
        config,
        new MetadataRecordSerde,
        KafkaRaftServer.MetadataPartition,
        KafkaRaftServer.MetadataTopicId,
        time,
        metrics,
        threadNamePrefix,
        controllerQuorumVotersFuture
      )

      private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
        Some(new BrokerServer(
          config,
          metaProps,
          raftManager,
          time,
          metrics,
          threadNamePrefix,
          offlineDirs,
          controllerQuorumVotersFuture,
          Server.SUPPORTED_FEATURES
        ))
      } else {
        None
      }

      private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) {
        Some(new ControllerServer(
          metaProps,
          config,
          raftManager,
          time,
          metrics,
          threadNamePrefix,
          controllerQuorumVotersFuture
        ))
      } else {
        None
      }

      ...
    }
    
  5. With the steps above, the Kafka server-side Server object has been created; in our case it is a KafkaRaftServer. Step 3 of item 1 in this section (Server.scala#startup()) therefore resolves to KafkaRaftServer.scala#startup(), which starts the server. The points relevant to this article are:

    1. controller.foreach(_.startup()) starts the ControllerServer, if one exists on this node, by calling its ControllerServer.scala#startup() method
    2. broker.foreach(_.startup()) starts the BrokerServer, if one exists on this node, by calling its BrokerServer.scala#startup() method

    This article uses the startup of BrokerServer as the example; in terms of network communication structure, BrokerServer and ControllerServer are almost identical

    override def startup(): Unit = {
      Mx4jLoader.maybeLoad()
      raftManager.startup()
      controller.foreach(_.startup())
      broker.foreach(_.startup())
      AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
      info(KafkaBroker.STARTED_MESSAGE)
    }
    
  6. BrokerServer.scala#startup() is fairly long; the key objects it creates are listed below. Stripped to the essentials, only two of them matter for network communication: the creation, configuration, and startup of the low-level network server SocketServer, and the creation and startup of the upper-level request handler pool KafkaRequestHandlerPool (a simplified sketch of how these two cooperate is given at the end of this section)

    1. kafkaScheduler: a KafkaScheduler object, the thread pool for scheduled background tasks
    2. metadataCache: a KRaftMetadataCache object, the cluster metadata management component
    3. clientToControllerChannelManager: a BrokerToControllerChannelManager object, the manager of the broker-to-controller connection
    4. forwardingManager: a ForwardingManagerImpl object; it holds clientToControllerChannelManager and forwards requests that should be handled by the controller
    5. socketServer: a SocketServer object, the server facing the underlying network
    6. _replicaManager: a ReplicaManager object, the replica manager responsible for storing and reading messages
    7. groupCoordinator: a GroupCoordinator object, the coordinator for ordinary consumer groups; it helps assign partitions to the consumers within a group
    8. dataPlaneRequestProcessor: a KafkaApis object, the upper-level request processor; it holds the network server's request queue socketServer.dataPlaneRequestChannel and takes requests off that queue for processing
    9. dataPlaneRequestHandlerPool: a KafkaRequestHandlerPool object, the thread pool of upper-level request handlers
     def startup(): Unit = {
      if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
      try {
       info("Starting broker")
    
       /* start scheduler */
       kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
       kafkaScheduler.startup()
    
       /* register broker metrics */
       _brokerTopicStats = new BrokerTopicStats
    
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
    
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
    
       metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)
    
       // Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery
       // until we catch up on the metadata log and have up-to-date topic and broker configs.
       logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time,
         brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true)
    
       // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
       // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
       tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
       credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
    
       val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
       val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)
    
       clientToControllerChannelManager = BrokerToControllerChannelManager(
         controllerNodeProvider,
         time,
         metrics,
         config,
         channelName = "forwarding",
         threadNamePrefix,
         retryTimeoutMs = 60000
       )
       clientToControllerChannelManager.start()
       forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager)
    
       val apiVersionManager = ApiVersionManager(
         ListenerType.BROKER,
         config,
         Some(forwardingManager),
         brokerFeatures,
         featureCache
       )
    
       // Create and start the socket server acceptor threads so that the bound port is known.
       // Delay starting processors until the end of the initialization sequence to ensure
       // that credentials have been loaded before processing authentications.
       socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
       socketServer.startup(startProcessingRequests = false)
    
       clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
    
       val alterIsrChannelManager = BrokerToControllerChannelManager(
         controllerNodeProvider,
         time,
         metrics,
         config,
         channelName = "alterIsr",
         threadNamePrefix,
         retryTimeoutMs = Long.MaxValue
       )
       alterIsrManager = new DefaultAlterIsrManager(
         controllerChannelManager = alterIsrChannelManager,
         scheduler = kafkaScheduler,
         time = time,
         brokerId = config.nodeId,
         brokerEpochSupplier = () => lifecycleManager.brokerEpoch
       )
       alterIsrManager.start()
    
       this._replicaManager = new ReplicaManager(config, metrics, time, None,
         kafkaScheduler, logManager, isShuttingDown, quotaManagers,
         brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager,
         threadNamePrefix)
    
       /* start token manager */
        if (config.tokenAuthEnabled) {
          throw new UnsupportedOperationException("Delegation tokens are not supported")
        }
       tokenManager = new DelegationTokenManager(config, tokenCache, time , null)
       tokenManager.startup() // does nothing, we just need a token manager in order to compile right now...
    
       // Create group coordinator, but don't start it until we've started replica manager.
       // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
       groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics)
    
       val producerIdManagerSupplier = () => ProducerIdManager.rpc(
         config.brokerId,
         brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
         clientToControllerChannelManager,
         config.requestTimeoutMs
       )
    
       // Create transaction coordinator, but don't start it until we've started replica manager.
       // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
       transactionCoordinator = TransactionCoordinator(config, replicaManager,
         new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"),
         producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM)
    
       autoTopicCreationManager = new DefaultAutoTopicCreationManager(
         config, Some(clientToControllerChannelManager), None, None,
         groupCoordinator, transactionCoordinator)
    
       /* Add all reconfigurables for config change notification before starting the metadata listener */
       config.dynamicConfig.addReconfigurables(this)
    
       dynamicConfigHandlers = Map[String, ConfigHandler](
         ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, None),
         ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
    
        if (!config.processRoles.contains(ControllerRole)) {
          // If no controller is defined, we rely on the broker to generate snapshots.
          metadataSnapshotter = Some(new BrokerMetadataSnapshotter(
            config.nodeId,
            time,
            threadNamePrefix,
            new BrokerSnapshotWriterBuilder(raftManager.client)
          ))
        }
    
       metadataListener = new BrokerMetadataListener(config.nodeId,
                                                     time,
                                                     threadNamePrefix,
                                                     config.metadataSnapshotMaxNewRecordBytes,
                                                     metadataSnapshotter)
    
       val networkListeners = new ListenerCollection()
        config.advertisedListeners.foreach { ep =>
          networkListeners.add(new Listener().
            setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
            setName(ep.listenerName.value()).
            setPort(if (ep.port == 0) socketServer.boundPort(ep.listenerName) else ep.port).
            setSecurityProtocol(ep.securityProtocol.id))
        }
       lifecycleManager.start(() => metadataListener.highestMetadataOffset(),
         BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
           "heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
         metaProps.clusterId, networkListeners, supportedFeatures)
    
       // Register a listener with the Raft layer to receive metadata event notifications
       raftManager.register(metadataListener)
    
       val endpoints = new util.ArrayList[Endpoint](networkListeners.size())
       var interBrokerListener: Endpoint = null
        networkListeners.iterator().forEachRemaining(listener => {
          val endPoint = new Endpoint(listener.name(),
            SecurityProtocol.forId(listener.securityProtocol()),
            listener.host(), listener.port())
          endpoints.add(endPoint)
          if (listener.name().equals(config.interBrokerListenerName.value())) {
            interBrokerListener = endPoint
          }
        })
       if (interBrokerListener == null) 
         throw new RuntimeException("Unable to find inter-broker listener " +
           config.interBrokerListenerName.value() + ". Found listener(s): " +
           endpoints.asScala.map(ep => ep.listenerName().orElse("(none)")).mkString(", "))
       
       val authorizerInfo = ServerInfo(new ClusterResource(clusterId),
         config.nodeId, endpoints, interBrokerListener)
    
       /* Get the authorizer and initialize it if one is specified.*/
       authorizer = config.authorizer
       authorizer.foreach(_.configure(config.originals))
        val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
          case Some(authZ) =>
            authZ.start(authorizerInfo).asScala.map { case (ep, cs) =>
              ep -> cs.toCompletableFuture
            }
          case None =>
            authorizerInfo.endpoints.asScala.map { ep =>
              ep -> CompletableFuture.completedFuture[Void](null)
            }.toMap
        }
    
       val fetchManager = new FetchManager(Time.SYSTEM,
         new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
           KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
    
       // Create the request processor objects.
       val raftSupport = RaftSupport(forwardingManager, metadataCache)
       dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport,
         replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
         config.nodeId, config, metadataCache, metadataCache, metrics, authorizer, quotaManagers,
         fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)
    
       dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
         socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
          config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
         SocketServer.DataPlaneThreadPrefix)
    
        if (socketServer.controlPlaneRequestChannelOpt.isDefined)
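
    To tie the two highlighted components together: SocketServer's network threads place parsed requests on socketServer.dataPlaneRequestChannel, and the KafkaRequestHandlerPool threads take them off that queue and hand them to KafkaApis for handling. The sketch below illustrates this producer/consumer handoff in plain Scala; the names RequestChannelSketch, HandlerPoolSketch, and Request are illustrative stand-ins, not Kafka's actual classes.

    import java.util.concurrent.LinkedBlockingQueue

    final case class Request(api: String, payload: String)

    // Stand-in for Kafka's RequestChannel: an unbounded queue shared between
    // the network threads (producers) and the handler threads (consumers).
    class RequestChannelSketch {
      private val queue = new LinkedBlockingQueue[Request]()
      def sendRequest(r: Request): Unit = queue.put(r) // called by network threads
      def receiveRequest(): Request = queue.take()     // called by handler threads
    }

    // Stand-in for KafkaRequestHandlerPool: N daemon threads that drain the queue
    // and pass each request to the API layer (KafkaApis in the real code).
    class HandlerPoolSketch(channel: RequestChannelSketch, numThreads: Int,
                            handle: Request => Unit) {
      private val threads = (0 until numThreads).map { i =>
        new Thread(() => while (true) handle(channel.receiveRequest()), s"handler-$i")
      }
      def startup(): Unit = threads.foreach { t => t.setDaemon(true); t.start() }
    }

    object RequestFlowDemo {
      def main(args: Array[String]): Unit = {
        val channel = new RequestChannelSketch
        val pool = new HandlerPoolSketch(channel, numThreads = 2,
          r => println(s"handling ${r.api}: ${r.payload}"))
        pool.startup()
        channel.sendRequest(Request("Produce", "records...")) // what a network Processor would do
        Thread.sleep(200)                                     // give a handler thread time to run
      }
    }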