Kafka 3.0 Source Code Notes - Analysis of Kafka Server Startup and Request Handling
Posted by 谈谈1974
Preface
In Kafka 3.0 Source Code Notes (1) - The Network Communication Architecture of the Kafka Server, I introduced the components that make up Kafka 3.0. Building on that, this article is divided into three parts, in the order they occur at runtime:
- Kafka server startup flow
- How the Kafka server handles new connections
- Kafka server request handling flow
Source Code Analysis
1. Kafka Server Startup Flow
1. Kafka server startup begins at the `Kafka.scala#main()` method. Its main steps are:
   - Call `Kafka.scala#getPropsFromArgs()` to load the configuration file specified on the command line into memory
   - Call `Kafka.scala#buildServer()` to create the Kafka server instance
   - Call the `Server.scala#startup()` interface method on the newly created server instance to start the server
```scala
def main(args: Array[String]): Unit = {
  try {
    val serverProps = getPropsFromArgs(args)
    val server = buildServer(serverProps)

    try {
      if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
        new LoggingSignalHandler().register()
    } catch {
      case e: ReflectiveOperationException =>
        warn("Failed to register optional signal handler that logs a message when the process is terminated " +
          s"by a signal. Reason for registration failure is: $e", e)
    }

    // attach shutdown handler to catch terminating signals as well as normal termination
    Exit.addShutdownHook("kafka-shutdown-hook", {
      try server.shutdown()
      catch {
        case _: Throwable =>
          fatal("Halting Kafka.")
          // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
          Exit.halt(1)
      }
    })

    try server.startup()
    catch {
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }

    server.awaitShutdown()
  }
  catch {
    case e: Throwable =>
      fatal("Exiting Kafka due to fatal exception", e)
      Exit.exit(1)
  }
  Exit.exit(0)
}
```
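Note the shutdown hook named "kafka-shutdown-hook" registered above: it is what turns a SIGTERM (or a normal exit) into a clean `server.shutdown()`. The following is only a minimal sketch of the underlying JVM mechanism; the real code goes through Kafka's `Exit` utility rather than calling the JVM API directly.

```scala
// Minimal sketch (not Kafka code) of a JVM shutdown hook, the mechanism behind
// Exit.addShutdownHook("kafka-shutdown-hook", ...).
object ShutdownHookSketch {
  def main(args: Array[String]): Unit = {
    val hook = new Thread(new Runnable {
      override def run(): Unit = {
        // in Kafka this is where server.shutdown() would be invoked
        println("kafka-shutdown-hook: shutting down server ...")
      }
    }, "kafka-shutdown-hook")
    Runtime.getRuntime.addShutdownHook(hook)

    println("server running; send SIGTERM or let the JVM exit normally to fire the hook")
    Thread.sleep(2000)
  }
}
```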
2. The core of `Kafka.scala#getPropsFromArgs()` is a call to `Utils#loadProps()`, which loads the specified configuration file. The logic is straightforward, so we will not dig into it further.

```scala
def getPropsFromArgs(args: Array[String]): Properties = {
  val optionParser = new OptionParser(false)
  val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
    .withRequiredArg()
    .ofType(classOf[String])
  // This is just to make the parameter show up in the help output, we are not actually using this due the
  // fact that this class ignores the first parameter which is interpreted as positional and mandatory
  // but would not be mandatory if --version is specified
  // This is a bit of an ugly crutch till we get a chance to rework the entire command line parsing
  optionParser.accepts("version", "Print version information and exit.")

  if (args.length == 0 || args.contains("--help")) {
    CommandLineUtils.printUsageAndDie(optionParser,
      "USAGE: java [options] %s server.properties [--override property=value]*".format(this.getClass.getCanonicalName.split('$').head))
  }

  if (args.contains("--version")) {
    CommandLineUtils.printVersionAndDie()
  }

  val props = Utils.loadProps(args(0))

  if (args.length > 1) {
    val options = optionParser.parse(args.slice(1, args.length): _*)

    if (options.nonOptionArguments().size() > 0) {
      CommandLineUtils.printUsageAndDie(optionParser,
        "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
    }

    props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
  }
  props
}
```
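A side note on the `--override` option visible above: properties passed this way simply replace entries already loaded from server.properties, because they are put into the same `Properties` object afterwards. A tiny sketch of that behaviour (made-up values, not Kafka code):

```scala
import java.util.Properties

object OverrideSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // pretend these entries came from Utils.loadProps("server.properties")
    props.put("num.io.threads", "8")
    props.put("log.dirs", "/tmp/kraft-logs")

    // pretend this pair came from parsing "--override num.io.threads=16"
    val overrides = Map("num.io.threads" -> "16")
    overrides.foreach { case (k, v) => props.put(k, v) }

    // the override wins: prints 16
    println(props.getProperty("num.io.threads"))
  }
}
```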
3. `Kafka.scala#buildServer()` creates the server instance and is a key step. Note the following points:
   - `KafkaConfig.scala#fromProps()` converts the configuration loaded into memory into a `KafkaConfig` object
   - `KafkaConfig.scala#requiresZookeeper()` determines the Kafka server's startup mode. The decision hinges on whether the process.roles configuration is present: if it is, the server starts in KRaft mode, which removes the ZooKeeper dependency; otherwise it starts in the legacy ZooKeeper-dependent mode
   - This article is based on Kafka 3.0, where KRaft support is already fairly stable, so the analysis uses KRaft mode as the example; in that case a `KafkaRaftServer` object is created

```scala
private def buildServer(props: Properties): Server = {
  val config = KafkaConfig.fromProps(props, false)
  if (config.requiresZookeeper) {
    new KafkaServer(
      config,
      Time.SYSTEM,
      threadNamePrefix = None,
      enableForwarding = false
    )
  } else {
    new KafkaRaftServer(
      config,
      Time.SYSTEM,
      threadNamePrefix = None
    )
  }
}
```
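The mode decision itself boils down to whether process.roles is set. A simplified sketch of that rule (the names below are made up for illustration; `KafkaConfig.requiresZookeeper` is the real check):

```scala
sealed trait StartupMode
case object ZooKeeperMode extends StartupMode
case object KRaftMode extends StartupMode

// process.roles is parsed into a set of roles; an empty set means the legacy
// ZooKeeper-dependent mode, a non-empty set means KRaft mode.
def startupMode(processRoles: Set[String]): StartupMode =
  if (processRoles.isEmpty) ZooKeeperMode else KRaftMode

// e.g. a server.properties containing "process.roles=broker,controller"
assert(startupMode(Set("broker", "controller")) == KRaftMode)
// e.g. a legacy server.properties with no process.roles entry
assert(startupMode(Set.empty) == ZooKeeperMode)
```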
4. Scala syntax differs from Java in quite a few ways. For example, in Scala the primary constructor is part of the class declaration itself, so creating a `KafkaRaftServer` object triggers the creation of a number of key member objects (see the short sketch after the code block below). The ones directly relevant to this article are:
   - `broker`: a `BrokerServer` object, created only when the node's process.roles configuration includes the `broker` role; it handles data-plane requests such as producing and consuming messages
   - `controller`: a `ControllerServer` object, created only when the node's process.roles configuration includes the `controller` role; it handles metadata requests such as topic creation and deletion

```scala
class KafkaRaftServer(
  config: KafkaConfig,
  time: Time,
  threadNamePrefix: Option[String]
) extends Server with Logging {

  KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
  KafkaYammerMetrics.INSTANCE.configure(config.originals)

  private val (metaProps, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)

  private val metrics = Server.initializeMetrics(
    config,
    time,
    metaProps.clusterId
  )

  private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(
    RaftConfig.parseVoterConnections(config.quorumVoters))

  private val raftManager = new KafkaRaftManager[ApiMessageAndVersion](
    metaProps,
    config,
    new MetadataRecordSerde,
    KafkaRaftServer.MetadataPartition,
    KafkaRaftServer.MetadataTopicId,
    time,
    metrics,
    threadNamePrefix,
    controllerQuorumVotersFuture
  )

  private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
    Some(new BrokerServer(
      config,
      metaProps,
      raftManager,
      time,
      metrics,
      threadNamePrefix,
      offlineDirs,
      controllerQuorumVotersFuture,
      Server.SUPPORTED_FEATURES
    ))
  } else {
    None
  }

  private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) {
    Some(new ControllerServer(
      metaProps,
      config,
      raftManager,
      time,
      metrics,
      threadNamePrefix,
      controllerQuorumVotersFuture
    ))
  } else {
    None
  }

  // ...
}
```
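As promised above, here is a tiny illustration of the Scala point: statements and `val` initializers in a class body belong to the primary constructor, so `new KafkaRaftServer(...)` immediately builds the optional `broker` and `controller` members. The classes below are made up purely for illustration.

```scala
class Component(name: String) {
  println(s"constructing $name") // runs as part of Component's constructor
}

class Node(roles: Set[String]) {
  // initialized while Node's primary constructor runs, just like the
  // broker/controller members of KafkaRaftServer
  val broker: Option[Component] =
    if (roles.contains("broker")) Some(new Component("broker")) else None
  val controller: Option[Component] =
    if (roles.contains("controller")) Some(new Component("controller")) else None
}

object ConstructorSketch {
  def main(args: Array[String]): Unit = {
    new Node(Set("broker", "controller"))
    // prints:
    //   constructing broker
    //   constructing controller
  }
}
```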
5. After the steps above, the Kafka server's `Server` object has been created; in our case it is a `KafkaRaftServer`. The third sub-step of step 1 in this section then calls `KafkaRaftServer.scala#startup()` to start the server. The points relevant to this article are (illustrated by the small sketch after the code):
   - `controller.foreach(_.startup())` starts the `ControllerServer` that may exist on this node by calling its `ControllerServer.scala#startup()` method
   - `broker.foreach(_.startup())` starts the `BrokerServer` that may exist on this node by calling its `BrokerServer.scala#startup()` method

   This article uses the startup of `BrokerServer` as the example. In fact, from the perspective of the network communication structure, `BrokerServer` and `ControllerServer` are almost identical.

```scala
override def startup(): Unit = {
  Mx4jLoader.maybeLoad()
  raftManager.startup()
  controller.foreach(_.startup())
  broker.foreach(_.startup())
  AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
  info(KafkaBroker.STARTED_MESSAGE)
}
```
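Since `broker` and `controller` are both `Option`s, the two `foreach` calls above are no-ops on a node that does not host the corresponding role. A small sketch with made-up values:

```scala
object OptionStartupSketch {
  def main(args: Array[String]): Unit = {
    // e.g. a node started with process.roles=broker only
    val controller: Option[String] = None
    val broker: Option[String] = Some("BrokerServer")

    controller.foreach(c => println(s"starting $c")) // prints nothing
    broker.foreach(b => println(s"starting $b"))     // prints: starting BrokerServer
  }
}
```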
6. `BrokerServer.scala#startup()` is fairly long; the key objects it creates are listed below. Stripping away the noise, only two of them matter for network communication: the creation, configuration, and startup of the `SocketServer` (the low-level network server) and of the `KafkaRequestHandlerPool` (the upper-level request handler pool). A simplified model of how these two cooperate follows this list; the real `startup()` code comes after that.
   - `kafkaScheduler`: a `KafkaScheduler` object, the thread pool for scheduled tasks
   - `metadataCache`: a `KRaftMetadataCache` object, the component that manages cluster metadata
   - `clientToControllerChannelManager`: a `BrokerToControllerChannelManager` object, which manages the connection from the broker to the controller
   - `forwardingManager`: a `ForwardingManagerImpl` object that holds `clientToControllerChannelManager` and forwards requests that should be handled by the controller
   - `socketServer`: a `SocketServer` object, the server facing the underlying network
   - `_replicaManager`: a `ReplicaManager` object, the replica manager responsible for storing and reading messages
   - `groupCoordinator`: a `GroupCoordinator` object, the coordinator for ordinary consumer groups, which helps assign partitions to the consumers within a group
   - `dataPlaneRequestProcessor`: a `KafkaApis` object, the upper-level request processor; it holds the low-level network server's request queue `socketServer.dataPlaneRequestChannel` and processes the requests taken from that queue
   - `dataPlaneRequestHandlerPool`: a `KafkaRequestHandlerPool` object, the thread pool for the upper-level request handlers
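Before looking at the real method, here is a deliberately simplified model (none of these are Kafka's actual classes) of the handoff described above: network-side threads enqueue decoded requests into a shared queue, which plays the role of `RequestChannel`, while a pool of handler threads polls that queue and passes each request to an upper-level processor, which plays the role of `KafkaApis`.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object RequestPipelineSketch {
  final case class Request(apiKey: String, payload: String)

  // plays the role of RequestChannel's request queue (cf. queued.max.requests)
  val requestQueue = new ArrayBlockingQueue[Request](500)

  // plays the role of KafkaApis: the upper-level processing of one request
  def handle(request: Request): Unit =
    println(s"handling ${request.apiKey}: ${request.payload}")

  def main(args: Array[String]): Unit = {
    // plays the role of KafkaRequestHandlerPool with a couple of handler threads
    val handlers = (1 to 2).map { i =>
      val t = new Thread(new Runnable {
        override def run(): Unit = {
          while (!Thread.currentThread().isInterrupted) {
            val req = requestQueue.poll(300, TimeUnit.MILLISECONDS)
            if (req != null) handle(req)
          }
        }
      }, s"request-handler-$i")
      t.setDaemon(true)
      t.start()
      t
    }

    // plays the role of the network Processor threads handing requests over
    requestQueue.put(Request("PRODUCE", "records for topic-a"))
    requestQueue.put(Request("FETCH", "topic-a from offset 42"))

    Thread.sleep(500) // give the handler threads time to drain the queue
    handlers.foreach(_.interrupt())
  }
}
```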
```scala
def startup(): Unit = {
  if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
  try {
    info("Starting broker")

    /* start scheduler */
    kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
    kafkaScheduler.startup()

    /* register broker metrics */
    _brokerTopicStats = new BrokerTopicStats

    quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))

    logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

    metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)

    // Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery
    // until we catch up on the metadata log and have up-to-date topic and broker configs.
    logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time,
      brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true)

    // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
    // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
    tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
    credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

    val controllerNodes = RaftConfig.voterConnectionsToNodes(controllerQuorumVotersFuture.get()).asScala
    val controllerNodeProvider = RaftControllerNodeProvider(raftManager, config, controllerNodes)

    clientToControllerChannelManager = BrokerToControllerChannelManager(
      controllerNodeProvider,
      time,
      metrics,
      config,
      channelName = "forwarding",
      threadNamePrefix,
      retryTimeoutMs = 60000
    )
    clientToControllerChannelManager.start()
    forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager)

    val apiVersionManager = ApiVersionManager(
      ListenerType.BROKER,
      config,
      Some(forwardingManager),
      brokerFeatures,
      featureCache
    )

    // Create and start the socket server acceptor threads so that the bound port is known.
    // Delay starting processors until the end of the initialization sequence to ensure
    // that credentials have been loaded before processing authentications.
    socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
    socketServer.startup(startProcessingRequests = false)

    clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)

    val alterIsrChannelManager = BrokerToControllerChannelManager(
      controllerNodeProvider,
      time,
      metrics,
      config,
      channelName = "alterIsr",
      threadNamePrefix,
      retryTimeoutMs = Long.MaxValue
    )
    alterIsrManager = new DefaultAlterIsrManager(
      controllerChannelManager = alterIsrChannelManager,
      scheduler = kafkaScheduler,
      time = time,
      brokerId = config.nodeId,
      brokerEpochSupplier = () => lifecycleManager.brokerEpoch
    )
    alterIsrManager.start()

    this._replicaManager = new ReplicaManager(config, metrics, time, None,
      kafkaScheduler, logManager, isShuttingDown, quotaManagers,
      brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager, threadNamePrefix)

    /* start token manager */
    if (config.tokenAuthEnabled) {
      throw new UnsupportedOperationException("Delegation tokens are not supported")
    }
    tokenManager = new DelegationTokenManager(config, tokenCache, time, null)
    tokenManager.startup() // does nothing, we just need a token manager in order to compile right now...

    // Create group coordinator, but don't start it until we've started replica manager.
    // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
    groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics)

    val producerIdManagerSupplier = () => ProducerIdManager.rpc(
      config.brokerId,
      brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
      clientToControllerChannelManager,
      config.requestTimeoutMs
    )

    // Create transaction coordinator, but don't start it until we've started replica manager.
    // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
    transactionCoordinator = TransactionCoordinator(config, replicaManager,
      new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"),
      producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM)

    autoTopicCreationManager = new DefaultAutoTopicCreationManager(
      config, Some(clientToControllerChannelManager), None, None,
      groupCoordinator, transactionCoordinator)

    /* Add all reconfigurables for config change notification before starting the metadata listener */
    config.dynamicConfig.addReconfigurables(this)

    dynamicConfigHandlers = Map[String, ConfigHandler](
      ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, None),
      ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

    if (!config.processRoles.contains(ControllerRole)) {
      // If no controller is defined, we rely on the broker to generate snapshots.
      metadataSnapshotter = Some(new BrokerMetadataSnapshotter(
        config.nodeId,
        time,
        threadNamePrefix,
        new BrokerSnapshotWriterBuilder(raftManager.client)
      ))
    }

    metadataListener = new BrokerMetadataListener(config.nodeId,
      time,
      threadNamePrefix,
      config.metadataSnapshotMaxNewRecordBytes,
      metadataSnapshotter)

    val networkListeners = new ListenerCollection()
    config.advertisedListeners.foreach { ep =>
      networkListeners.add(new Listener().
        setHost(if (Utils.isBlank(ep.host)) InetAddress.getLocalHost.getCanonicalHostName else ep.host).
        setName(ep.listenerName.value()).
        setPort(if (ep.port == 0) socketServer.boundPort(ep.listenerName) else ep.port).
        setSecurityProtocol(ep.securityProtocol.id))
    }
    lifecycleManager.start(() => metadataListener.highestMetadataOffset(),
      BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
        "heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
      metaProps.clusterId, networkListeners, supportedFeatures)

    // Register a listener with the Raft layer to receive metadata event notifications
    raftManager.register(metadataListener)

    val endpoints = new util.ArrayList[Endpoint](networkListeners.size())
    var interBrokerListener: Endpoint = null
    networkListeners.iterator().forEachRemaining(listener => {
      val endPoint = new Endpoint(listener.name(),
        SecurityProtocol.forId(listener.securityProtocol()),
        listener.host(), listener.port())
      endpoints.add(endPoint)
      if (listener.name().equals(config.interBrokerListenerName.value())) {
        interBrokerListener = endPoint
      }
    })
    if (interBrokerListener == null) {
      throw new RuntimeException("Unable to find inter-broker listener " +
        config.interBrokerListenerName.value() + ". Found listener(s): " +
        endpoints.asScala.map(ep => ep.listenerName().orElse("(none)")).mkString(", "))
    }
    val authorizerInfo = ServerInfo(new ClusterResource(clusterId),
      config.nodeId, endpoints, interBrokerListener)

    /* Get the authorizer and initialize it if one is specified.*/
    authorizer = config.authorizer
    authorizer.foreach(_.configure(config.originals))
    val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
      case Some(authZ) =>
        authZ.start(authorizerInfo).asScala.map { case (ep, cs) =>
          ep -> cs.toCompletableFuture
        }
      case None =>
        authorizerInfo.endpoints.asScala.map { ep =>
          ep -> CompletableFuture.completedFuture[Void](null)
        }.toMap
    }

    val fetchManager = new FetchManager(Time.SYSTEM,
      new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
        KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

    // Create the request processor objects.
    val raftSupport = RaftSupport(forwardingManager, metadataCache)
    dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport,
      replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
      config.nodeId, config, metadataCache, metadataCache, metrics, authorizer, quotaManagers,
      fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager)

    dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
      socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
      config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
      SocketServer.DataPlaneThreadPrefix)

    if (socketServer.controlPlaneRequestChannelOpt.isDefined)
```
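Once the `KafkaRequestHandlerPool` threads start pulling requests off `socketServer.dataPlaneRequestChannel`, each request is dispatched by `KafkaApis` according to its API key and delegated to the matching subsystem (`ReplicaManager`, `GroupCoordinator`, the metadata cache, and so on). The following is only a schematic sketch with made-up types, not the real `KafkaApis.handle()` signature:

```scala
object ApiDispatchSketch {
  sealed trait ApiRequest
  final case class ProduceRequest(topic: String) extends ApiRequest
  final case class FetchRequest(topic: String, offset: Long) extends ApiRequest
  final case class MetadataRequest(topics: Seq[String]) extends ApiRequest

  // schematic stand-in for KafkaApis: match on the request type (in the real code,
  // on request.header.apiKey) and delegate to the right component
  def handle(request: ApiRequest): Unit = request match {
    case ProduceRequest(topic)       => println(s"append records to $topic via ReplicaManager")
    case FetchRequest(topic, offset) => println(s"read $topic from offset $offset via ReplicaManager")
    case MetadataRequest(topics)     => println(s"answer metadata for $topics from the metadata cache")
  }

  def main(args: Array[String]): Unit = {
    handle(ProduceRequest("topic-a"))
    handle(FetchRequest("topic-a", 42L))
    handle(MetadataRequest(Seq("topic-a", "topic-b")))
  }
}
```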