Zookeeper源码阅读(十四) 单机Server
Posted gongcomeon
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper源码阅读(十四) 单机Server相关的知识,希望对你有一定的参考价值。
前言
前面两篇主要说了下client-server的session相关的内容,到这里client的内容以及client-server的连接的内容也就基本告一段落了,剩下的部分就是server部分内部的结构,zk的选举以及server部分的工作机制等了。 这一篇主要说下单机server的启动过程,里面会涉及到一些server内部的工作机制和机构。
Server架构
可以看到Zookeeper的server端主要分为几个大模块,ZKDatabase是zk server内部的内存数据库,内部维护了节点数据等关键数据,负责快照和日志的记录,同时也管理了session的超时和集群的选举,其他的部分主要有负责管理session的sessiontracker,负责处理请求的请求链和Learner模块(集群沟通?目前还不是特别清楚)。
单机版Server启动
单机版Server主要流程:
从上图中可以看到单机server启动可以分为预启动和初始化两个部分。
预启动
1. 统一由QuorumPeerMain作为启动类
无论单机或集群,在zkServer.cmd和zkServer.sh中都配置了QuorumPeerMain作为启动入口类。
2. 解析zoo.cfg
用过ZK的同学都知道zoo.cfg是用户配置的zookeeper核心配置文件,ticktime,dataDir,dataLogDir,集群ip:port等都配置在其中。在实例化QuorumPeerMain对象后会去解析zoo.cfg文件。
QuorumPeerMain(Main)->QuorumPeerMain(initializeAndRun)->QuorumPeerConfig(parse)->QuorumPeerConfig(parseProperties)
parseProperties函数太长了。。。而且都是很简单的property文件取值的操作,可以简单看下。
3. 创建并启动历史文件清理器DatadirCleanupManager
DatadirCleanupManager的start方法负责自动清理历史的快照和事务日志。
public void start() {
if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
LOG.warn("Purge task is already running.");
return;
}
// Don't schedule the purge task with zero or negative purge interval.
if (purgeInterval <= 0) {
LOG.info("Purge task is not scheduled.");
return;
}
timer = new Timer("PurgeTask", true);//利用了java的Timer类来做定时任务
TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);//PurgeTask是一个timertask
timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));//设置频率
purgeTaskStatus = PurgeTaskStatus.STARTED;
}
4. 判断启动模式
if (args.length == 1 && config.servers.size() > 0) {//通过解析zoo.cfg server数量来判断是否是集群
runFromConfig(config);//如果是集群,直接用QuorumPeerMain中集群启动方法
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);//如果是单机就用单机启动方法
}
5. 再次解析zoo.cfg
ZooKeeperServerMain(Main)->ZooKeeperServerMain(initializeAndRun)->ServerConfig(parse)->QuorumPeerConfig(parse)->QuorumPeerConfig(parseProperties)。
这里之所以还要进行一次解析是因为这里是调用的zookeeperserver的main方法,无法把原来解析的参数传入。而且配置文件比较小,解析并不是特别耗资源,可以接受。
6. 创建ZookeeperServer实例
ZookeeperServer是server端的核心类,在启动时会创建zookeeperserver的一个实例。
final ZooKeeperServer zkServer = new ZooKeeperServer();
到这里位置就完成了所谓的预启动,可以看出,在预启动阶段Zookeeper的server干了下面几件事:
- 清理历史快照和log文件;
- 解析配置文件并进行初步的分析,判断Server端状态(Standalone/Cluster);
- 实例化ZookeeperServer。
初始化
在实例化了zookeeperserver之后,zookeeper server端的启动过程便来到了初始化阶段,这个过程也是比较长的。
首先是在runfromconfig方法中:
public void runFromConfig(ServerConfig config) throws IOException {
LOG.info("Starting server");
FileTxnSnapLog txnLog = null;
try {
// Note that this thread isn't going to be doing anything else,
// so rather than spawning another thread, we will just call
// run() in this thread.
// create a file logger url from the command line args
final ZooKeeperServer zkServer = new ZooKeeperServer();//1
// Registers shutdown handler which will be used to know the
// server error or shutdown state changes.
final CountDownLatch shutdownLatch = new CountDownLatch(1);//2
zkServer.registerServerShutdownHandler(
new ZooKeeperServerShutdownHandler(shutdownLatch));
txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(//3
config.dataDir));
zkServer.setTxnLogFactory(txnLog);
zkServer.setTickTime(config.tickTime);
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
cnxnFactory.startup(zkServer);
// Watch status of ZooKeeper server. It will do a graceful shutdown
// if the server is not running or hits an internal error.
shutdownLatch.await();
shutdown();
cnxnFactory.join();
if (zkServer.canShutdown()) {
zkServer.shutdown(true);
}
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Server interrupted", e);
} finally {
if (txnLog != null) {
txnLog.close();
}
}
}
1. 创建服务器统计器ServerStats
在上面代码的1处,也就是zookeeperserver的构造函数内实例化了Server的统计器ServerStats。
public ZooKeeperServer() {
serverStats = new ServerStats(this); //ServerStats统计了基本的server的数据如收发packet数,延迟信息等。
listener = new ZooKeeperServerListenerImpl(this);
}
简单介绍下ServerStats:
/**
* Basic Server Statistics
*/
//从注释也可以知道ServerStats是server数据的基础类
public class ServerStats {
private long packetsSent;//zkserver启动后,或是最近一次充值服务端统计信息后,服务端->客户端发送的响应次数
private long packetsReceived;//zkserver启动后,或是最近一次充值服务端统计信息后,服务端收到的客户端发送的响应次数
private long maxLatency;//zkserver启动后,或是最近一次充值服务端统计信息后,server端请求处理的最大延时
private long minLatency = Long.MAX_VALUE;//zkserver启动后,或是最近一次充值服务端统计信息后,server端请求处理的最小延时
private long totalLatency = 0;//zkserver启动后,或是最近一次充值服务端统计信息后,server端请求处理的总延时
private long count = 0;//zkserver启动后,或是最近一次充值服务端统计信息后,server端处理的客户端请求总次数
private final Provider provider;//provider对象提供部分统计数据,如下
public interface Provider {
public long getOutstandingRequests();//获取队列中还没有被处理的请求数量,在zookeeperserver和finalrequestprocessor中
public long getLastProcessedZxid();//获得目前最新的zxid
public String getState();//获取服务器状态
public int getNumAliveConnections();//获取存活的客户端连接总数
}
public ServerStats(Provider provider) {//构造器
this.provider = provider;
}
2. 创建数据管理器FileTxnSnapLog
FileTxnSnapLog是Zookeeper上层服务器和底层数据存储之间的对接层,提供了一系列操作数据文件的接口,如事务日志文件和快照数据文件。Zookeeper根据zoo.cfg文件中解析出的快照数据目录dataDir和事务日志目录dataLogDir来创建FileTxnSnapLog。
其实这里的FileTxnSnapLog就是包含了第3,4篇中讲的快照和日志类FileTxnLog,FileSnap的功能类,FileTxnLog,FileSnap的生成以来dataDir和snapDir(dataLogDir)来生成。
3. 设置服务器tickTime和会话超时时间限制
zkServer.setTickTime(config.tickTime);
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
4. 创建ServerCnxnFactory
通过配置系统属性zookeper.serverCnxnFactory来指定使用Zookeeper自己实现的NIO还是使用Netty框架作为Zookeeper服务端网络连接工厂。
cnxnFactory = ServerCnxnFactory.createFactory();//创建ServerCnxnFactory
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());//初始化ServerCnxnFactory
cnxnFactory.startup(zkServer);//启动ServerCnxnFactory
之前提到过这里利用到了反射。
static public ServerCnxnFactory createFactory() throws IOException {
String serverCnxnFactoryName =
System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);//读取配置
if (serverCnxnFactoryName == null) {
serverCnxnFactoryName = NioserverCnxnFactory.class.getName();//默认是NIO实现
}
try {
ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)//如果配置了,则按照配置来实例化。反射
.getDeclaredConstructor().newInstance();
LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
return serverCnxnFactory;
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ serverCnxnFactoryName);
ioe.initCause(e);
throw ioe;
}
}
5. 初始化ServerCnxnFactory
NIOServerCnxnFactory(configure)
@Override
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
configureSaslLogin();
thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);//传入的runnable对象是ServerCnxnFactory的实现类
thread.setDaemon(true);//设置为daemon线程
maxClientCnxns = maxcc;//NIO相关的设置
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
ss.register(selector, SelectionKey.OP_ACCEPT);
}
可以看到,Zookeeper会初始化Thread作为ServerCnxnFactory的主线程,然后再初始化NIO服务器。而zookeeper初始化的thread传入的runnable对象依然是ServerCnxnFactory的实现类,也就是说run的时候依然是执行ServerCnxnFactory。
6. 启动ServerCnxnFactory主线程
@Override
public void startup(ZooKeeperServer zks) throws IOException,
InterruptedException {
start();//启动主线程
setZooKeeperServer(zks);//设置server端对象
zks.startdata();//恢复数据等
zks.startup();
}
其中start方法启动了线程。
public void start() {
// ensure thread is started once and only once
if (thread.getState() == Thread.State.NEW) {//如果是刚启动
thread.start();//启动线程
}
}
NIOServerCnxnFactory的run方法是NIO异步连接的一些基本设置如建立连接等。
7. 恢复本地数据
每次zk启动时,都需要从本地块找数据文件和事务日志文件中进行数据恢复。NIOServerCnxnFactory(startdata)->ZooKeeperServer(startdata)中进行了恢复data的操作。
8. 创建并启动session管理器
所谓的session管理器就是前面说的sessiontracker。
public synchronized void startup() {
if (sessionTracker == null) {
createSessionTracker();//创建session管理器
}
startSessionTracker();//启动session管理器
setupRequestProcessors();//初始化zookeeper请求处理链
registerJMX();//注册JMX服务
setState(State.RUNNING);
notifyAll();
}
protected void createSessionTracker() {
sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
tickTime, 1, getZooKeeperServerListener());//创建sessiontrack,初始化管理器里的sessionsWithTimeout,expirationInterval等变量,特别的是,初始化sessionId也在这里一起做了
}
9. 初始化Zookeeper的请求处理链
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
zookeeper请求处理方式基于责任链模式,也就是说在server端有多个请求处理器一次来处理一个客户端请求。在服务器启动的时候,会将这些处理器串联起来形成一个处理链。上图是单机server的处理器。
10. 注册JMX服务
ZK 服务器的信息会以JXM的方式暴露给外部。这里还不太了解。
11. 注册ZK服务器实例
public void submitRequest(Request si) {
if (firstProcessor == null) {
synchronized (this) {
try {
// Since all requests are passed to the request
// processor it should wait for setting up the request
// processor chain. The state will be updated to RUNNING
// after the setup.
while (state == State.INITIAL) {//如果在初始化时会wait住
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Une
通过上面代码的注释可以知道在初始化时请求提交处理会wait住,而这个函数是在NIOServerCnxnFactory(run)->NIOServerCnxn(doIO)->ZookeeperServer(processConnectRequest)->ZookeeperServer(createSession)->ZookeeperServer(submitRequest)中调用的,也即是前面在启动ServerCnxnFactory主线程时便会在这里wait住。
setState(State.RUNNING);
notifyAll();
而在这里便会notify时线程可以完全工作。
思考
JMX需要再学习
请求链和日志/快照清理过程
参考
http://www.cnblogs.com/leesf456/p/6105276.html
https://www.jianshu.com/p/47cb9e6d309d
https://my.oschina.net/pingpangkuangmo/blog/491673
https://my.oschina.net/xianggao/blog/537902
https://www.jianshu.com/p/76d6d674530b
从paxos到zk
以上是关于Zookeeper源码阅读(十四) 单机Server的主要内容,如果未能解决你的问题,请参考以下文章