大数据源码Hadoop源码解读 Namenode 启动加载FsImage的过程

Posted 笑起来贼好看

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据源码Hadoop源码解读 Namenode 启动加载FsImage的过程相关的知识,希望对你有一定的参考价值。

Namenode 启动

前言

NameNode是HDFS中负责元数据管理的组件,它保存着整个文件系统的元数据信息,并且充当着指挥调度DataNode的作用。NameNode不仅在内存中保存着文件系统元数据信息,还会定期将文件系统的元数据(文件目录树、文件/ 目录元信息) 持久化到本地 fsImage 文件中, 以防止Namenode掉电或者进程异常崩溃。
如果Namenode实时地将内存中的元数据同步到fsimage文件中, 将会非常消耗资源且造成Namenode运行缓慢。 所以Namenode会先将元数据的修改操作保存在editlog文件中, 然后定期合并fsimage和editlog文件

NameNode.main() // 入口函数
       |——createNameNode(); // 通过new NameNode()进行实例化
         |——initialize(); // 方法进行初始化操作
           |——startHttpServer(); // 启动HttpServer
           |——loadNamesystem(); // 加载元数据
           |——createRpcServer(); // 创建并初始化rpc server实例
           |——startCommonServices();
             |——namesystem.startCommonServices(); // 启动一些磁盘检查、安全模式等一些后台服务及线程
               |——new NameNodeResourceChecker(); // 实例化一个NameNodeResourceChecker并准备出所有需要检查的磁盘路径
               |——checkAvailableResources(); // 开始磁盘空间检查
               |——NameNode.getStartupProgress(); // 获取StartupProgress实例用来获取NameNode各任务的启动信息
               |——setBlockTotal(); // 设置所有的block,用于后面判断是否进入安全模式
               |——blockManager.activate(); // 启动BlockManager里面的一堆关于block副本处理的后台线程
             |——rpcServer.start(); // 启动rpcServer
       |——join()

启动 Namenode 组件

启动脚本

bin/hdfs --daemon start namenode

上述脚本会 调用 org.apache.hadoop.hdfs.server.namenode.NameNode的main方法

public static void main(String argv[]) throws Exception 
    // ...省略
    // 创建 namenode
    NameNode namenode = createNameNode(argv, null);
    // ...省略

创建 NameNode 的过程,先进行参数解析,判断是 format / rollback / bootstrapStandby 等等操作类型。然后依次执行改逻辑,本章节主要是正对NameNode的启动过程。

依次往下阅读 ,进入到 NameNode的构造函数中 调用 initialize(getConf());

Namenode.initialize

此方法主要做了如下事项:

  • 配置安全相关的信息UserGroupInformation
  • 启动 JvmPauseMonitor 检查
  • 启动 HTTP 服务端 (9870)
  • 初始化FSNameSystem 核心组件 ,加载镜像文件和编辑日志到内存
  • 初始化rpc server 组件
  • 启动 公共 服务
protected void initialize(Configuration conf) throws IOException 
    if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) 
      String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
      if (intervals != null) 
        conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
          intervals);
      
    

    UserGroupInformation.setConfiguration(conf);
    loginAsNameNodeUser(conf);

    NameNode.initMetrics(conf, this.getRole());
    StartupProgressMetrics.register(startupProgress);

    pauseMonitor = new JvmPauseMonitor();
    pauseMonitor.init(conf);
    pauseMonitor.start();
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

    if (conf.getBoolean(DFS_NAMENODE_GC_TIME_MONITOR_ENABLE,
        DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT)) 
      long observationWindow = conf.getTimeDuration(
          DFS_NAMENODE_GC_TIME_MONITOR_OBSERVATION_WINDOW_MS,
          DFS_NAMENODE_GC_TIME_MONITOR_OBSERVATION_WINDOW_MS_DEFAULT,
          TimeUnit.MILLISECONDS);
      long sleepInterval = conf.getTimeDuration(
          DFS_NAMENODE_GC_TIME_MONITOR_SLEEP_INTERVAL_MS,
          DFS_NAMENODE_GC_TIME_MONITOR_SLEEP_INTERVAL_MS_DEFAULT,
          TimeUnit.MILLISECONDS);
      gcTimeMonitor = new Builder().observationWindowMs(observationWindow)
          .sleepIntervalMs(sleepInterval).build();
      gcTimeMonitor.start();
      metrics.getJvmMetrics().setGcTimeMonitor(gcTimeMonitor);
    

    if (NamenodeRole.NAMENODE == role) 
      // 启动 HTTP 服务端 (9870)
      startHttpServer(conf);
    

    // 初始化FSNameSystem 核心组件 ,加载镜像文件和编辑日志到内存
    loadNamesystem(conf);
    startAliasMapServerIfNecessary(conf);

    //初始化rpc server 组件
    rpcServer = createRpcServer(conf);

    initReconfigurableBackoffKey();

    if (clientNamenodeAddress == null) 
      // This is expected for MiniDFSCluster. Set it now using 
      // the RPC server's bind address.
      clientNamenodeAddress = 
          NetUtils.getHostPortString(getNameNodeAddress());
      LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
          + " this namenode/service.");
    
    // 如果是NameNode 设置NameNodeAddress 以及  FsImage
    if (NamenodeRole.NAMENODE == role) 
      httpServer.setNameNodeAddress(getNameNodeAddress());
      httpServer.setFSImage(getFSImage());
      if (levelDBAliasMapServer != null) 
        httpServer.setAliasMap(levelDBAliasMapServer.getAliasMap());
      
    

    // 一些公共服务的初始化
    startCommonServices(conf);
    startMetricsLogger(conf);
  

FSNamesystem.loadFromDisk

接着会调用 FSNamesystem 静态方法 loadFromDisk 加载 元数据

static FSNamesystem loadFromDisk(Configuration conf) throws IOException 

    checkConfiguration(conf);
    // 构建FSImage,从磁盘加载
    // FSImage 就是一个时间点的元数据快照信息,其实也就是元数据信息
    // FSNameSystem.getNamespaceDirs(conf) 获取元数据的目录
    // file://$hadoop.tmp.dir/dfs/name  $hadoop.tmp.dir:/tmp/hadoop-$user.name
    // 可以自己观察下启动的namenode进程的这个目录是否和这个匹配
    FSImage fsImage = new FSImage(conf,
        FSNamesystem.getNamespaceDirs(conf),
            //FSNamesystem.getNamespaceEditsDirs(conf)) 获取edits log 的目录
            // 默认情况下 edis log 和namespace 是在同一个目录下,可以进去看下配置信息
        FSNamesystem.getNamespaceEditsDirs(conf));
    // 实例化 FSnamesystem 对象,将fsImage对象放入到了FSNamesystem中
    FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
    StartupOption startOpt = NameNode.getStartupOption(conf);
    if (startOpt == StartupOption.RECOVER) 
      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
    

    long loadStart = monotonicNow();
    try 
      // 这里就是说通过FSNamesystem 将 fsImage 以及 edits log 加载到内存中
      // 然后在内存中合并两个文件,形成新的 fsImage 信息
      //(注:默认情况下 每隔一段时间 就会有checkpoint 将旧的 fsImage 与 edits log
      // 就行合并形成新的 fsImage 文件,启动的时候肯定也需要合并才能形成新的 fsImage 文件 对吧)
      // 最后再内存中持有一份完整的元数据信息
      namesystem.loadFSImage(startOpt);
     catch (IOException ioe) 
      LOG.warn("Encountered exception loading fsimage", ioe);
      fsImage.close();
      throw ioe;
    
    long timeTakenToLoadFSImage = monotonicNow() - loadStart;
    LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
    NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
    if (nnMetrics != null) 
      nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
    
    namesystem.getFSDirectory().createReservedStatuses(namesystem.getCTime());
    return namesystem;
  

内存结构

FSNameSystem这里采用了门面模式,自己不负责具体的加载逻辑,交给FsImage去干活。
继续看FsImage.recoverTransitionRead(StartupOption startOpt, FSNamesystem target,MetaRecoveryContext recovery)

FsImage.recoverTransitionRead

主要做了四件事:

  • 检查每个数据目录,判断是否状态一致性
  • Format unformatted dirs. 格式化未格式的目录
  • Do transitions 转换操作
  • 真正的加载 fsImage 和 edits log 文件进行合并
/**
   * Analyze storage directories.
   * Recover from previous transitions if required. 
   * Perform fs state transition if necessary depending on the namespace info.
   * Read storage info.
   * 分析存储的目录 就是存储 fsImage 以及 edits log 的目录
   * 从以前的状态恢复
   * 根据元信息 判断是否执行fs状态的转换
   * 读取存储的信息
   * 大概意思就是如果以前有 fsImage 和 edits log 就从文件信息中加载出来 并进行恢复
   * 
   * @throws IOException
   * @return true if the image needs to be saved or false otherwise
   */
  boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
      MetaRecoveryContext recovery)
      throws IOException 
    assert startOpt != StartupOption.FORMAT : 
      "NameNode formatting should be performed before reading the image";

    // 获取fsImage 文件资源地址 其实也就是目录
    Collection<URI> imageDirs = storage.getImageDirectories();

    // 获取edits log 目录
    Collection<URI> editsDirs = editLog.getEditURIs();

    // none of the data dirs exist
    if((imageDirs.size() == 0 || editsDirs.size() == 0) 
                             && startOpt != StartupOption.IMPORT)  
      throw new IOException(
          "All specified directories are not accessible or do not exist.");
    
    // 1. For each data directory calculate its state and 
    // check whether all is consistent before transitioning.
    // 检查每个数据目录,判断是否状态一致性
    // 进行数据恢复 里面就是对一些之前停机的时候 更新 回滚 新增数据的恢复操作
    Map<StorageDirectory, StorageState> dataDirStates = 
             new HashMap<StorageDirectory, StorageState>();
    boolean isFormatted = recoverStorageDirs(startOpt, storage, dataDirStates);

    if (LOG.isTraceEnabled()) 
      LOG.trace("Data dir states:\\n  " +
        Joiner.on("\\n  ").withKeyValueSeparator(": ")
        .join(dataDirStates));
    
    
    if (!isFormatted && startOpt != StartupOption.ROLLBACK 
                     && startOpt != StartupOption.IMPORT) 
      throw new IOException("NameNode is not formatted.");      
    


    int layoutVersion = storage.getLayoutVersion();
    if (startOpt == StartupOption.METADATAVERSION) 
      System.out.println("HDFS Image Version: " + layoutVersion);
      System.out.println("Software format version: " +
        HdfsServerConstants.NAMENODE_LAYOUT_VERSION);
      return false;
    

    if (layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) 
      NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
    
    if (startOpt != StartupOption.UPGRADE
        && startOpt != StartupOption.UPGRADEONLY
        && !RollingUpgradeStartupOption.STARTED.matches(startOpt)
        && layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
        && layoutVersion != HdfsServerConstants.NAMENODE_LAYOUT_VERSION) 
      throw new IOException(
          "\\nFile system image contains an old layout version " 
          + storage.getLayoutVersion() + ".\\nAn upgrade to version "
          + HdfsServerConstants.NAMENODE_LAYOUT_VERSION + " is required.\\n"
          + "Please restart NameNode with the \\""
          + RollingUpgradeStartupOption.STARTED.getOptionString()
          + "\\" option if a rolling upgrade is already started;"
          + " or restart NameNode with the \\""
          + StartupOption.UPGRADE.getName() + "\\" option to start"
          + " a new upgrade.");
    

    // 执行一些启动选项以及一些二更操作
    storage.processStartupOptionsForUpgrade(startOpt, layoutVersion);

    // 2. Format unformatted dirs.
    for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) 
      StorageDirectory sd = it.next();
      StorageState curState = dataDirStates.get(sd);
      switch(curState) 
      case NON_EXISTENT:
        throw new IOException(StorageState.NON_EXISTENT + 
                              " state cannot be here");
      case NOT_FORMATTED:
        // Create a dir structure, but not the VERSION file. The presence of
        // VERSION is checked in the inspector's needToSave() method and
        // saveNamespace is triggered if it is absent. This will bring
        // the storage state uptodate along with a new VERSION file.
        // If HA is enabled, NNs start up as standby so saveNamespace is not
        // triggered.
        LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
        LOG.info("Formatting ...");
        sd.clearDirectory(); // create empty current dir
        // For non-HA, no further action is needed here, as saveNamespace will
        // take care of the rest.
        if (!target.isHaEnabled()) 
          continue;
        
        // If HA is enabled, save the dirs to create a version file later when
        // a checkpoint image is saved.
        if (newDirs == null) 
          newDirs = new HashSet<StorageDirectory>();
        
        newDirs.add(sd);
        break;
      default:
        break;
      
    

    // 3. Do transitions
    switch(startOpt) 
    case UPGRADE:
    case UPGRADEONLY:
      doUpgrade(target);
      return false; // upgrade saved image already
    case IMPORT:
      doImportCheckpoint(target);
      return false; // import checkpoint saved image already
    case ROLLBACK:
      throw new AssertionError("Rollback is now a standalone command, " +
          "NameNode should not be starting with this option.");
    case REGULAR:
    default:
      // just load the image
    

    // 真正的加载 fsImage 和 edits log 文件进行合并
    return loadFSImage(target, startOpt, recovery);
  

因为FsImage load重载比较多,故直接到关键步骤。
此方法中判断是否支持 md5 加密

/**
   *
   * 这里面就不仔细去详细的跟到文件加载了,
   * loadFSImage()方法就是最终加载文件的方法
   * @param target
   * @param recovery
   * @param imageFile
   * @param startupOption
   * @throws IOException
   */
  void loadFSImageFile(FSNamesystem target, MetaRecoveryContext recovery,
      FSImageFile imageFile, StartupOption startupOption) throws IOException 
    LOG.info("Planning to load image: " + imageFile);
    StorageDirectory sdForProperties = imageFile.sd;
    storage.readProperties(sdForProperties, startupOption);

    if (NameNodeLayoutVersion.supports(
        LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) 
      // For txid-based layout, we should have a .md5 file
      // next to the image file
      boolean isRollingRollback = RollingUpgradeStartupOption.ROLLBACK
          .matches(startupOption);
      loadFSImage(imageFile.getFile(), target, recovery, isRollingRollback);
     else if (NameNodeLayoutVersion.supports(
        LayoutVersion.Feature.FSIMAGE_CHECKSUM, getLayoutVersion())) 
      // In 0.22, we have the checksum stored in the VERSION file.
      String md5 = storage.getDeprecatedProperty(
          NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY);
      if (md5 == null) 
        throw new InconsistentFSStateException(sdForProperties.getRoot(),
            "Message digest property " +
            NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
            " not set for storage directory " + sdForProperties.getRoot());
      
      loadFSImage(imageFile.getFile(), new MD5Hash(md5), target, recovery,
          false);
     else 
      // We don't have any record of the md5sum
      loadFSImage(imageFile.getFile(), null, target, recovery, false);
    
  

调用 FSImageFormat的LoaderDelegator进行加载fsImage文件

FSImageFormat.load

private void loadFSImage(File curFile, MD5Hash expectedMd5,
      FSNamesystem target, MetaRecoveryContext recovery,
      boolean requireSameLayoutVersion) throws IOException 
    // BlockPoolId is required when the FsImageLoader loads the rolling upgrade
    // information. Make sure the ID is properly set.
    target.setBlockPoolId(this.getBlockPoolID());

    // 一个持有FSNamesystem 以及 conf 对象的 loader,加载器
    FSImageFormat.LoaderDelegator loader = FSImageFormat.newLoader(conf, target);
    loader.load(curFile, requireSameLayoutVersion);

    // Check that the image digest we loaded matches up with what
    // we expected
    MD5Hash readImageMd5 = loader.getLoadedImageMd5();
    if (expectedMd5 != null &&
        !expecte

以上是关于大数据源码Hadoop源码解读 Namenode 启动加载FsImage的过程的主要内容,如果未能解决你的问题,请参考以下文章

大数据实战之Centos搭建完全分布式Hadoop集群

大数据分析师实操练习(Hadoop完全分布式集群搭建)

Hadoop源码分析之NameNode的目录构成与类继承结构

Hadoop源码分析-HDFS写数据之创建packet

Hadoop源码学习之HDFS

Hadoop源码篇---解读Mapprer源码outPut输出