HBase 2.1.0 source code reading

Posted by 大数据魔法师


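1. HMaster startup

After using HBase for a long time, today I am starting to read through its source code.

Version 2.1.0 was released recently and is the second release in the HBase 2.x line. It aims to improve HBase's stability and reliability. The main changes are:

Replication peer modification based on Procedure v2

Serial replication

The minimum Hadoop version is now 2.7.1

A rolling upgrade from 1.4.3 to 2.1.0 was completed successfully, which shows that rolling upgrades from 1.x to 2.x are possible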


http://mail-archives.apache.org/mod_mbox/www-announce/201807.mbox/<CAAAYAnOObYM9b-6Etqpuxti-FTkC_dhiTD8Du+WZ2vC=L5E4Yg@mail.gmail.com>


a. How we start the HBase master

On Linux:


bin/hbase-daemon.sh start master


Let's trace what happens here, all the way back to the source.

In the script hbase-daemon.sh:



case $startStop in

(start)
    check_before_start
    hbase_rotate_log $HBASE_LOGOUT
    hbase_rotate_log $HBASE_LOGGC
    echo running $command, logging to $HBASE_LOGOUT
    $thiscmd --config "${HBASE_CONF_DIR}" \
        foreground_start $command $args < /dev/null > ${HBASE_LOGOUT} 2>&1  &
    disown -h -r
    sleep 1; head "${HBASE_LOGOUT}"
  ;;


Here we can see that the start branch launches the process by invoking foreground_start:


(foreground_start)
    trap cleanAfterRun SIGHUP SIGINT SIGTERM EXIT
    if [ "$HBASE_NO_REDIRECT_LOG" != "" ]; then
        # NO REDIRECT
        echo "`date` Starting $command on `hostname`"
        echo "`ulimit -a`"
        # in case the parent shell gets the kill make sure to trap signals.
        # Only one will get called. Either the trap or the flow will go through.
        nice -n $HBASE_NICENESS "$HBASE_HOME"/bin/hbase \
            --config "${HBASE_CONF_DIR}" \
            $command "$@" start &
    else
        echo "`date` Starting $command on `hostname`" >> ${HBASE_LOGLOG}
        echo "`ulimit -a`" >> "$HBASE_LOGLOG" 2>&1
        # in case the parent shell gets the kill make sure to trap signals.
        # Only one will get called. Either the trap or the flow will go through.
        nice -n $HBASE_NICENESS "$HBASE_HOME"/bin/hbase \
            --config "${HBASE_CONF_DIR}" \
            $command "$@" start >> ${HBASE_LOGOUT} 2>&1 &
    fi
    # Add to the command log file vital stats on our environment.
    hbase_pid=$!
    echo $hbase_pid > ${HBASE_PID}
    wait $hbase_pid
  ;;


As we can see, it ultimately calls the "$HBASE_HOME"/bin/hbase script with start, so let's continue into the hbase script:


elif [ "$COMMAND" = "master" ] ; then
  CLASS='org.apache.hadoop.hbase.master.HMaster'
  if [ "$1" != "stop" ] && [ "$1" != "clear" ] ; then
    HBASE_OPTS="$HBASE_OPTS $HBASE_MASTER_OPTS"
  fi


When COMMAND is master, CLASS is set to org.apache.hadoop.hbase.master.HMaster.

Finally, the process is launched with the java command:


if [ "${HBASE_NOEXEC}" != "" ]; then
  "$JAVA" -Dproc_$COMMAND -XX:OnOutOfMemoryError="kill -9 %p" $HEAP_SETTINGS $HBASE_OPTS $CLASS "$@"
else
  exec "$JAVA" -Dproc_$COMMAND -XX:OnOutOfMemoryError="kill -9 %p" $HEAP_SETTINGS $HBASE_OPTS $CLASS "$@"
fi


We can inspect the resulting launch command with jps -v -m -l | grep HMaster (the output below is from HBase 1.2.4):


8984 org.apache.hadoop.hbase.master.HMaster start -Dproc_master -XX:OnOutOfMemoryError=kill -9 %p -Xmx2G -Duser.timezone=Asia/Shanghai -Xss256k -XX:PermSize=128m -XX:MaxPermSize=128m -Xmx24g -Xms24g -Xmn4G -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:ParallelCMSThreads=8 -XX:+CMSClassUnloadingEnabled -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+HeapDumpOnOutOfMemoryError -XX:+UseCMSInitiatingOccupancyOnly -XX:MaxTenuringThreshold=4 -XX:SurvivorRatio=4 -verbose:gc -Xloggc:logs/gc_log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -XX:+PrintTenuringDistribution -XX:+PrintCommandLineFlags -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1024M -XX:PermSize=256m -XX:MaxPermSize=256m -Dhbase.log.dir=/data/hbase_home/logs -Dhbase.log.file=hbase-wifi-master-hadoop1.log -Dhbase.home.dir=/data/hbase_home -Dhbase.id.str=hadoop  -Dhbase.root.logger=INFO,RFA -Djava.library.path=/data/hadoop-2.6.4/lib/native -Dhbase.security.logger=INFO,RFAS


Next, let's open org.apache.hadoop.hbase.master.HMaster.

The main entry point:


  public static void main(String [] args) {
    LOG.info("STARTING service " + HMaster.class.getSimpleName());
    VersionInfo.logVersion();
    new HMasterCommandLine(HMaster.class).doMain(args);
  }


We can see that startup goes through the doMain method of HMasterCommandLine,

which parses the command-line arguments and runs them:


  /**
   * Parse and run the given command line. This may exit the JVM if
   * a nonzero exit code is returned from <code>run()</code>.
   */
  public void doMain(String args[]) {
    try {
      int ret = ToolRunner.run(HBaseConfiguration.create(), this, args);
      if (ret != 0) {
        System.exit(ret);
      }
    } catch (Exception e) {
      LOG.error("Failed to run", e);
      System.exit(-1);
    }
  }



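Stepping into run, we find that it belongs to ToolRunner in org.apache.hadoop.util. Passing this hands the HMasterCommandLine object, and with it its run implementation, to ToolRunner so that ToolRunner can call it back: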


public class ToolRunner {
    public ToolRunner() {
    }

    public static int run(Configuration conf, Tool tool, String[] args) throws Exception {
        if (conf == null) {
            conf = new Configuration();
        }

        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        tool.setConf(conf);
        String[] toolArgs = parser.getRemainingArgs();
        return tool.run(toolArgs);
    }
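To make the callback concrete, here is a minimal sketch of the same pattern that runs without Hadoop on the classpath. The names Tool, runTool and DemoCommandLine are hypothetical stand-ins for org.apache.hadoop.util.Tool, ToolRunner.run and HMasterCommandLine; configuration handling and generic option parsing are left out, and doMain returns the exit code instead of calling System.exit so the result can be inspected.

```java
public class ToolCallbackSketch {

    /** Hypothetical stand-in for org.apache.hadoop.util.Tool (without setConf/getConf). */
    public interface Tool {
        int run(String[] args) throws Exception;
    }

    /** Hypothetical stand-in for ToolRunner.run: it only calls back into the tool. */
    public static int runTool(Tool tool, String[] args) throws Exception {
        return tool.run(args);
    }

    /** Hypothetical stand-in for HMasterCommandLine. */
    public static class DemoCommandLine implements Tool {
        @Override
        public int run(String[] args) {
            // 0 signals success, anything else is an error code.
            return (args.length == 1 && "start".equals(args[0])) ? 0 : 1;
        }

        /** Same shape as doMain, but returns the code instead of exiting the JVM. */
        public int doMain(String[] args) {
            try {
                // "this" is what lets the runner reach the run() method above.
                return runTool(this, args);
            } catch (Exception e) {
                return -1;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(new DemoCommandLine().doMain(new String[] {"start"}));
        System.out.println(new DemoCommandLine().doMain(new String[] {"bogus"}));
    }
}
```

The point of the indirection is that the runner only knows the interface, so any class implementing it can be launched the same way.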

    



ToolRunner.run ends by calling tool.run. To recap: doMain calls ToolRunner.run; HMasterCommandLine extends ServerCommandLine, and ServerCommandLine implements the Tool interface, so the call ends up in the run method of HMasterCommandLine. Let's look at that method:


  public int run(String args[]) throws Exception {
    Options opt = new Options();
    opt.addOption("localRegionServers", true,
      "RegionServers to start in master process when running standalone");
    opt.addOption("masters", true, "Masters to start in this process");
    opt.addOption("minRegionServers", true, "Minimum RegionServers needed to host user tables");
    opt.addOption("backup", false, "Do not try to become HMaster until the primary fails");

    CommandLine cmd;
    try {
      cmd = new GnuParser().parse(opt, args);
    } catch (ParseException e) {
      LOG.error("Could not parse: ", e);
      usage(null);
      return 1;
    }

    if (cmd.hasOption("minRegionServers")) {
      String val = cmd.getOptionValue("minRegionServers");
      getConf().setInt("hbase.regions.server.count.min",
                  Integer.parseInt(val));
      LOG.debug("minRegionServers set to " + val);
    }

    // minRegionServers used to be minServers.  Support it too.
    if (cmd.hasOption("minServers")) {
      String val = cmd.getOptionValue("minServers");
      getConf().setInt("hbase.regions.server.count.min", Integer.parseInt(val));
      LOG.debug("minServers set to " + val);
    }

    // check if we are the backup master - override the conf if so
    if (cmd.hasOption("backup")) {
      getConf().setBoolean(HConstants.MASTER_TYPE_BACKUP, true);
    }

    // How many regionservers to startup in this process (we run regionservers in same process as
    // master when we are in local/standalone mode. Useful testing)
    if (cmd.hasOption("localRegionServers")) {
      String val = cmd.getOptionValue("localRegionServers");
      getConf().setInt("hbase.regionservers", Integer.parseInt(val));
      LOG.debug("localRegionServers set to " + val);
    }
    // How many masters to startup inside this process; useful testing
    if (cmd.hasOption("masters")) {
      String val = cmd.getOptionValue("masters");
      getConf().setInt("hbase.masters", Integer.parseInt(val));
      LOG.debug("masters set to " + val);
    }

    @SuppressWarnings("unchecked")
    List<String> remainingArgs = cmd.getArgList();
    if (remainingArgs.size() != 1) {
      usage(null);
      return 1;
    }

    String command = remainingArgs.get(0);

    if ("start".equals(command)) {
      return startMaster();
    } else if ("stop".equals(command)) {
      return stopMaster();
    } else if ("clear".equals(command)) {
      return (ZNodeClearer.clear(getConf()) ? 0 : 1);
    } else {
      usage("Invalid command: " + command);
      return 1;
    }
  }
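The method above has two halves: options with a value are copied into the configuration, and the single remaining argument selects the action. The sketch below reproduces only that shape with a hand-rolled parser instead of commons-cli. The class RunSketch, its methods, and the plain Map standing in for the Hadoop Configuration are assumptions made for illustration; the option names and configuration keys are the ones that appear in the excerpt, and the backup flag is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunSketch {

    // Option name -> configuration key, as they appear in the run method above.
    private static final Map<String, String> OPTION_TO_KEY = new HashMap<>();
    static {
        OPTION_TO_KEY.put("minRegionServers", "hbase.regions.server.count.min");
        OPTION_TO_KEY.put("localRegionServers", "hbase.regionservers");
        OPTION_TO_KEY.put("masters", "hbase.masters");
    }

    /** Stores known "-name value" options in conf; returns the single command, or null. */
    public static String parse(String[] args, Map<String, Integer> conf) {
        List<String> remaining = new ArrayList<>();
        for (int i = 0; i < args.length; i++) {
            String name = args[i].startsWith("-") ? args[i].replaceFirst("^-+", "") : null;
            if (name != null && OPTION_TO_KEY.containsKey(name)) {
                if (i + 1 >= args.length) {
                    return null; // option given without its value
                }
                // Like the original, a non-numeric value throws NumberFormatException.
                conf.put(OPTION_TO_KEY.get(name), Integer.parseInt(args[++i]));
            } else {
                remaining.add(args[i]);
            }
        }
        // Exactly one non-option argument is expected.
        return remaining.size() == 1 ? remaining.get(0) : null;
    }

    /** Returns 0 for a recognized command and 1 for a usage error. */
    public static int run(String[] args, Map<String, Integer> conf) {
        String command = parse(args, conf);
        if (command == null) {
            return 1;
        }
        switch (command) {
            case "start":
            case "stop":
            case "clear":
                return 0; // the real code calls startMaster(), stopMaster() or ZNodeClearer here
            default:
                return 1;
        }
    }
}
```

For instance, calling run with the arguments -minRegionServers 3 start stores 3 under hbase.regions.server.count.min and then takes the start branch.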



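What does run do here?

First, it parses the arguments. Second, depending on whether the command is start, stop, or clear, it executes the corresponding method. Let's look at startMaster: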


  private int startMaster() {
    Configuration conf = getConf();
    TraceUtil.initTracer(conf);

    try {
      // If 'local', defer to LocalHBaseCluster instance.  Starts master
      // and regionserver both in the one JVM.
      if (LocalHBaseCluster.isLocal(conf)) {
        DefaultMetricsSystem.setMiniClusterMode(true);
        final MiniZooKeeperCluster zooKeeperCluster = new MiniZooKeeperCluster(conf);
        File zkDataPath = new File(conf.get(HConstants.ZOOKEEPER_DATA_DIR));

        // find out the default client port
        int zkClientPort = 0;

        // If the zookeeper client port is specified in server quorum, use it.
        String zkserver = conf.get(HConstants.ZOOKEEPER_QUORUM);
        if (zkserver != null) {
          String[] zkservers = zkserver.split(",");

          if (zkservers.length > 1) {
            // In local mode deployment, we have the master + a region server and zookeeper server
            // started in the same process. Therefore, we only support one zookeeper server.
            String errorMsg = "Could not start ZK with " + zkservers.length +
                " ZK servers in local mode deployment. Aborting as clients (e.g. shell) will not "
                + "be able to find this ZK quorum.";
              System.err.println(errorMsg);
              throw new IOException(errorMsg);
          }

          String[] parts = zkservers[0].split(":");

          if (parts.length == 2) {
            // the second part is the client port
            zkClientPort = Integer.parseInt(parts [1]);
          }
        }
        // If the client port could not be find in server quorum conf, try another conf
        if (zkClientPort == 0) {
          zkClientPort = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 0);
          // The client port has to be set by now; if not, throw exception.
          if (zkClientPort == 0) {
            throw new IOException("No config value for " + HConstants.ZOOKEEPER_CLIENT_PORT);
          }
        }
        zooKeeperCluster.setDefaultClientPort(zkClientPort);
        // set the ZK tick time if specified
        int zkTickTime = conf.getInt(HConstants.ZOOKEEPER_TICK_TIME, 0);
        if (zkTickTime > 0) {
          zooKeeperCluster.setTickTime(zkTickTime);
        }

        // login the zookeeper server principal (if using security)
        ZKUtil.loginServer(conf, HConstants.ZK_SERVER_KEYTAB_FILE,
          HConstants.ZK_SERVER_KERBEROS_PRINCIPAL, null);
        int localZKClusterSessionTimeout =
          conf.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster", 10*1000);
        conf.setInt(HConstants.ZK_SESSION_TIMEOUT, localZKClusterSessionTimeout);
        LOG.info("Starting a zookeeper cluster");
        int clientPort = zooKeeperCluster.startup(zkDataPath);
        if (clientPort != zkClientPort) {
          String errorMsg = "Could not start ZK at requested port of " +
            zkClientPort + ".  ZK was started at port: " + clientPort +
            ".  Aborting as clients (e.g. shell) will not be able to find " +
            "this ZK quorum.";
          System.err.println(errorMsg);
          throw new IOException(errorMsg);
        }
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort));

        // Need to have the zk cluster shutdown when master is shutdown.
        // Run a subclass that does the zk cluster shutdown on its way out.
        int mastersCount = conf.getInt("hbase.masters", 1);
        int regionServersCount = conf.getInt("hbase.regionservers", 1);
        // Set start timeout to 5 minutes for cmd line start operations
        conf.setIfUnset("hbase.master.start.timeout.localHBaseCluster", "300000");
        LOG.info("Starting up instance of localHBaseCluster; master=" + mastersCount +
          ", regionserversCount=" + regionServersCount);
        LocalHBaseCluster cluster = new LocalHBaseCluster(conf, mastersCount, regionServersCount,
          LocalHMaster.class, HRegionServer.class);
        ((LocalHMaster)cluster.getMaster(0)).setZKCluster(zooKeeperCluster);
        cluster.startup();
        waitOnMasterThreads(cluster);
      } else {
        logProcessInfo(getConf());
        HMaster master = HMaster.constructMaster(masterClass, conf);
        if (master.isStopped()) {
          LOG.info("Won't bring the Master up as a shutdown is requested");
          return 1;
        }
        master.start();
        master.join();
        if(master.isAborted())
          throw new RuntimeException("HMaster Aborted");
      }
    } catch (Throwable t) {
      LOG.error("Master exiting", t);
      return 1;
    }
    return 0;
  }




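Based on the configuration, startMaster decides which kind of master to start. There are two modes: local and distributed. In distributed mode, it invokes the HMaster constructor via reflection and then calls its start and join methods. In local mode, it runs a LocalHBaseCluster consisting of a LocalHMaster, an HRegionServer, and a MiniZooKeeperCluster (in the words of the source comment: "Starts master and regionserver both in the one JVM").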
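One detail of the local branch that is easy to miss is how the ZooKeeper client port is chosen: the port embedded in the quorum string wins, the separate client-port setting is the fallback, and more than one quorum server is rejected. The following is a simplified sketch of that logic under stated assumptions: the class and method names are hypothetical, the two configuration values are passed in as plain parameters, and unchecked exceptions replace the IOException thrown by the original.

```java
public class ZkClientPortSketch {

    /**
     * @param quorum         value of the ZooKeeper quorum setting, e.g. "localhost:2182"; may be null
     * @param configuredPort value of the client-port setting, or 0 if unset
     * @return the client port the local ZooKeeper should listen on
     */
    public static int resolveClientPort(String quorum, int configuredPort) {
        int port = 0;
        if (quorum != null) {
            String[] servers = quorum.split(",");
            if (servers.length > 1) {
                // Local mode runs everything in one process, so only one server is allowed.
                throw new IllegalArgumentException(
                    "Local mode supports one ZK server, got " + servers.length);
            }
            String[] parts = servers[0].split(":");
            if (parts.length == 2) {
                // host:port form, the second part is the client port
                port = Integer.parseInt(parts[1]);
            }
        }
        if (port == 0) {
            // Nothing usable in the quorum string, fall back to the dedicated setting.
            port = configuredPort;
        }
        if (port == 0) {
            throw new IllegalStateException("No client port configured");
        }
        return port;
    }
}
```

Failing early here matches the intent stated in the source comments: if the embedded ZooKeeper ends up on a port that clients such as the shell do not expect, they cannot find the quorum.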

Next, let's look at how HMaster is constructed:


  public static HMaster constructMaster(Class<? extends HMaster> masterClass,
      final Configuration conf)  {
    try {
      Constructor<? extends HMaster> c = masterClass.getConstructor(Configuration.class);
      return c.newInstance(conf);
    } catch(Exception e) {
      Throwable error = e;
      if (e instanceof InvocationTargetException &&
          ((InvocationTargetException)e).getTargetException() != null) {
        error = ((InvocationTargetException)e).getTargetException();
      }
      throw new RuntimeException("Failed construction of Master: " + masterClass.toString() + ". "
        , error);
    }
  }



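HMaster is constructed through Java reflection: getConstructor looks up the constructor that takes a Configuration parameter, and newInstance invokes that constructor, which initializes the HMaster.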
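The same reflection pattern can be tried in isolation. In this sketch, DemoMaster is a hypothetical stand-in for HMaster and a String stands in for the Configuration argument; the exception handling follows constructMaster, unwrapping InvocationTargetException so that the caller sees the exception actually thrown inside the constructor.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;

public class ReflectiveConstructSketch {

    /** Hypothetical stand-in for HMaster; a String plays the role of Configuration. */
    public static class DemoMaster {
        public final String conf;

        public DemoMaster(String conf) {
            if (conf == null) {
                throw new IllegalArgumentException("conf must not be null");
            }
            this.conf = conf;
        }
    }

    /** Looks up the public one-argument constructor and invokes it. */
    public static <T> T construct(Class<T> clazz, String conf) {
        try {
            Constructor<T> c = clazz.getConstructor(String.class);
            return c.newInstance(conf);
        } catch (Exception e) {
            Throwable error = e;
            // Exceptions thrown by the constructor itself arrive wrapped; unwrap them.
            if (e instanceof InvocationTargetException
                && ((InvocationTargetException) e).getTargetException() != null) {
                error = ((InvocationTargetException) e).getTargetException();
            }
            throw new RuntimeException("Failed construction of " + clazz.getName(), error);
        }
    }

    public static void main(String[] args) {
        DemoMaster master = construct(DemoMaster.class, "demo-conf");
        System.out.println(master.conf);
    }
}
```

Constructing by class object rather than with new is what allows the command line code to be handed a subclass (the local branch passes LocalHMaster.class, for example) without changing the construction logic.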

