[Kafka] broker 的启动

Posted 虫师的后花园

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Kafka] broker 的启动相关的知识,希望对你有一定的参考价值。

写在前面:

1. 这部分文章是边看边写的,一开始难面有错误,也会存在没讲清楚的地方。如果发现了,麻烦指出。后续会根据读到的内容逐渐进行修正和丰富

2. 暂时想按照启动流程把涉及到的几个broker 类梳理一下,边看边写,不稳定更新

3. ${xxx} 表示来自server.properties 的对应参数


kafka.Kafka

Broker 启动类为core.src.main.scala.kafka 包内的Kafka.scala。

Object

Methods

main(args: Array[String]):Unit

Broker 的启动函数,代码如下:


def main(args: Array[String]): Unit = {
   try {

     val serverProps = getPropsFromArgs(args)
     val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

     // attach shutdown handler to catch control-c
     Runtime.getRuntime().addShutdownHook(new Thread() {
       override def run() = {
         kafkaServerStartable.shutdown
       }
     })

     kafkaServerStartable.startup
     kafkaServerStartable.awaitShutdown
   }
   catch {
     case e: Throwable =>
       fatal(e)
       System.exit(1)
   }
   System.exit(0)
 }

可以看到,在启动类里,Kafka 主要做了三件事:

  1. 读取配置文件

  2. 初始化一个KafkaServerStartable 对象并调用startup 方法

  3. 阻塞等待shutdown 信号显然,第二点才是启动的主要流程。不过在介绍主要流程之前,这里简单说明这仨动作:

Kafka.getPropsFromArgs(args:Array[String]):Unit
  • 调用了Kafka.getPropsFromArgs(args):Properties,通过java 的OptionParser 从args 中传入的路径中导入配置文件。

KafkaServerStartable.startup(val serverConfig:KafkaConfig):Unit

  • 实际上调用了kafka.server.KafkaServer 的startup(),详细内容在kafka.server.KafkaServer 中解释。

KafkaServerStartable.awaitShutdown():Unit

  • 实际上调用了KafkaServer.awaitShutdown():Unit 方法;

  • 在KafkaServerStartable.startup():Unit 方法中,初始化了一个 CountDownLatch(1) 对象shutdownLatch,在awaitShutdown 中则通过shutdownLatch.await() 等待shutdown 信号。


kafka.server.KafkaServer

Broker 启动流程的实际实现类,是zkClient,logManager,socketServer,kafkaController 等重要资源和角色的初始化和实现的入口。

Object

KafkaServer(val config:KafkaConfig, time:Time=SystemTime, threadNamePrefix:Option[String]=None)

这里介绍类中部分较重要的flag/objects。

// 标志位,字面上的意思
private val startupComplete = new AtomicBoolean(false)
private val isShuttingDown = new AtomicBoolean(false)
private val isStartingUp = new AtomicBoolean(false)

var apis: KafkaApis = null // broker对外提供的api
var socketServer: SocketServer = null // SocketServer,用以处理交互
var requestHandlerPool: KafkaRequestHandlerPool = null // 请求缓冲池
var logManager: LogManager = null // 用以管理broker 的数据(kafka logs)
var replicaManager: ReplicaManager = null // 用以管理/维护 replications
var consumerCoordinator: GroupCoordinator = null // 维护/管理consumer 和topic/partitions 的关系
var kafkaController: KafkaController = null // controller,可以理解为集群的leader
var zkUtils: ZkUtils = null // zk 连接,controller 选举等

// 线程持有以阻塞等待shutdown 信号
private var shutdownLatch = new CountDownLatch(1)

//
private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses
reporters.add(new JmxReporter(jmxPrefix))

// TODO:
private val metricConfig: MetricConfig = new MetricConfig()
 .samples(config.metricNumSamples)
 .timeWindow(config.metricSampleWindowMs, TimeUnit.MILLISECONDS)

/**
* sealed trait BrokerStates{def state:Byte}
* 0:NotRunning; 1:Starting; 2:RecoveringFromUncleanShutdown; 3:RunningAsBroker
* 4: RunningAsController; 6:PendingControlledShutdown; 7:BrokerShuttingDown
*/
val brokerState: BrokerState = new BrokerState

// 维护了一个通过java 的ScheduledThreadPoolExecutor 线程池,用以容纳kafka 的后台调度线程。
// 线程池的容量为${background.threads} 的值,默认10。
val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)

/**
* 服务端的metadata 缓存。包括:
* state 信息,controllerId,存活的节点信息,partition 信息
* 以及一个用以维持topicMatadata 一致性的ReentrantReadWriteLock
* 当集群信息变更时,由角色为controller 的broker 通知集群所有broker 更新该信息
*/
val metadataCache: MetadataCache = new MetadataCache(config.brokerId)

// TODO:
val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap

Methods

public

startup(val serverConfig:KafkaConfig):Unit
  • 校验isShuttingDown 和startupComplete 两个标志位,若无异常,则通过CAS 更新isStartingUp;

  • 标记brokerState 为Starting;

  • 启动定时器 kafkaScheduler.startup();

  • 调用initZk()[private] 构造一个zk 客户端;

  • 通过createLogManager()[private] 初始化logManager 并执行logManager.startup(),在这里完成存量log 的加载;

  • 创建socketServer [new SocketServer]并调用startup() 方法,这里启动一个nio socket 服务管理连接,根据请求分类执行相应处理,构造响应并返回;

  • 创建replicaManager [new ReplicaManager]并调用startup() 方法,isr 超时管理相关事务在这里;

  • 创建kafkaController [new KafkaController]并调用startup() 方法,这里注册session 过期监听器,同时启动controller leader 选举;

  • 创建consumerCoordinator [new GroupCoordinator] 并调用startup() 方法,cordinator 负责consumer 相关协调,topic/partitions 分配等;

  • 创建apis [new KafkaApis] 这里进行权限认证;

  • 初始化requestHandlerPool [new KafkaRequestHandlerPool],本质上为长度为${num.io.threads} (8 default)的Array[KafkaRequestHandler],这里开启线程,处理请求,此时变更brokerState 为RunningAsBroker;

  • 创建kafkaHealthcheck 并执行startup()这一步完成zookeeper中/broker/ids目录下节点的注册,注册成功则成为一个活节点,否则就是dead

  • 初始化shutdownLatch(new CountDownLatch(1)) 并设置startupComplete=true,isStartingUp=false;

  • 启动完成

private

initZK():zkUtils

初始化zookeeper 工具类:

  • 若存在"/" 且zk 中没有该目录,则在zookeeper 中创建根目录;

  • 在根目录下检索/brokers, /consumers, /controller, /config, /admin 和/isr_change_notification 等目录及其子目录,若不存在则创建;

  • 返回一个ZkUtils 对象。


createLogManager(zkClient:ZkClient, brokerState:BrokerState):LogManager

该方法创建并返回一个用以管理broker 本地数据的LogManager 对象:

  • 通过zkUtils 从zk 获取topics 信息并组装为一个以topic 为key 的Map 对象configs:Map<topic:String, config:LogConfig>;

  • new LogManager() 并返回。

以上是关于[Kafka] broker 的启动的主要内容,如果未能解决你的问题,请参考以下文章

如何扩展Kafka的broker

Kafaka入门(1)- Kafka简介和安装与启动(mac)

Kafka 基础原理及工作流程简述

阿里大牛的Kafka动态配置了解下?

kafka集群启动遇到LeaderNotAvailableException错误

Kafka消息队列大数据实战教程-第五篇(Broker工作原理以及节点服役)