    2).HRegionServer构造函数主要做了一些校验:如果启用了Kerberos则先进行登录,然后启动zookeeper对hbase znode的监听,获取master对应znode节点的变化,同时启动对集群状态变化的监听;


    4).最后调用run函数,循环向zk中注册成为active Master,一直阻塞循环直到写入成功为止,成功后进入finishInitialization()函数并初始化master。



(start) check_before_start hbase_rotate_log $HBASE_LOGOUT hbase_rotate_log $HBASE_LOGGC echo running $command, logging to $HBASE_LOGOUT $thiscmd --config "${HBASE_CONF_DIR}" \ foreground_start $command $args < /dev/null > ${HBASE_LOGOUT} 2>&1 & disown -h -r sleep 1; head "${HBASE_LOGOUT}" ;;


 /** * @see org.apache.hadoop.hbase.master.HMasterCommandLine */ public static void main(String [] args) { LOG.info("STARTING service " + HMaster.class.getSimpleName());    //这里没啥东西  就是打印版本信息 VersionInfo.logVersion();    //实例化一个HMasterCommandLine对象,执行该对象的doMain(args)方法 new HMasterCommandLine(HMaster.class).doMain(args); }


 public void doMain(String args[]) { try {    //通过ToolRunner机制执行启动/停止等命令 int ret = ToolRunner.run(HBaseConfiguration.create(), this, args); if (ret != 0) { System.exit(ret); } } catch (Exception e) { LOG.error("Failed to run", e); System.exit(-1); } }


public static int run(Configuration conf, Tool tool, String[] args)  throws Exception{ if(conf == null) { conf = new Configuration(); }    //封装成了GenericOptionsParser 对象,然后调用HMasterCommandLine的run函数启动master GenericOptionsParser parser = new GenericOptionsParser(conf, args); //set the configuration back, so that Tool can configure itself    tool.setConf(conf);   //get the args w/o generic hadoop args String[] toolArgs = parser.getRemainingArgs(); return tool.run(toolArgs); }


@Override public int run(String args[]) throws Exception { Options opt = new Options(); opt.addOption("localRegionServers", true, "RegionServers to start in master process when running standalone"); opt.addOption("masters", true, "Masters to start in this process"); opt.addOption("minRegionServers", true, "Minimum RegionServers needed to host user tables"); opt.addOption("backup", false, "Do not try to become HMaster until the primary fails");
    CommandLine cmd; //获取cmd 其实就一个start 参数 try { cmd = new GnuParser().parse(opt, args); } catch (ParseException e) { LOG.error("Could not parse: ", e); usage(null); return 1; }    //下面就是一些判断了,直接跳过 下面代码省略掉一部分判断,各种判断 if (cmd.hasOption("minRegionServers")) {    ..................................... @SuppressWarnings("unchecked") List<String> remainingArgs = cmd.getArgList(); if (remainingArgs.size() != 1) { usage(null); return 1; }
String command = remainingArgs.get(0);
if ("start".equals(command)) {    //  跳了一圈最后执行的是这个startMaster()函数 return startMaster(); } else if ("stop".equals(command)) { return stopMaster(); } else if ("clear".equals(command)) { return (ZNodeClearer.clear(getConf()) ? 0 : 1); } else { usage("Invalid command: " + command); return 1; } }


private int startMaster() {    //初始化conf配置 Configuration conf = getConf();    TraceUtil.initTracer(conf); try { // If 'local', defer to LocalHBaseCluster instance. Starts master // and regionserver both in the one JVM.      //如果是本地默认,这master和regionserver在一个节点上启动,我们是集群模式 直接跳过 if (LocalHBaseCluster.isLocal(conf)) { ............................. } else {       //直接看这里,打印配置信息到日志里面 logProcessInfo(getConf());        //这里构造一个HMaster线程,由于Hmaster继承自HRegionser类,这里会构造HRegionser,执行初始化 HMaster master = HMaster.constructMaster(masterClass, conf); if (master.isStopped()) { LOG.info("Won't bring the Master up as a shutdown is requested"); return 1; }        //调用start 和join方法 master.start(); master.join(); if(master.isAborted()) throw new RuntimeException("HMaster Aborted"); } } catch (Throwable t) { LOG.error("Master exiting", t); return 1; } return 0; }


/**
 * Constructs the region server: validates the configuration, creates the RPC
 * services, performs the security logins, sets up the ZooKeeper watcher and
 * trackers (unless disabled), starts the RPC services, creates the chore and
 * executor services, and puts up the web UI.
 *
 * @param conf the configuration this server reads its settings from
 * @throws IOException declared for construction failures; any {@code Throwable}
 *     raised during construction is logged and rethrown
 */
public HRegionServer(Configuration conf) throws IOException {
  super("RegionServer"); // thread name
  TraceUtil.initTracer(conf);
  try {
    this.startcode = System.currentTimeMillis();
    this.conf = conf;
    this.fsOk = true;
    // The following are mostly sanity checks.
    this.masterless = conf.getBoolean(MASTERLESS_CONFIG_NAME, false);
    this.eventLoopGroupConfig = setupNetty(this.conf);
    // Check that enough memory is left for the Memstore and the Block Cache.
    // Per the original walkthrough: the memstore gets 40% of the heap by default
    // (hbase.regionserver.global.memstore.size) and the block cache also gets 40%
    // by default (hfile.block.cache.size); together they must not exceed 80%,
    // otherwise an error is raised.
    MemorySizeUtil.checkForClusterFreeHeapMemoryLimit(this.conf);
    // The file format is configured via hfile.format.version. Older versions
    // used 2, the current one is 3; only 2 and 3 are accepted.
    HFile.checkHFileVersion(this.conf);
    checkCodecs(this.conf);
    this.userProvider = UserProvider.instantiate(conf);
    // Original note: checksum verification, usually set to false (disabled).
    FSUtils.setupShortCircuitRead(this.conf);

    // Disable usage of meta replicas in the regionserver
    this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
    // Config'ed params
    // Number of retries.
    this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    // Region server compaction check period.
    this.compactionCheckFrequency = conf.getInt(PERIOD_COMPACTION, this.threadWakeFrequency);
    this.flushCheckFrequency = conf.getInt(PERIOD_FLUSH, this.threadWakeFrequency);
    this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);

    this.sleeper = new Sleeper(this.msgInterval, this);

    boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
    this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

    this.numRegionsToReport = conf.getInt("hbase.regionserver.numregionstoreport", 10);

    this.abortRequested = false;
    this.stopped = false;

    // Important: createRpcServices() produces the RSRpcServices object.
    // HMaster and HRegionServer each create their own RPC server here so that
    // they can answer different RPC services.
    rpcServices = createRpcServices();
    useThisHostnameInstead = getUseThisHostnameInstead(conf);
    String hostName = StringUtils.isBlank(useThisHostnameInstead)
        ? this.rpcServices.isa.getHostName()
        : this.useThisHostnameInstead;
    // The server name is derived from the host name, port and start time;
    // it looks roughly like master.hadoop.ljs:16000:<timestamp>.
    serverName = ServerName.valueOf(hostName, this.rpcServices.isa.getPort(), this.startcode);

    rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
    rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

    // login the zookeeper client principal (if using security)
    // The ZooKeeper login only happens on a Kerberos-secured cluster;
    // on a non-secure cluster this can be ignored.
    ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE,
        HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName);
    // login the server principal (if using secure Hadoop)
    // Same here: can be ignored on a non-secure cluster.
    login(userProvider, hostName);
    // init superusers and add the server principal (if using security)
    // or process owner as default super user.
    Superusers.initialize(conf);
    // RegionServerAccounting records the size taken by all memstores in this server.
    regionServerAccounting = new RegionServerAccounting(conf);
    // In HBase 2.0 the master node may carry tables; this defaults to false.
    boolean isMasterNotCarryTable =
        this instanceof HMaster && !LoadBalancer.isTablesOnMaster(conf);

    // When isMasterNotCarryTable is false, create the block cache and the MOB file cache.
    if (!isMasterNotCarryTable) {
      blockCache = BlockCacheFactory.createBlockCache(conf);
      mobFileCache = new MobFileCache(conf);
    }

    uncaughtExceptionHandler = new UncaughtExceptionHandler() {
      @Override
      public void uncaughtException(Thread t, Throwable e) {
        abort("Uncaught exception in executorService thread " + t.getName(), e);
      }
    };
    // Resolve HBase's storage directories on HDFS, e.g. the WAL and data paths.
    initializeFileSystem();
    // Read the span receiver settings for the region server from the configuration.
    spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

    this.configurationManager = new ConfigurationManager();
    setupWindows(getConfiguration(), getConfigurationManager());

    // Some unit tests don't need a cluster, so no zookeeper at all
    if (!conf.getBoolean("hbase.testing.nocluster", false)) {
      // Open the ZooKeeper connection and start watching the hbase znodes.
      zooKeeper = new ZKWatcher(conf, getProcessName() + ":" + rpcServices.isa.getPort(),
          this, canCreateBaseZNode());
      // If no master in cluster, skip trying to track one or look for a cluster status.
      if (!this.masterless) {
        if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
            DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
          this.csm = new ZkCoordinatedStateManager(this);
        }
        // Build a tracker for the master address from the ZooKeeper information.
        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
        // Start watching the corresponding hbase znode.
        masterAddressTracker.start();
        // Create a tracker for the cluster status.
        clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
        // Start the tracker.
        clusterStatusTracker.start();
      } else {
        masterAddressTracker = null;
        clusterStatusTracker = null;
      }
    } else {
      zooKeeper = null;
      masterAddressTracker = null;
      clusterStatusTracker = null;
    }
    // Start the RPC services and wait for requests from region servers and clients.
    this.rpcServices.start(zooKeeper);
    // This violates 'no starting stuff in Constructor' but Master depends on the below chore
    // and executor being created and takes a different startup route. Lots of overlap between HRS
    // and M (An M IS A HRS now). Need to refactor so less duplication between M and its super
    // Master expects Constructor to put up web servers. Ugh.
    // class HRS. TODO.
    this.choreService = new ChoreService(getName(), true);
    this.executorService = new ExecutorService(getName());
    putUpWebUI();
  } catch (Throwable t) {
    // Make sure we log the exception. HRegionServer is often started via reflection and the
    // cause of failed startup is lost.
    LOG.error("Failed construction RegionServer", t);
    throw t;
  }
}


public HMaster(final Configuration conf) throws IOException, KeeperException { super(conf); TraceUtil.initTracer(conf); try { if (conf.getBoolean(MAINTENANCE_MODE, false)) { LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE); maintenanceMode = true; } else if (Boolean.getBoolean(MAINTENANCE_MODE)) { LOG.info("Detected {}=true via environment variables.", MAINTENANCE_MODE); maintenanceMode = true; } else { maintenanceMode = false; }      //存储regionserver异常的内存缓存 this.rsFatals = new MemoryBoundedLogMessageBuffer( conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024)); LOG.info("hbase.rootdir=" + getRootDir() + ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
// Disable usage of meta replicas in the master this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);       //这里修改conf配置,然后将replication相关特性写入conf中 decorateMasterConfiguration(this.conf);
// Hack! Maps DFSClient => Master for logs. HDFS made this // config param for task trackers, but we can piggyback off of it. if (this.conf.get("mapreduce.task.attempt.id") == null) { this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString()); } //实例化hbase的监控 this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));
// preload table descriptor at startup this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);
this.maxBlancingTime = getMaxBalancingTime(); this.maxRitPercent = conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT, HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);
      // 集群状态发表。 比如当regionService 死了,要立即告知client ,不要用client等待socket回应了。     boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED, HConstants.STATUS_PUBLISHED_DEFAULT); Class<? extends ClusterStatusPublisher.Publisher> publisherClass = conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS, ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS, ClusterStatusPublisher.Publisher.class);
if (shouldPublish) { if (publisherClass == null) { LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " + ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS + " is not set - not publishing status"); } else { clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass); getChoreService().scheduleChore(clusterStatusPublisherChore); } }
// Some unit tests don't need a cluster, so no zookeeper at all if (!conf.getBoolean("hbase.testing.nocluster", false)) {        //视图将Master信息写入到zookeeper中,这里构造函数会启动一个zookeeper  对Mater的znode的监听        //接收zookeeer的事件 this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this); } else { this.activeMasterManager = null; } } catch (Throwable t) { // Make sure we log the exception. HMaster is often started via reflection and the // cause of failed startup is lost. LOG.error("Failed construction of Master", t); throw t; } }


@Override public void run() { try { if (!conf.getBoolean("hbase.testing.nocluster", false)) { Threads.setDaemonThreadRunning(new Thread(() -> { try { int infoPort = putUpJettyServer(); startActiveMasterManager(infoPort); } catch (Throwable t) { // Make sure we log the exception. String error = "Failed to become Active Master"; LOG.error(error, t); // Abort should have been called already. if (!isAborted()) { abort(error, t); } } }), getName() + ":becomeActiveMaster"); } // Fall in here even if we have been aborted. Need to run the shutdown services and // the super run call will do this for us. super.run(); } finally { if (this.clusterSchemaService != null) { // If on way out, then we are no longer active master. this.clusterSchemaService.stopAsync(); try { this.clusterSchemaService.awaitTerminated( getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS, DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS); } catch (TimeoutException te) { LOG.warn("Failed shutdown of clusterSchemaService", te); } } this.activeMaster = false; } }



