[Kafka] broker 的启动
Posted 虫师的后花园
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Kafka] broker 的启动相关的知识,希望对你有一定的参考价值。
写在前面:
1. 这部分文章是边看边写的,一开始难面有错误,也会存在没讲清楚的地方。如果发现了,麻烦指出。后续会根据读到的内容逐渐进行修正和丰富
2. 暂时想按照启动流程把涉及到的几个broker 类梳理一下,边看边写,不稳定更新
3. ${xxx} 表示来自server.properties 的对应参数
kafka.Kafka
Broker 启动类为core.src.main.scala.kafka 包内的Kafka.scala。
Object
Methods
main(args: Array[String]):Unit
Broker 的启动函数,代码如下:
def main(args: Array[String]): Unit = {
try {
val serverProps = getPropsFromArgs(args)
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread() {
override def run() = {
kafkaServerStartable.shutdown
}
})
kafkaServerStartable.startup
kafkaServerStartable.awaitShutdown
}
catch {
case e: Throwable =>
fatal(e)
System.exit(1)
}
System.exit(0)
}
可以看到,在启动类里,Kafka 主要做了三件事:
读取配置文件
初始化一个KafkaServerStartable 对象并调用startup 方法
阻塞等待shutdown 信号显然,第二点才是启动的主要流程。不过在介绍主要流程之前,这里简单说明这仨动作:
Kafka.getPropsFromArgs(args:Array[String]):Unit
调用了Kafka.getPropsFromArgs(args):Properties,通过java 的OptionParser 从args 中传入的路径中导入配置文件。
KafkaServerStartable.startup(val serverConfig:KafkaConfig):Unit
实际上调用了kafka.server.KafkaServer 的startup(),详细内容在kafka.server.KafkaServer 中解释。
KafkaServerStartable.awaitShutdown():Unit
实际上调用了KafkaServer.awaitShutdown():Unit 方法;
在KafkaServerStartable.startup():Unit 方法中,初始化了一个 CountDownLatch(1) 对象shutdownLatch,在awaitShutdown 中则通过shutdownLatch.await() 等待shutdown 信号。
kafka.server.KafkaServer
Broker 启动流程的实际实现类,是zkClient,logManager,socketServer,kafkaController 等重要资源和角色的初始化和实现的入口。
Object
KafkaServer(val config:KafkaConfig, time:Time=SystemTime, threadNamePrefix:Option[String]=None)
这里介绍类中部分较重要的flag/objects。
// 标志位,字面上的意思
private val startupComplete = new AtomicBoolean(false)
private val isShuttingDown = new AtomicBoolean(false)
private val isStartingUp = new AtomicBoolean(false)
var apis: KafkaApis = null // broker对外提供的api
var socketServer: SocketServer = null // SocketServer,用以处理交互
var requestHandlerPool: KafkaRequestHandlerPool = null // 请求缓冲池
var logManager: LogManager = null // 用以管理broker 的数据(kafka logs)
var replicaManager: ReplicaManager = null // 用以管理/维护 replications
var consumerCoordinator: GroupCoordinator = null // 维护/管理consumer 和topic/partitions 的关系
var kafkaController: KafkaController = null // controller,可以理解为集群的leader
var zkUtils: ZkUtils = null // zk 连接,controller 选举等
// 线程持有以阻塞等待shutdown 信号
private var shutdownLatch = new CountDownLatch(1)
//
private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses
reporters.add(new JmxReporter(jmxPrefix))
// TODO:
private val metricConfig: MetricConfig = new MetricConfig()
.samples(config.metricNumSamples)
.timeWindow(config.metricSampleWindowMs, TimeUnit.MILLISECONDS)
/**
* sealed trait BrokerStates{def state:Byte}
* 0:NotRunning; 1:Starting; 2:RecoveringFromUncleanShutdown; 3:RunningAsBroker
* 4: RunningAsController; 6:PendingControlledShutdown; 7:BrokerShuttingDown
*/
val brokerState: BrokerState = new BrokerState
// 维护了一个通过java 的ScheduledThreadPoolExecutor 线程池,用以容纳kafka 的后台调度线程。
// 线程池的容量为${background.threads} 的值,默认10。
val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
/**
* 服务端的metadata 缓存。包括:
* state 信息,controllerId,存活的节点信息,partition 信息
* 以及一个用以维持topicMatadata 一致性的ReentrantReadWriteLock
* 当集群信息变更时,由角色为controller 的broker 通知集群所有broker 更新该信息
*/
val metadataCache: MetadataCache = new MetadataCache(config.brokerId)
// TODO:
val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap
Methods
public
startup(val serverConfig:KafkaConfig):Unit
校验isShuttingDown 和startupComplete 两个标志位,若无异常,则通过CAS 更新isStartingUp;
标记brokerState 为Starting;
启动定时器 kafkaScheduler.startup();
调用initZk()[private] 构造一个zk 客户端;
通过createLogManager()[private] 初始化logManager 并执行logManager.startup(),在这里完成存量log 的加载;
创建socketServer [new SocketServer]并调用startup() 方法,这里启动一个nio socket 服务管理连接,根据请求分类执行相应处理,构造响应并返回;
创建replicaManager [new ReplicaManager]并调用startup() 方法,isr 超时管理相关事务在这里;
创建kafkaController [new KafkaController]并调用startup() 方法,这里注册session 过期监听器,同时启动controller leader 选举;
创建consumerCoordinator [new GroupCoordinator] 并调用startup() 方法,cordinator 负责consumer 相关协调,topic/partitions 分配等;
创建apis [new KafkaApis] 这里进行权限认证;
初始化requestHandlerPool [new KafkaRequestHandlerPool],本质上为长度为${num.io.threads} (8 default)的Array[KafkaRequestHandler],这里开启线程,处理请求,此时变更brokerState 为RunningAsBroker;
创建kafkaHealthcheck 并执行startup()这一步完成zookeeper中/broker/ids目录下节点的注册,注册成功则成为一个活节点,否则就是dead;
初始化shutdownLatch(new CountDownLatch(1)) 并设置startupComplete=true,isStartingUp=false;
启动完成
private
initZK():zkUtils
初始化zookeeper 工具类:
若存在"/" 且zk 中没有该目录,则在zookeeper 中创建根目录;
在根目录下检索/brokers, /consumers, /controller, /config, /admin 和/isr_change_notification 等目录及其子目录,若不存在则创建;
返回一个ZkUtils 对象。
createLogManager(zkClient:ZkClient, brokerState:BrokerState):LogManager
该方法创建并返回一个用以管理broker 本地数据的LogManager 对象:
通过zkUtils 从zk 获取topics 信息并组装为一个以topic 为key 的Map 对象configs:Map<topic:String, config:LogConfig>;
new LogManager() 并返回。
以上是关于[Kafka] broker 的启动的主要内容,如果未能解决你的问题,请参考以下文章
Kafaka入门(1)- Kafka简介和安装与启动(mac)