zookeeper启动入口

Posted 满城风絮2013

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper启动入口相关的知识,希望对你有一定的参考价值。

  最近正在研究zookeeper,一些心得记录一下,如有错误,还请大神指正。

zookeeper下载地址:http://zookeeper.apache.org/releases.html,百度一下就能找到,不过还是在这里列一下。

我认为学习一个东西,首先要理出一个头绪,否则会感觉无从下手。这里我从启动过程开始研究,即从zkServer.sh入手。

# Pick the JVM main class; JMX stays on unless JMXDISABLE is set to a
# non-empty value.
ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
if [ -z "$JMXDISABLE" ]
then
    echo "JMX enabled by default" >&2
    # The docs say these two system properties are optional, yet on JDK 6 on
    # Ubuntu jconsole cannot do a local attach without them, so they are
    # placed in front of the main class.
    ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY $ZOOMAIN"
else
    echo "JMX disabled by user request" >&2
fi

  从zkServer.sh可以看出,启动入口在QuorumPeerMain中,源码如下:

  /**
   * Program entry point: creates a {@code QuorumPeerMain} and delegates to
   * {@code initializeAndRun}. The surrounding code is elided in this excerpt.
   *
   * @param args command-line arguments, passed through unchanged
   */
  public static void main(String[] args)
  {
    QuorumPeerMain peerMain = new QuorumPeerMain();
    // ... 1. start-up initialisation
    peerMain.initializeAndRun(args);
    // ...
  }

  protected void initializeAndRun(String[] args)
    throws QuorumPeerConfig.ConfigException, IOException
  {
    // 2、加载配置文件
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
// 解析配置文件 config.parse(args[0]); } if ((args.length == 1) && (config.servers.size() > 0)) {
    // 配置文件的信息加载至QuorumPeer runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running in standalone mode"); ZooKeeperServerMain.main(args); } } public void runFromConfig(QuorumPeerConfig config) throws IOException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } LOG.info("Starting quorum peer"); try { NioserverCnxn.Factory cnxnFactory = new NIOServerCnxn.Factory(config.getClientPortAddress(), config.getMaxClientCnxns()); // 3、启动QuorumPeer this.quorumPeer = new QuorumPeer(); this.quorumPeer.setClientPortAddress(config.getClientPortAddress()); //...加载各种配置信息 this.quorumPeer.start(); this.quorumPeer.join(); } catch (InterruptedException e) { LOG.warn("Quorum Peer interrupted", e); } }

  可以看出,配置文件的解析由QuorumPeerConfig 类完成,其部分源码如下:

  /**
   * Reads the configuration file at {@code path} into a {@code Properties}
   * object and passes it to {@link #parseProperties(Properties)}.
   *
   * @param path location of the configuration file
   * @throws QuorumPeerConfig.ConfigException if the file does not exist, or if
   *         an {@code IOException} or {@code IllegalArgumentException} is
   *         raised while loading or interpreting it; the original exception is
   *         kept as the cause
   */
  public void parse(String path)
    throws QuorumPeerConfig.ConfigException
  {
    File configFile = new File(path);
    LOG.info("Reading configuration from: " + configFile);

    try
    {
      // The existence check stays inside the try so that its
      // IllegalArgumentException is wrapped like every other failure.
      if (!configFile.exists()) {
        throw new IllegalArgumentException(configFile + " file is missing");
      }

      // Load the configuration into a Properties object.
      Properties props = new Properties();
      FileInputStream stream = new FileInputStream(configFile);
      try {
        props.load(stream);
      } finally {
        stream.close();
      }

      parseProperties(props);
    } catch (IOException ioFailure) {
      throw new ConfigException("Error processing " + path, ioFailure);
    } catch (IllegalArgumentException badArgument) {
      throw new ConfigException("Error processing " + path, badArgument);
    }
  }

  public void parseProperties(Properties zkProp)
    throws IOException, QuorumPeerConfig.ConfigException
  {
    int clientPort = 0;
    String clientPortAddress = null;
// 循环解析配置文件 for (Map.Entry entry : zkProp.entrySet()) { String key = entry.getKey().toString().trim(); String value = entry.getValue().toString().trim(); if (key.equals("dataDir")) { this.dataDir = value; } else if (key.equals("dataLogDir")) { this.dataLogDir = value; } else if (key.equals("clientPort")) {
    // 客户端连接的端口号 clientPort = Integer.parseInt(value); } else if (key.equals("clientPortAddress")) { clientPortAddress = value.trim(); } else if (key.equals("tickTime")) {
    // 心跳时间 this.tickTime = Integer.parseInt(value); } else if (key.equals("maxClientCnxns")) { this.maxClientCnxns = Integer.parseInt(value); } else if (key.equals("minSessionTimeout")) { this.minSessionTimeout = Integer.parseInt(value); } else if (key.equals("maxSessionTimeout")) { this.maxSessionTimeout = Integer.parseInt(value); } else if (key.equals("initLimit")) { this.initLimit = Integer.parseInt(value); } else if (key.equals("syncLimit")) { this.syncLimit = Integer.parseInt(value); } else if (key.equals("electionAlg")) {
    // 选举算法的类型,默认算法为FastLeaderElection this.electionAlg = Integer.parseInt(value); } else if (key.equals("peerType")) { if (value.toLowerCase().equals("observer")) this.peerType = QuorumPeer.LearnerType.OBSERVER; else if (value.toLowerCase().equals("participant")) { this.peerType = QuorumPeer.LearnerType.PARTICIPANT; } else throw new ConfigException("Unrecognised peertype: " + value); } //...

  回到QuorumPeerMain类的runFromConfig方法。此方法中,会将配置信息加载至QuorumPeer,并调用其start方法:

  /**
   * Starts this peer in four steps: load the on-disk database, start the
   * connection factory, start leader election, then call {@code super.start()}.
   *
   * @throws RuntimeException if loading the database raises an
   *         {@code IOException}; the failure is logged as fatal first and kept
   *         as the cause
   */
  public synchronized void start()
  {
    // Step 1: restore state from disk; nothing else runs if this fails.
    try {
      zkDb.loadDataBase();
    } catch (IOException loadFailure) {
      LOG.fatal("Unable to load database on disk", loadFailure);
      throw new RuntimeException("Unable to run quorum server ", loadFailure);
    }

    // Steps 2-4, in this exact order.
    cnxnFactory.start();
    startLeaderElection();
    super.start();
  }

  在start方法中,会先加载硬盘中的数据,即调用this.zkDb.loadDataBase(),该方法定义在ZKDatabase中:
  /**
   * Restores the data tree and session table through {@code snapLog.restore}
   * and marks this database as initialised.
   *
   * <p>Each transaction reported to the listener during the restore is wrapped
   * in a {@code Request} and passed to {@code addCommittedProposal}.
   *
   * @return the zxid returned by {@code snapLog.restore}
   * @throws IOException declared; presumably raised by the restore on a read
   *         failure (not visible in this excerpt)
   */
  public long loadDataBase()
    throws IOException
  {
    FileTxnSnapLog.PlayBackListener replayListener = new FileTxnSnapLog.PlayBackListener() {
      public void onTxnLoaded(TxnHeader hdr, Record txn) {
        // Rebuild a request from the logged header and transaction body.
        Request replayed = new Request(null, 0L, hdr.getCxid(), hdr.getType(), null, null);

        replayed.hdr = hdr;
        replayed.txn = txn;
        replayed.zxid = hdr.getZxid();
        ZKDatabase.this.addCommittedProposal(replayed);
      }
    };

    long lastZxid = snapLog.restore(dataTree, sessionsWithTimeouts, replayListener);
    initialized = true;
    return lastZxid;
  }

  然后开始确定选举类型,即startLeaderElection方法:

  /**
   * Prepares leader election for this peer.
   *
   * <p>Creates the initial vote from this peer's id and last logged zxid,
   * looks up this peer's own address in the current view, and creates the
   * election algorithm for the configured election type. For election type
   * {@code 0} a UDP socket is opened on the quorum port and a responder thread
   * is started first.
   *
   * @throws RuntimeException if this peer's id is not in the view, or if
   *         opening the UDP socket raises a {@code SocketException}
   */
  public synchronized void startLeaderElection()
  {
    currentVote = new Vote(myid, getLastLoggedZxid());

    // Find our own entry in the view to learn the quorum address.
    for (QuorumServer server : getView().values()) {
      if (server.id == myid) {
        myQuorumAddr = server.addr;
        break;
      }
    }
    if (myQuorumAddr == null) {
      throw new RuntimeException("My id " + myid + " not in the peer list");
    }

    // Election type 0 additionally needs a UDP socket and a responder thread.
    if (electionType == 0) {
      try {
        udpSocket = new DatagramSocket(myQuorumAddr.getPort());
        responder = new ResponderThread();
        responder.start();
      } catch (SocketException socketFailure) {
        throw new RuntimeException(socketFailure);
      }
    }

    // Create the election algorithm for the configured type.
    electionAlg = createElectionAlgorithm(electionType);
  }

  最后,start方法末尾的super.start()会启动线程,进而执行run方法。

以上是关于zookeeper启动入口的主要内容,如果未能解决你的问题,请参考以下文章

ZooKeeper源码阅读心得分享+源码基本结构+源码环境搭建

ZooKeeper源码阅读心得分享+源码基本结构+源码环境搭建

ZooKeeper单机服务端的启动源码阅读

zookeeper源码分析-Processor保证数据一致性

zookeeper源码分析-Processor保证数据一致性

KafkaServer启动流程分析