HDFS|从mkdirs操作分析NameNode如何管理元数据——内存篇

Posted 大数据记事本

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HDFS|从mkdirs操作分析NameNode如何管理元数据——内存篇相关的知识,希望对你有一定的参考价值。

     我们知道,NameNode 作为 HDFS 集群的主节点,管理了整个集群的元数据信息,那么,它是如何对元数据进行管理的呢?从这篇开始,我们通过一个 mkdirs 创建目录的具体操作,来分析 NameNode 管理元数据的详细过程。
高可用机制
    具体分析之前,首先要了解一下 NameNode 的高可用机制。在 Hadoop 1.x 版本,HDFS 的 NameNode 存在两个明显的问题:
  • 单点故障:NameNode 仅有一个,不支持高可用,一旦 NameNode 所在点节点出现故障,整个 HDFS 就无法提供服务

  • 内存受限:单个 NameNode 的内存有限,当集群规模达到一定程度,单个 NameNode 的内存将无法满足元数据的存储需求

    所以从 Hadoop 2.x 版本开始,引入了联邦机制来解决内存受限问题;而针对 NameNode 单点故障的问题,则开始支持高可用(HA),具体的原理如下:

     采用双 NameNode 机制,状态分别为 Active 和 StandBy。Active 状态的 NN 对外提供服务,而 StandBy 状态的 NN 作为热备节点,一旦 Active 状态的 NN 下线,StandBy 状态的 NN 切换为 Active 状态,继续对外提供服务。

    既然 StandBy 状态的 NN 要作为热备节点,那么它的元数据信息必须和 Active 状态的 NN 保持一致。Active 状态的 NN 在更新元数据时,除了会更新本地内存中的元数据信息,同时会将元数据信息写入 JournalNode 集群,而 JournalNode 集群本身具备高可用。之后 StandBy 状态的 NN 从 JournalNode 集群拉取元数据信息并更新自身的状态,从而和 Active 状态的 NN 保持元数据一致。

    这里的 Zookeeper 集群用来实现 Active 和 StandBy 状态 NN 的自动切换。

元数据管理流程  

    了解了 NameNode 的高可用机制后,回到开头提到的 mkdirs 操作。当 Active 状态的 NameNode 接收到客户端创建目录的请求时,会进行如下操作:
  • 更新本地内存中的元数据信息
  • 将操作日志写入本地磁盘
  • 将操作日志写入 JournalNode 集群
  • StandBy 状态的 NameNode 拉取 JournalNode 集群中的元数据信息进行同步
整个过程如下图所示:

(后台回复HDFS,获取高清流程图)

     由于整个流程相对复杂,我们分多篇来进行分析,这篇主要分析 NameNode 如何更新本地内存中的元数据信息。

    在编写客户端代码时,通过调用 FileSystem 对象的 mkdirs() 方法来创建目录。这里的 FileSystem 是一个抽象类,如果是 Local 模式,其实例是 LocalFileSystem 对象;如果是分布式模式,其实例是 DistributedFileSystem 对象,这里我们只研究分布式模式。

     当通过 FileSystem.mkdirs 创建目录时,实际调用的是 DistributedFileSystem.mkdirs() 方法(mkdirs 可以在父目录不存在的情况下逐层创建,而 mkdir 创建目录时父目录必须已经存在)
    在 mkdirs 方法内,调用了 mkdirsInternal() 方法,之后又依次调用了 DFSClient.mkdirs() -> DFSClient.primitiveMkdir(),在该方法内部,通过 namenode 代理对象,进行了 RPC 调用 ,实际调用的是 NameNodeRpcServer.mkdirs() 方法
public boolean mkdirs(String src, FsPermission masked, boolean createParent) throws IOException { //检查NN已启动 checkNNStartup(); if(stateChangeLog.isDebugEnabled()) { stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src); } //检查给定目录名称的长度 if (!checkPathLength(src)) { throw new IOException("mkdirs: Pathname too long. Limit "  + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels."); } //TODO 这里的namesystem就是目录树对象,用来管理HDFS目录,创建目录实际调用的是FSNameSystem的mkdirs方法 return namesystem.mkdirs(src, new PermissionStatus(getRemoteUser().getShortUserName(), null, masked), createParent);}
     在 mkdirs() 方法中,调用了 FSNameSystem.mkdirs() 方法
boolean mkdirs(String src, PermissionStatus permissions, boolean createParent) throws IOException { HdfsFileStatus auditStat = null; checkOperation(OperationCategory.WRITE); writeLock(); try { //操作权限验证 checkOperation(OperationCategory.WRITE); //安全模式验证,如果处于安全模式则无法创建 checkNameNodeSafeMode("Cannot create directory " + src); //TODO 创建目录 auditStat = FSDirMkdirOp.mkdirs(this, src, permissions, createParent); } catch (AccessControlException e) { logAuditEvent(false, "mkdirs", src); throw e; } finally { writeUnlock(); } //TODO 元数据日志持久化 getEditLog().logSync(); logAuditEvent(true, "mkdirs", src, null, auditStat); return true;}
该方法的逻辑如下,这里重点分析步骤3 和 步骤4:
  1. 获取写锁

  2. 进行安全验证,如操作权限验证及安全模式验证

  3. 调用 FSDirMkdirOp.mkdirs() 方法创建目录并写入操作日志

  4. 将元数据操作日志持久化到磁盘(NN本地磁盘和JournalNode磁盘),这里将操作日志持久化到磁盘的过程就用到了上篇中提到的双缓冲机制

创建目录:

    本质就是在内存中的目录树结构下依次添加待创建的目录节点,然后将创建目录的操作日志写入 EditLog。
    在内存中目录树下添加节点对应流程图中的如下部分:

    FSDirMkdirOp.mkdirs() 方法

static HdfsFileStatus mkdirs(FSNamesystem fsn, String src, PermissionStatus permissions, boolean createParent) throws IOException { //TODO 获取当前的目录树结构 FSDirectory fsd = fsn.getFSDirectory(); if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src); } //验证路径是否有效 if (!DFSUtil.isValidName(src)) { throw new InvalidPathException(src); } FSPermissionChecker pc = fsd.getPermissionChecker(); byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); fsd.writeLock(); try { //对传入对路径参数进行规范化 src = fsd.resolvePath(pc, src, pathComponents); INodesInPath iip = fsd.getINodesInPath4Write(src); if (fsd.isPermissionEnabled()) { fsd.checkTraverse(pc, iip); } //TODO 比如我们现在已经存在的目录是/user/hive/warehouse,要在它下面创建/data/mytable // lastINode就是找到最后一级目录,即 mytable final INode lastINode = iip.getLastINode(); if (lastINode != null && lastINode.isFile()) { throw new FileAlreadyExistsException("Path is not a directory: " + src); }
INodesInPath existing = lastINode != null ? iip : iip.getExistingINodes(); if (lastINode == null) { if (fsd.isPermissionEnabled()) { fsd.checkAncestorAccess(pc, iip, FsAction.WRITE); }
if (!createParent) { fsd.verifyParentDir(iip, src); } fsn.checkFsObjectLimit(); //TODO 假设已存在/user/hive/warehouse // 要创建/user/hive/warehouse/data/mytable目录 // 则需要创建的目录就是/data/mytable nonExisting就是需要创建的多级目录, // 即/user/hive/warehouse/data 和 /user/hive/warehouse/data/mytable List<String> nonExisting = iip.getPath(existing.length(), iip.length() - existing.length()); int length = nonExisting.size(); //TODO 如果要创建的目录层级大于1,走这里 if (length > 1) { List<String> ancestors = nonExisting.subList(0, length - 1); existing = createChildrenDirectories(fsd, existing, ancestors, addImplicitUwx(permissions, permissions)); if (existing == null) { throw new IOException("Failed to create directory: " + src); } } //TODO 如果只有一级目录则走这里 if ((existing = createChildrenDirectories(fsd, existing, nonExisting.subList(length - 1, length), permissions)) == null) { throw new IOException("Failed to create directory: " + src); } } return fsd.getAuditFileInfo(existing); } finally { fsd.writeUnlock(); }}
     在 NameNode 中,将目录树这个结构抽象成了 FSDirectory 对象,这个方法的逻辑是:
  • 获取目录树结构
  • 验证传入的路径是否有效
  • 对路径参数进行规范化
  • 找到待创建的最后一级目录
  • 找到不存在的目录(可能有多级)
  • 根据是否存在多级目录,走不同的分支,但最终都是调用了 createChildrenDirectories() 方法来创建目录
private static INodesInPath createChildrenDirectories(FSDirectory fsd, INodesInPath existing, List<String> children, PermissionStatus perm) throws IOException { assert fsd.hasWriteLock(); //TODO 一级一级去创建 for (String component : children) { existing = createSingleDirectory(fsd, existing, component, perm); if (existing == null) { return null; } } return existing;}
     该方法会遍历传入的路径参数,一级一级地去创建,具体创建的方法为 FSDirMkdirOp.createSingleDirectory():
//创建单个目录private static INodesInPath createSingleDirectory(FSDirectory fsd, INodesInPath existing, String localName, PermissionStatus perm) throws IOException { assert fsd.hasWriteLock(); //TODO 更新文件目录树,这棵目录树是存在于内存中的,由FSNamesystem管理的 existing = unprotectedMkdir(fsd, fsd.allocateNewInodeId(), existing, localName.getBytes(Charsets.UTF_8), perm, null, now()); if (existing == null) { return null; }
final INode newNode = existing.getLastINode(); NameNode.getNameNodeMetrics().incrFilesCreated();
String cur = existing.getPath(); //TODO 将创建目录的操作日志写入EditLog fsd.getEditLog().logMkDir(cur, newNode); if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("mkdirs: created directory " + cur); } return existing;}
该方法中最重要的两个操作:
  • 更新内存中的目录树

  • 将操作日志写入EditLog

下面详细分析这两个步骤:
(1) 更新内存中的目录树 :调用的是 FSDirMkdirOp.unprotectedMkdir() 方法
private static INodesInPath unprotectedMkdir(FSDirectory fsd, long inodeId, INodesInPath parent, byte[] name, PermissionStatus permission, List<AclEntry> aclEntries, long timestamp) throws QuotaExceededException, AclException, FileAlreadyExistsException { assert fsd.hasWriteLock(); assert parent.getLastINode() != null; if (!parent.getLastINode().isDirectory()) { throw new FileAlreadyExistsException("Parent path is not a directory: " + parent.getPath() + " " + DFSUtil.bytes2String(name)); } //TODO 封装成一个目录 final INodeDirectory dir = new INodeDirectory(inodeId, name, permission, timestamp); //TODO 往文件目录树添加该目录节点 INodesInPath iip = fsd.addLastINode(parent, dir, true); if (iip != null && aclEntries != null) { AclStorage.updateINodeAcl(dir, aclEntries, Snapshot.CURRENT_STATE_ID); } return iip;}
     在 NameNode 的元数据结构中,目录被封装成了 INodeDirectory 对象,文件被封装成了 INodeFile 对象。所以这里的操作就是根据传入的路径参数封装一个 INodeDirectory 目录对象,然后将这个目录对象添加到目录树 中,调用的是 FSDirectory.addLastINode() 方法。该方法的逻辑就是找到最后一级目录(INodeDirectory 对象),然后在其管理的子目录数据结构:children 变量中(List<INode>类型)添加一个 INode 节点。

(2)将操作日志写入EditLog:调用的是 FSDirectory.logMkDir() 方法

public void logMkDir(String path, INode newNode) { PermissionStatus permissions = newNode.getPermissionStatus(); //TODO 创建日志对象 MkdirOp op = MkdirOp.getInstance(cache.get()) .setInodeId(newNode.getId()) .setPath(path) .setTimestamp(newNode.getModificationTime()) .setPermissionStatus(permissions);
AclFeature f = newNode.getAclFeature(); if (f != null) { op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode)); }
XAttrFeature x = newNode.getXAttrFeature(); if (x != null) { op.setXAttrs(x.getXAttrs()); } //TODO 记录日志,这里只是将操作日志写内存,具体刷写磁盘是由logSync()方法执行的 logEdit(op);}
     该方法中会先将创建目录的日志封装成对应的 MkdirOp 日志对象,然后将这个对象作为参数,调用 logEdit() 方法将操作日志写入缓冲。

以上是关于HDFS|从mkdirs操作分析NameNode如何管理元数据——内存篇的主要内容,如果未能解决你的问题,请参考以下文章

HDFS

Hadoop组件之-HDFS(HA实现细节)

大数据分析之技术框架整理

hadoop执行hdfs的指令出错 ./hadoop dfs mkdir 出错

HDFS Namenode挂掉后分析解决

HDFS 客户端读写操作详情