Zookeeper源码阅读(十四) 单机Server

Posted gongcomeon

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper源码阅读(十四) 单机Server相关的知识,希望对你有一定的参考价值。

前言

前面两篇主要说了下client-server的session相关的内容,到这里client的内容以及client-server的连接的内容也就基本告一段落了,剩下的部分就是server部分内部的结构,zk的选举以及server部分的工作机制等了。 这一篇主要说下单机server的启动过程,里面会涉及到一些server内部的工作机制和机构。

Server架构

技术分享图片

可以看到Zookeeper的server端主要分为几个大模块,ZKDatabase是zk server内部的内存数据库,内部维护了节点数据等关键数据,负责快照和日志的记录,同时也管理了session的超时和集群的选举,其他的部分主要有负责管理session的sessiontracker,负责处理请求的请求链和Learner模块(集群沟通?目前还不是特别清楚)。

单机版Server启动

单机版Server主要流程:

技术分享图片

从上图中可以看到单机server启动可以分为预启动和初始化两个部分。

预启动

1. 统一由QuorumPeerMain作为启动类

无论单机或集群,在zkServer.cmd和zkServer.sh中都配置了QuorumPeerMain作为启动入口类。

2. 解析zoo.cfg

用过ZK的同学都知道zoo.cfg是用户配置的zookeeper核心配置文件,ticktime,dataDir,dataLogDir,集群ip:port等都配置在其中。在实例化QuorumPeerMain对象后会去解析zoo.cfg文件。

QuorumPeerMain(Main)->QuorumPeerMain(initializeAndRun)->QuorumPeerConfig(parse)->QuorumPeerConfig(parseProperties)

parseProperties函数太长了。。。而且都是很简单的property文件取值的操作,可以简单看下。

3. 创建并启动历史文件清理器DatadirCleanupManager

DatadirCleanupManager的start方法负责自动清理历史的快照和事务日志。

public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // Don't schedule the purge task with zero or negative purge interval.
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }

    timer = new Timer("PurgeTask", true);//利用了java的Timer类来做定时任务
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);//PurgeTask是一个timertask
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));//设置频率

    purgeTaskStatus = PurgeTaskStatus.STARTED;
}
4. 判断启动模式
if (args.length == 1 && config.servers.size() > 0) {//通过解析zoo.cfg server数量来判断是否是集群
    runFromConfig(config);//如果是集群,直接用QuorumPeerMain中集群启动方法
} else {
    LOG.warn("Either no config or no quorum defined in config, running "
            + " in standalone mode");
    // there is only server in the quorum -- run as standalone
    ZooKeeperServerMain.main(args);//如果是单机就用单机启动方法
}
5. 再次解析zoo.cfg

ZooKeeperServerMain(Main)->ZooKeeperServerMain(initializeAndRun)->ServerConfig(parse)->QuorumPeerConfig(parse)->QuorumPeerConfig(parseProperties)。

这里之所以还要进行一次解析是因为这里是调用的zookeeperserver的main方法,无法把原来解析的参数传入。而且配置文件比较小,解析并不是特别耗资源,可以接受。

6. 创建ZookeeperServer实例

ZookeeperServer是server端的核心类,在启动时会创建zookeeperserver的一个实例。

final ZooKeeperServer zkServer = new ZooKeeperServer();

到这里位置就完成了所谓的预启动,可以看出,在预启动阶段Zookeeper的server干了下面几件事:

  1. 清理历史快照和log文件;
  2. 解析配置文件并进行初步的分析,判断Server端状态(Standalone/Cluster);
  3. 实例化ZookeeperServer。

初始化

在实例化了zookeeperserver之后,zookeeper server端的启动过程便来到了初始化阶段,这个过程也是比较长的。

首先是在runfromconfig方法中:

public void runFromConfig(ServerConfig config) throws IOException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        // Note that this thread isn't going to be doing anything else,
        // so rather than spawning another thread, we will just call
        // run() in this thread.
        // create a file logger url from the command line args
        final ZooKeeperServer zkServer = new ZooKeeperServer();//1
        // Registers shutdown handler which will be used to know the
        // server error or shutdown state changes.
        final CountDownLatch shutdownLatch = new CountDownLatch(1);//2
        zkServer.registerServerShutdownHandler(
                new ZooKeeperServerShutdownHandler(shutdownLatch));

        txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(//3
                config.dataDir));
        zkServer.setTxnLogFactory(txnLog);
        zkServer.setTickTime(config.tickTime);
        zkServer.setMinSessionTimeout(config.minSessionTimeout);
        zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
        cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(),
                config.getMaxClientCnxns());
        cnxnFactory.startup(zkServer);
        // Watch status of ZooKeeper server. It will do a graceful shutdown
        // if the server is not running or hits an internal error.
        shutdownLatch.await();
        shutdown();

        cnxnFactory.join();
        if (zkServer.canShutdown()) {
            zkServer.shutdown(true);
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    } finally {
        if (txnLog != null) {
            txnLog.close();
        }
    }
}
1. 创建服务器统计器ServerStats

在上面代码的1处,也就是zookeeperserver的构造函数内实例化了Server的统计器ServerStats。

public ZooKeeperServer() {
    serverStats = new ServerStats(this); //ServerStats统计了基本的server的数据如收发packet数,延迟信息等。
    listener = new ZooKeeperServerListenerImpl(this);
}

简单介绍下ServerStats:

/**
 * Basic Server Statistics
 */
 //从注释也可以知道ServerStats是server数据的基础类
public class ServerStats {
    private long packetsSent;//zkserver启动后,或是最近一次充值服务端统计信息后,服务端->客户端发送的响应次数
    private long packetsReceived;//zkserver启动后,或是最近一次充值服务端统计信息后,服务端收到的客户端发送的响应次数
    private long maxLatency;//zkserver启动后,或是最近一次充值服务端统计信息后,server端请求处理的最大延时
    private long minLatency = Long.MAX_VALUE;//zkserver启动后,或是最近一次充值服务端统计信息后,server端请求处理的最小延时
    private long totalLatency = 0;//zkserver启动后,或是最近一次充值服务端统计信息后,server端请求处理的总延时
    private long count = 0;//zkserver启动后,或是最近一次充值服务端统计信息后,server端处理的客户端请求总次数

    private final Provider provider;//provider对象提供部分统计数据,如下

    public interface Provider {
        public long getOutstandingRequests();//获取队列中还没有被处理的请求数量,在zookeeperserver和finalrequestprocessor中
        public long getLastProcessedZxid();//获得目前最新的zxid
        public String getState();//获取服务器状态
        public int getNumAliveConnections();//获取存活的客户端连接总数
    }
    
    public ServerStats(Provider provider) {//构造器
        this.provider = provider;
    }
2. 创建数据管理器FileTxnSnapLog

FileTxnSnapLog是Zookeeper上层服务器和底层数据存储之间的对接层,提供了一系列操作数据文件的接口,如事务日志文件和快照数据文件。Zookeeper根据zoo.cfg文件中解析出的快照数据目录dataDir和事务日志目录dataLogDir来创建FileTxnSnapLog。

其实这里的FileTxnSnapLog就是包含了第3,4篇中讲的快照和日志类FileTxnLog,FileSnap的功能类,FileTxnLog,FileSnap的生成以来dataDir和snapDir(dataLogDir)来生成。

3. 设置服务器tickTime和会话超时时间限制
zkServer.setTickTime(config.tickTime);
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
4. 创建ServerCnxnFactory

通过配置系统属性zookeper.serverCnxnFactory来指定使用Zookeeper自己实现的NIO还是使用Netty框架作为Zookeeper服务端网络连接工厂。

cnxnFactory = ServerCnxnFactory.createFactory();//创建ServerCnxnFactory
cnxnFactory.configure(config.getClientPortAddress(),
        config.getMaxClientCnxns());//初始化ServerCnxnFactory
cnxnFactory.startup(zkServer);//启动ServerCnxnFactory

之前提到过这里利用到了反射。

static public ServerCnxnFactory createFactory() throws IOException {
    String serverCnxnFactoryName =
        System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);//读取配置
    if (serverCnxnFactoryName == null) {
        serverCnxnFactoryName = NioserverCnxnFactory.class.getName();//默认是NIO实现
    }
    try {
        ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)//如果配置了,则按照配置来实例化。反射
                .getDeclaredConstructor().newInstance();
        LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
        return serverCnxnFactory;
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate "
                + serverCnxnFactoryName);
        ioe.initCause(e);
        throw ioe;
    }
}
5. 初始化ServerCnxnFactory

NIOServerCnxnFactory(configure)

@Override
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
    configureSaslLogin();

    thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);//传入的runnable对象是ServerCnxnFactory的实现类
thread.setDaemon(true);//设置为daemon线程
    maxClientCnxns = maxcc;//NIO相关的设置
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
    ss.socket().bind(addr);
    ss.configureBlocking(false);
    ss.register(selector, SelectionKey.OP_ACCEPT);
}

可以看到,Zookeeper会初始化Thread作为ServerCnxnFactory的主线程,然后再初始化NIO服务器。而zookeeper初始化的thread传入的runnable对象依然是ServerCnxnFactory的实现类,也就是说run的时候依然是执行ServerCnxnFactory。

6. 启动ServerCnxnFactory主线程
@Override
public void startup(ZooKeeperServer zks) throws IOException,
        InterruptedException {
    start();//启动主线程
    setZooKeeperServer(zks);//设置server端对象
    zks.startdata();//恢复数据等
    zks.startup();
}

其中start方法启动了线程。

public void start() {
    // ensure thread is started once and only once
    if (thread.getState() == Thread.State.NEW) {//如果是刚启动
        thread.start();//启动线程
    }
}

NIOServerCnxnFactory的run方法是NIO异步连接的一些基本设置如建立连接等。

7. 恢复本地数据

每次zk启动时,都需要从本地块找数据文件和事务日志文件中进行数据恢复。NIOServerCnxnFactory(startdata)->ZooKeeperServer(startdata)中进行了恢复data的操作。

8. 创建并启动session管理器

所谓的session管理器就是前面说的sessiontracker。

public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();//创建session管理器
    }
    startSessionTracker();//启动session管理器
    setupRequestProcessors();//初始化zookeeper请求处理链

    registerJMX();//注册JMX服务

    setState(State.RUNNING);
    notifyAll();
}
protected void createSessionTracker() {
    sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
            tickTime, 1, getZooKeeperServerListener());//创建sessiontrack,初始化管理器里的sessionsWithTimeout,expirationInterval等变量,特别的是,初始化sessionId也在这里一起做了
}
9. 初始化Zookeeper的请求处理链
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}

技术分享图片

zookeeper请求处理方式基于责任链模式,也就是说在server端有多个请求处理器一次来处理一个客户端请求。在服务器启动的时候,会将这些处理器串联起来形成一个处理链。上图是单机server的处理器。

10. 注册JMX服务

ZK 服务器的信息会以JXM的方式暴露给外部。这里还不太了解。

11. 注册ZK服务器实例
public void submitRequest(Request si) {
    if (firstProcessor == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {//如果在初始化时会wait住
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Une

通过上面代码的注释可以知道在初始化时请求提交处理会wait住,而这个函数是在NIOServerCnxnFactory(run)->NIOServerCnxn(doIO)->ZookeeperServer(processConnectRequest)->ZookeeperServer(createSession)->ZookeeperServer(submitRequest)中调用的,也即是前面在启动ServerCnxnFactory主线程时便会在这里wait住。

setState(State.RUNNING);
notifyAll();

而在这里便会notify时线程可以完全工作。

思考

JMX需要再学习

请求链和日志/快照清理过程

参考

http://www.cnblogs.com/leesf456/p/6105276.html

https://www.jianshu.com/p/47cb9e6d309d
https://my.oschina.net/pingpangkuangmo/blog/491673

https://my.oschina.net/xianggao/blog/537902

https://www.jianshu.com/p/76d6d674530b

从paxos到zk

以上是关于Zookeeper源码阅读(十四) 单机Server的主要内容,如果未能解决你的问题,请参考以下文章

Zookeeper源码阅读(十五) Zookeeper集群之server启动

ZooKeeper系列3.ZooKeeper源码环境搭建

如何远程调试zookeeper集群

「大数据」(三十四)ZooKeeper之服务简介

Zookeeper 源码请求处理

Zookeeper源码分析之单机模式消息流动分析