大数据HadoopHDFS-Namenode-bootstrapStandby同步元数据的源码步骤分析

Posted 笑起来贼好看

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据HadoopHDFS-Namenode-bootstrapStandby同步元数据的源码步骤分析相关的知识,希望对你有一定的参考价值。

Namenode -bootstrapStandby

流程

  1. 根据配置项获取nameserviceId、namenodeId
  2. 获取其他的 namenode 信息,建立rpc通信。
  3. 判断配置项dfs.namenode.support.allow.format是否允许格式化,一般生产环境建议配置,防止误操作格式化了已有数据。
  4. 获取格式化的目录(fsImage和edits的存储目录,还有sharedEditsDirs配置)。
  5. format目录,创建current目录,写VERSION文件和seen_txid文件
  6. 从qjm中检验上一次checkpoint到最近的curtxid中间的editlog文件是否存在。
  7. 从远端namenode下载最近一次checkpoint产生的fsImage文件
  8. 整个过程格式化完毕。

同步元数据命令

hdfs namenode [-bootstrapStandby [-force] [-nonInteractive] [-skipSharedEditsCheck] ]


# 常用的命令
hdfs namenode -bootstrapStandby

源码解读

配置解析

入口org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby.run方法
此步骤做了如下操作:

  • 获取集群配置信息
  • 找到远端Namenode,获取第一个
  • 校验是否可以格式化
  • 调用具体同步的流程
  public int run(String[] args) throws Exception 
    // 解析命令行参数
    parseArgs(args);
    // Disable using the RPC tailing mechanism for bootstrapping the standby
    // since it is less efficient in this case; see HDFS-14806
    conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
    // 解析配置,获取集群信息,找到remoteNN
    parseConfAndFindOtherNN();
    NameNode.checkAllowFormat(conf);

    InetSocketAddress myAddr = DFSUtilClient.getNNAddress(conf);
    SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
        DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, myAddr.getHostName());

    return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() 
      @Override
      public Integer run() 
        try 
          // 执行 同步元数据
          return doRun();
         catch (IOException e) 
          throw new RuntimeException(e);
        
      
    );
  

同步元数据

执行 doRun 的时候,里面集成了整个流程,主要做了如下事项:

  • 创建remoteNN的代理对象
  • format目录文件,创建VERSION/seen_txid文件
  • 准备下载fsImage
private int doRun() throws IOException 
    // find the active NN
    NamenodeProtocol proxy = null;
    NamespaceInfo nsInfo = null;
    boolean isUpgradeFinalized = false;
    RemoteNameNodeInfo proxyInfo = null;
    // 整个一大段就是在创建nn的代理对象。通过循环,找到第一个符合要求的。
    for (int i = 0; i < remoteNNs.size(); i++) 
      proxyInfo = remoteNNs.get(i);
      InetSocketAddress otherIpcAddress = proxyInfo.getIpcAddress();
      proxy = createNNProtocolProxy(otherIpcAddress);
      try 
        // Get the namespace from any active NN. If you just formatted the primary NN and are
        // bootstrapping the other NNs from that layout, it will only contact the single NN.
        // However, if there cluster is already running and you are adding a NN later (e.g.
        // replacing a failed NN), then this will bootstrap from any node in the cluster.
        nsInfo = proxy.versionRequest();
        isUpgradeFinalized = proxy.isUpgradeFinalized();
        break;
       catch (IOException ioe) 
        LOG.warn("Unable to fetch namespace information from remote NN at " + otherIpcAddress
            + ": " + ioe.getMessage());
        if (LOG.isDebugEnabled()) 
          LOG.debug("Full exception trace", ioe);
        
      
    

    if (nsInfo == null) 
      LOG.error(
          "Unable to fetch namespace information from any remote NN. Possible NameNodes: "
              + remoteNNs);
      return ERR_CODE_FAILED_CONNECT;
    
	// 判断layout,目前是-66
    if (!checkLayoutVersion(nsInfo)) 
      LOG.error("Layout version on remote node (" + nsInfo.getLayoutVersion()
          + ") does not match " + "this node's layout version ("
          + HdfsServerConstants.NAMENODE_LAYOUT_VERSION + ")");
      return ERR_CODE_INVALID_VERSION;
    
	// 打印集群信息
    System.out.println(
        "=====================================================\\n" +
        "About to bootstrap Standby ID " + nnId + " from:\\n" +
        "           Nameservice ID: " + nsId + "\\n" +
        "        Other Namenode ID: " + proxyInfo.getNameNodeID() + "\\n" +
        "  Other NN's HTTP address: " + proxyInfo.getHttpAddress() + "\\n" +
        "  Other NN's IPC  address: " + proxyInfo.getIpcAddress() + "\\n" +
        "             Namespace ID: " + nsInfo.getNamespaceID() + "\\n" +
        "            Block pool ID: " + nsInfo.getBlockPoolID() + "\\n" +
        "               Cluster ID: " + nsInfo.getClusterID() + "\\n" +
        "           Layout version: " + nsInfo.getLayoutVersion() + "\\n" +
        "       isUpgradeFinalized: " + isUpgradeFinalized + "\\n" +
        "=====================================================");
    // 创建待格式化的存储对象
    NNStorage storage = new NNStorage(conf, dirsToFormat, editUrisToFormat);

    if (!isUpgradeFinalized) 
      //...省略升级相关部分代码
     else if (!format(storage, nsInfo))  // prompt the user to format storage 此步骤就是创建 VERSION/seen_txid文件
      return ERR_CODE_ALREADY_FORMATTED;
    

    // download the fsimage from active namenode
    // 从remoteNN通过http下载fsImage文件了。
    int download = downloadImage(storage, proxy, proxyInfo);
    if (download != 0) 
      return download;
    

    //...省略部分代码
  

下载fsImage文件

private int downloadImage(NNStorage storage, NamenodeProtocol proxy, RemoteNameNodeInfo proxyInfo)
      throws IOException 
    // Load the newly formatted image, using all of the directories
    // (including shared edits)
    // 获取最近的checkpointTxid
    final long imageTxId = proxy.getMostRecentCheckpointTxId();
    // 获取当前事务id
    final long curTxId = proxy.getTransactionID();
    FSImage image = new FSImage(conf);
    try 
      // 赋值集群信息给image
      image.getStorage().setStorageInfo(storage);
      // 创建journalSet对象,置状态为OPEN_FOR_READING
      image.initEditLog(StartupOption.REGULAR);
      assert image.getEditLog().isOpenForRead() :
          "Expected edit log to be open for read";

      // Ensure that we have enough edits already in the shared directory to
      // start up from the last checkpoint on the active.
      // 从共享的qjm中获取curTxId到imageTxId的editLogs数据
      if (!skipSharedEditsCheck &&
          !checkLogsAvailableForRead(image, imageTxId, curTxId)) 
        return ERR_CODE_LOGS_UNAVAILABLE;
      
	  // 通过http下载fsImage,名称为fsimage.ckpt文件,写到存储目录中。
      // Download that checkpoint into our storage directories.
      MD5Hash hash = TransferFsImage.downloadImageToStorage(
        proxyInfo.getHttpAddress(), imageTxId, storage, true, true);
        // 保存fsImage的md5值,并且重命名fsImage为正式的无ckpt的。
      image.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE, imageTxId,
          hash);
	  // 写seen_txid到目录中
      // Write seen_txid to the formatted image directories.
      storage.writeTransactionIdFileToStorage(imageTxId, NameNodeDirType.IMAGE);
     catch (IOException ioe) 
      throw ioe;
     finally 
      image.close();
    
    return 0;
  

校验shareEditsLog是否存在

先看 checkLogsAvailableForRead
此步骤主要是从 QJM中获取imageTxId到curTxId之间的editlogs的日志流
直接看重点
org.apache.hadoop.hdfs.server.namenode.FSEditLog.selectInputStreams方法

public Collection<EditLogInputStream> selectInputStreams(long fromTxId,
      long toAtLeastTxId, MetaRecoveryContext recovery, boolean inProgressOk,
      boolean onlyDurableTxns) throws IOException 

    List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
    synchronized(journalSetLock) 
      Preconditions.checkState(journalSet.isOpen(), "Cannot call " +
          "selectInputStreams() on closed FSEditLog");
      // 从共享qjm中获取editLogs,并保存
      selectInputStreams(streams, fromTxId, inProgressOk, onlyDurableTxns);
    

    try 
       // 校验是否有间隔 
      checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
     catch (IOException e) 
      if (recovery != null) 
        // If recovery mode is enabled, continue loading even if we know we
        // can't load up to toAtLeastTxId.
        LOG.error("Exception while selecting input streams", e);
       else 
        closeAllStreams(streams);
        throw e;
      
    
    return streams;
  

下载fsImage

public static MD5Hash downloadImageToStorage(URL fsName, long imageTxId,
      Storage dstStorage, boolean needDigest, boolean isBootstrapStandby)
      throws IOException 
    String fileid = ImageServlet.getParamStringForImage(null,
        imageTxId, dstStorage, isBootstrapStandby);
    String fileName = NNStorage.getCheckpointImageFileName(imageTxId);
    
    List<File> dstFiles = dstStorage.getFiles(
        NameNodeDirType.IMAGE, fileName);
    if (dstFiles.isEmpty()) 
      throw new IOException("No targets in destination storage!");
    
    // 下载并返回 md5值
    MD5Hash hash = getFileClient(fsName, fileid, dstFiles, dstStorage, needDigest);
    LOG.info("Downloaded file " + dstFiles.get(0).getName() + " size " +
        dstFiles.get(0).length() + " bytes.");
    return hash;
  

最后同步元数据完成

另外一个节点的数据目录下存放如下数据:

── current
    ├── fsimage_0000000000000000000
    ├── fsimage_0000000000000000000.md5
    ├── seen_txid
    └── VERSION

1 directory, 4 files

希望对正在查看文章的您有所帮助,记得关注、评论、收藏,谢谢您

大数据包括哪些?

   简单来说,从大数据的生命周期来看,无外乎四个方面:大数据采集、大数据预处理、大数据存储、大数据分析,共同组成了大数据生命周期里最核心的技术,下面分开来说:

一、大数据采集

大数据采集,即对各种来源的结构化和非结构化海量数据,所进行的采集。

    数据库采集:流行的有Sqoop和ETL,传统的关系型数据库MySQL和Oracle 也依然充当着许多企业的数据存储方式。当然了,目前对于开源的Kettle和Talend本身,也集成了大数据集成内容,可实现hdfs,hbase和主流Nosq数据库之间的数据同步和集成。

    网络数据采集:一种借助网络爬虫或网站公开API,从网页获取非结构化或半结构化数据,并将其统一结构化为本地数据的数据采集方式。

    文件采集:包括实时文件采集和处理技术flume、基于ELK的日志采集和增量采集等等。

    二、大数据预处理

    大数据预处理,指的是在进行数据分析之前,先对采集到的原始数据所进行的诸如“清洗、填补、平滑、合并、规格化、一致性检验”等一系列操作,旨在提高数据质量,为后期分析工作奠定基础。数据预处理主要包括四个部分:数据清理、数据集成、数据转换、数据规约。

    数据清理:指利用ETL等清洗工具,对有遗漏数据(缺少感兴趣的属性)、噪音数据(数据中存在着错误、或偏离期望值的数据)、不一致数据进行处理。

    数据集成:是指将不同数据源中的数据,合并存放到统一数据库的,存储方法,着重解决三个问题:模式匹配、数据冗余、数据值冲突检测与处理。

    数据转换:是指对所抽取出来的数据中存在的不一致,进行处理的过程。它同时包含了数据清洗的工作,即根据业务规则对异常数据进行清洗,以保证后续分析结果准确性。

    数据规约:是指在最大限度保持数据原貌的基础上,最大限度精简数据量,以得到较小数据集的操作,包括:数据方聚集、维规约、数据压缩、数值规约、概念分层等。

    三、大数据存储

    大数据存储,指用存储器,以数据库的形式,存储采集到的数据的过程,包含三种典型路线:

    1、基于MPP架构的新型数据库集群

    采用Shared Nothing架构,结合MPP架构的高效分布式计算模式,通过列存储、粗粒度索引等多项大数据处理技术,重点面向行业大数据所展开的数据存储方式。具有低成本、高性能、高扩展性等特点,在企业分析类应用领域有着广泛的应用。

    较之传统数据库,其基于MPP产品的PB级数据分析能力,有着显著的优越性。自然,MPP数据库,也成为了企业新一代数据仓库的最佳选择。

    2、基于Hadoop的技术扩展和封装

    基于Hadoop的技术扩展和封装,是针对传统关系型数据库难以处理的数据和场景(针对非结构化数据的存储和计算等),利用Hadoop开源优势及相关特性(善于处理非结构、半结构化数据、复杂的ETL流程、复杂的数据挖掘和计算模型等),衍生出相关大数据技术的过程。

    伴随着技术进步,其应用场景也将逐步扩大,目前最为典型的应用场景:通过扩展和封装 Hadoop来实现对互联网大数据存储、分析的支撑,其中涉及了几十种NoSQL技术。

    3、大数据一体机

    这是一种专为大数据的分析处理而设计的软、硬件结合的产品。它由一组集成的服务器、存储设备、操作系统、数据库管理系统,以及为数据查询、处理、分析而预安装和优化的软件组成,具有良好的稳定性和纵向扩展性。

    四、大数据分析挖掘

    从可视化分析、数据挖掘算法、预测性分析、语义引擎、数据质量管理等方面,对杂乱无章的数据,进行萃取、提炼和分析的过程。

    1、可视化分析

    可视化分析,指借助图形化手段,清晰并有效传达与沟通信息的分析手段。主要应用于海量数据关联分析,即借助可视化数据分析平台,对分散异构数据进行关联分析,并做出完整分析图表的过程。
    具有简单明了、清晰直观、易于接受的特点。

    2、数据挖掘算法

    数据挖掘算法,即通过创建数据挖掘模型,而对数据进行试探和计算的,数据分析手段。它是大数据分析的理论核心。

    数据挖掘算法多种多样,且不同算法因基于不同的数据类型和格式,会呈现出不同的数据特点。但一般来讲,创建模型的过程却是相似的,即首先分析用户提供的数据,然后针对特定类型的模式和趋势进行查找,并用分析结果定义创建挖掘模型的最佳参数,并将这些参数应用于整个数据集,以提取可行模式和详细统计信息。

    3、预测性分析

    预测性分析,是大数据分析最重要的应用领域之一,通过结合多种高级分析功能(特别统计分析、预测建模、数据挖掘、文本分析、实体分析、优化、实时评分、机器学习等),达到预测不确定事件的目的。

    帮助分用户析结构化和非结构化数据中的趋势、模式和关系,并运用这些指标来预测将来事件,为采取措施提供依据。

    4、语义引擎

    语义引擎,指通过为已有数据添加语义的操作,提高用户互联网搜索体验。

    5、数据质量管理

    指对数据全生命周期的每个阶段(计划、获取、存储、共享、维护、应用、消亡等)中可能引发的各类数据质量问题,进行识别、度量、监控、预警等操作,以提高数据质量的一系列管理活动。

    以上是从大的方面来讲,具体来说大数据的框架技术有很多,这里列举其中一些:

    文件存储:Hadoop HDFS、Tachyon、KFS

    离线计算:Hadoop MapReduce、Spark

    流式、实时计算:Storm、Spark Streaming、S4、Heron

    K-V、NOSQL数据库:HBase、Redis、MongoDB

    资源管理:YARN、Mesos

    日志收集:Flume、Scribe、Logstash、Kibana

    消息系统:Kafka、StormMQ、ZeroMQ、RabbitMQ

    查询分析:Hive、Impala、Pig、Presto、Phoenix、SparkSQL、Drill、Flink、Kylin、Druid

    分布式协调服务:Zookeeper

    集群管理与监控:Ambari、Ganglia、Nagios、Cloudera Manager

    数据挖掘、机器学习:Mahout、Spark MLLib

    数据同步:Sqoop

    任务调度:Oozie

    ······

想要学习更多关于大数据的知识可以加群和志同道合的人一起交流一下啊[https://sourl.cn/d9wRmb ]

参考技术A 大数据技术庞大复杂,基础的技术包含数据的采集、数据预处理、分布式存储、NoSQL数据库、数据仓库、机器学习、并行计算、可视化等各种技术范畴和不同的技术层面。
大数据主要技术组件:Hadoop、HBase、kafka、Hive、MongoDB、Redis、Spark 、Storm、Flink等。
大数据技术包括数据采集,数据管理,数据分析,数据可视化,数据安全等内容。数据的采集包括传感器采集,系统日志采集以及网络爬虫等。数据管理包括传统的数据库技术,nosql技术,以及对于针对大规模数据的大数据平台,例如hadoop,spark,storm等。数据分析的核心是机器学习,当然也包括深度学习和强化学习,以及自然语言处理,图与网络分析等。
参考技术B 大数据(英语:Big data[1][2]或Megadata),或称巨量数据、海量数据、大资料,指的是所涉及的数据量规模巨大到无法通过人工,在合理时间内达到截取、管理、处理、并整理成为人类所能解读的信息。
在总数据量相同的情况下,与个别分析独立的小型数据集(data
set)相比,将各个小型数据集合并后进行分析可得出许多额外的信息和数据关系性,可用来察觉商业趋势、判定研究质量、避免疾病扩散、打击犯罪或测定实时交通路况等;这样的用途正是大型数据集盛行的原因。
大数据的应用示例包括大科学、RFID、感测设备网络、天文学、大气学、基因组学、生物学、大社会数据分析、互联网文件处理、制作互联网搜索引擎索引、通信记录明细、军事侦查、社交网络、通勤时间预测、医疗记录、照片图像和视频封存、大规模的电子商务等。
参考技术C 什么是大数据?
列举三个常用的大数据定义:
(1)具有较强决策、洞察和流程优化能力的海量、高增长、多样化的信息资产需要新的处理模式。
——Gartner
(2)海量数据量、快速数据流和动态数据速度、多样的数据类型和巨大的数据价值。
—— IDC
(3)或者是海量数据、海量数据、大数据,是指所涉及的数据太大,无法在合理的时间内被截取、管理、处理、整理成人类可以解读的信息。
—— Wiki
大数据的其他定义也差不多,可以用几个关键词来定义大数据。
首先是“大尺度”,可以从两个维度来衡量,一是从时间序列中积累大量数据,二是对数据进行深度提炼。
其次,“多样化”可以是不同的数据格式,比如文字、图片、视频等。,可以是不同的数据类别,如人口数据、经济数据等。,也可以有不同的数据源,如互联网和传感器等。
第三,“动态”。数据是不断变化的,它可以随着时间迅速增加大量的数据,也可以是在空间不断移动变化的数据。
这三个关键词定义了大数据的形象。
但是,需要一个关键能力,就是“处理速度快”。如果有这样的大规模、多样化、动态的数据,但是需要很长时间的处理和分析,那就不叫大数据。从另一个角度来说,要实现这些数据的快速处理,肯定没有办法手工实现,所以需要借助机器来实现。
参考技术D 大数据(big data),指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。
在维克托·迈尔-舍恩伯格及肯尼斯·库克耶编写的《大数据时代》中大数据指不用随机分析法(抽样调查)这样捷径,而采用所有数据进行分析处理。大数据的5V特点(IBM提出):Volume(大量)、Velocity(高速)、Variety(多样)、Value(低价值密度)、Veracity(真实性)。

以上是关于大数据HadoopHDFS-Namenode-bootstrapStandby同步元数据的源码步骤分析的主要内容,如果未能解决你的问题,请参考以下文章

大数据与Hadoop之间的关系

新如何学习大数据技术?大数据怎么入门?怎么做大数据分析?

大数据是怎么定义的,大数据包括啥?

大数据如何入门

大数据有啥技术,大数据技术内容介绍

大数据需要掌握哪些技能