HDFS NN refreshNodes操作的可用性和效率的改进

Posted Android路上的人

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HDFS NN refreshNodes操作的可用性和效率的改进相关的知识,希望对你有一定的参考价值。

前言


我们知道在HDFS里面有,存着一类白名单和黑名单的列表来控制其下允许进行注册的DN节点。这样可以防止一些外部恶意节点注册到我们的NN上来。在HDFS的概念里,这个黑白名单叫做include file和exclude file。在一般情况下,exclude file的使用范围会更管一些,因为DN的decommission下线需要将待下线机器加到此exclude file中,然后再手动执行dfsadmin的refreshNodes命令进行刷新即可。至于include file白名单,它的管理其实比较复杂。在默认情况下,include file是为空的,意味着默认所有的注册的上来的节点都是被允许的。但是这样会有严重的安全隐患,所以在注重Security环境的集群内,需要管理员每次进行include file的更新来确保当前服务的DN都是有效的。Include file/execlude file的更新需要触发NN的refreshNodes操作来生效。因此这里会存在大量refreshNodes操作的发生。最近笔者在生产环境中就遇到了NN refreshNodes的一些性能问题,本文将讲述围绕refreshNodes的性能问题以及其改进点设计实现。

NN refreshNodes的可用性以及效率问题


笔者在最近工作中遇到了由于refreshNodes失败导致的集群出现大量missing block的情况,鉴于大量missing block造成了比较大的impact。笔者于是开始对此问题进行了深入的分析。

我们从结果开始往前分析,missing block的出现是因为NN上掉了大量节点。NN掉了大量节点的时间段,发现某一台挂掉的DN log里显示了很多下面的Disallow异常:

2021-05-23 07:10:07,609 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: RemoteException in offerService
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException): Datanode denied communication with namenode because the host is not in the include-list: xx.xx.xx.xx:50010
        at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.handleHeartbeat(DatanodeManager.java:1499)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.handleHeartbeat(FSNamesystem.java:4978)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.sendHeartbeat(NameNodeRpcServer.java:1539)
        at org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB.sendHeartbeat(DatanodeProtocolServerSideTranslatorPB.java:118)
        at org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos$DatanodeProtocolService$2.callBlockingMethod(DatanodeProtocolProtos.java:31228)

看到这个的第一反应,是这个节点不在NN的inlcude host列表里面了,然后才发生的上述拒绝错误。再往上进行分析,又发现了很多诸如“fail to resolve”这类的错误,基本断定是NN解析include host时的dns解析问题。

2021-05-23 07:09:59,030 INFO org.apache.hadoop.util.HostsFileReader: Adding a node "xx.xx.xx.xx" to the list of included hosts from /xxx/hadoop/hosts
2021-05-23 07:09:59,932 WARN org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager: Failed to resolve address `xx.xx.xx.xx` in `/xxx/hadoop/hosts`. Ignoring in the included list.

而触发上面resolve行为的是dfsadmin refreshNodes命令,不过这个refreshNodes执行的耗时相当长。

2021-05-23 07:01:39,001 WARN org.apache.hadoop.ipc.Server: Slow RPC : refreshNodes took 5269MILLISECONDS to process from client Call#0 Retry#0 org.apache.hadoop.hdfs.protocol.ClientProtocol.refreshNodes from xx.xx.xx.xx

后来经过分析发现,refershNodes失败的原因是短时段内大量的dns查询导致了dns解析的延迟,这个延时在NN的代码调用层面来看到的情况,就是解析失败的情况。

在这个问题里,有很多可以值得讨论的点:

  • 第一,为什么这里会有这么大量的dns查询请求?后来发现是因为集群DN数总量在持续不断变多,导致每次refreshNodes的cost越来越高。因为NN的refreshNodes行为会重刷所有的DN节点,相当于这是一个全量解析host地址的行为。

  • 第二,为什么这里没有NN解析重试行为?refreshNodes执行失败了,最好能够有一个重试机制,否则失败造成的掉节点,missing block对用户影响太大。

  • 第三,现有refreshNodes的效率不高,它是一种全量解析的方式,是否我们能够支持增量式地refresh node,对于大部分分状态不变的节点,NN完全没必要再对其做dns的重新解析。在增量式 refesh node的情况里,admin管理员能够指定特定的host进行node的refresh,从而使得refreshNode的操作变得非常轻量级。

针对上面的第二,第三点,我们内部对refreshNode逻辑进行了调整和改进,使其在可用性以及执行效率上得到一个号的提升。

NN refreshNodes的改造升级


针对上节部分提出的几个关键点,我们对此进行了如下两部分的改进。

refreshNodes重试机制的引入


在原有逻辑里,NN refreshNodes会触发重新读取所有的include/exclude文件中的host信息。在这个过程里,这些文件里可能会包含有上千甚至上万个节点的host信息,每次读取到一个新的host后,NN尝试会进行对其host的ip地址的解析,相关代码如下:

  private static HostSet readFile(String type, String filename)
          throws IOException {
    HostSet res = new HostSet();
    if (!filename.isEmpty()) {
      HashSet<String> entrySet = new HashSet<String>();
      HostsFileReader.readFileToSet(type, filename, entrySet);
      for (String str : entrySet) {
        // 构造InetSocketAddress地址时,会进行地址解析
        InetSocketAddress addr = parseEntry(type, filename, str);
        if (addr != null) {
          res.add(addr);
        }
      }
    }
    return res;
  }

  @VisibleForTesting
  static InetSocketAddress parseEntry(String type, String fn, String line) {
    try {
      URI uri = new URI("dummy", line, null, null, null);
      int port = uri.getPort() == -1 ? 0 : uri.getPort();
      InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
      if (addr.isUnresolved()) {
        LOG.warn(String.format("Failed to resolve address `%s` in `%s`. " +
                "Ignoring in the %s list.", line, fn, type));
        return null;
      }
      return addr;
    } catch (URISyntaxException e) {
      LOG.warn(String.format("Failed to parse `%s` in `%s`. " + "Ignoring in " +
              "the %s list.", line, fn, type));
    }
    return null;
  }

因此我们这里所说的重试行为指的就是对parseEntry里面失败的情况再一次进行retry。

那么这里的retry机制要怎么实现呢?我们在这里首先引入了一个新的用于存放paseEntry阶段解析host失败的集合对象,这里叫它notInclude的集合对象,对应成功解析的include host的集合对象。

class HostFileManager {
  private static final Log LOG = LogFactory.getLog(HostFileManager.class);
  private HostSet includes = new HostSet();
  private HostSet excludes = new HostSet();
  // 读取include文件时,存放那些解析失败的host
  private HostSet notIncludes = new HostSet();
...

然后对parseEntry 逻辑进行了微调,以此能够进行失败host的收集:

HostFileManager#readFile方法:

  private static HostSet[] readFile(String type, String filename)
          throws IOException {
    HostSet[] hostSets = new HostSet[2];
    HostSet res = new HostSet();
    HostSet failedRes = new HostSet();

    if (!filename.isEmpty()) {
      HashSet<String> entrySet = new HashSet<String>();
      HostsFileReader.readFileToSet(type, filename, entrySet);
      for (String str : entrySet) {
        InetSocketAddress addr = parseEntry(type, filename, str);
        if (addr != null) {
          // 根据host的地址是否能够解析,将其放入对应的host set集合里
          if (addr.isUnresolved()) {
            failedRes.add(addr);
          } else {
            res.add(addr);
          }
        }
      }
    }

    hostSets[0] = res;
    hostSets[1] = failedRes;
    return hostSets;
  }

上述步骤结束之后,HostFileManager里的notIncludes变量就会存有失败的host列表了,然后我们在后续的refreshDatanodes方法里进行重试,逻辑如下:

DatanodeManager#refreshDatanodes方法:

  public void refreshDatanodes() {
    Collection<DatanodeDescriptor> nodes = null;
    synchronized (datanodeMap) {
      nodes = datanodeMap.values();
    }
    
    List<DatanodeDescriptor> notIncluded =
        new ArrayList<DatanodeDescriptor>();
    List<DatanodeDescriptor> included =
        new ArrayList<DatanodeDescriptor>(nodes.size());
    // 根据判断是否在include列表里,进行筛选
    for (DatanodeDescriptor node : nodes) {
      if (hostFileManager.isIncluded(node)) {
        included.add(node);
      } else {
        notIncluded.add(node);
      }
    }

    // there could be DNS resolve failure causing healthy node is treated as
    // not included, retry resolving to avoid the issue;
    int resolveTry = 0;
    final int RESOLVE_MAX = 3;
    // 对not include列表里的host进行解析重试,
    while ((!notIncluded.isEmpty()) && (resolveTry < RESOLVE_MAX)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("resolve retry #" + resolveTry);
        LOG.debug("notInclude hosts: " + notIncluded);
      }
      Iterator<DatanodeDescriptor> iter = notIncluded.iterator();
      while (iter.hasNext()) {
        DatanodeDescriptor node = iter.next();
        InetSocketAddress addr =
            HostFileManager.resolvedAddressFromDatanodeID(node);
        if (!addr.isUnresolved()) {
          // 如果属于是在hostFileManager notIncludes里的解析失败情况的话,
          // 则进行host的移除,然后再重新把host加入include list
          if (hostFileManager.isNotIncluded(addr)) {
            hostFileManager.addToIncludeHosts(node.getIpAddr());
            included.add(node);
            iter.remove();
          } else if (hostFileManager.isIncluded(addr)) {
            // 如果只是在前面调用isIncluded执行时候失败的情况,
            // 则再进行一次此方法的调用,hostFileManager#isIncluded方法里面也会有一次地址解析
            included.add(node);
            iter.remove();
          }
        }
      }
      
      // 最多允许重试3次,每次睡眠一定的间隔,此间隔支持可配置化
      resolveTry++;
      try {
        Thread.sleep(hostResolveRetryInterval);
      } catch (InterruptedException ie) {}
    }
    
    // 对notIncluded、included进行相应action的take操作
    final String operationName = "inner-refreshNodes";
    namesystem.writeLock();
    try {
      for (DatanodeDescriptor node : notIncluded) {
        node.setDisallowed(true); // case 2.
      }
      for (DatanodeDescriptor node : included) {
        node.setDisallowed(false);
        if (hostFileManager.isExcluded(node)) {
          decomManager.startDecommission(node); // case 3.
        } else {
          decomManager.stopDecommission(node); // case 4.
        }
      }
      countSoftwareVersions();
    } finally {
      namesystem.writeUnlock(operationName);
    }
  }

上面每次重试过程是支持时间可配置化,因为有时dns解析的恢复可能需要多达几分钟这样的时间,需要根据实际的使用场景进行配置,默认是一分钟,然后最多重试3次。上面的代码已经是我们内部进行过特别改造过的代码,和社区代码已有所不同,大家看明白主要修改的逻辑思路即可。

增量refreshNodes命令的实现


另外一个关于执行效率上的改进点是增量refreshNodes的实现。

针对NN维护有include/exclude两类文件的情况,我们实现了对应如下的4个新的refreshNodes命令操作,分别是加减host到include/exclude文件里面,然后后面是需要传入的host信息,可支持多host的传入。

bin/hdfs dfsadmin refreshNodes -addIncludeHost [host1,host2,...]
bin/hdfs dfsadmin refreshNodes -removeIncludeHost [host1,host2,...]

bin/hdfs dfsadmin refreshNodes -addExcludeHost [host1,host2,...]
bin/hdfs dfsadmin refreshNodes -removeExcludeHost [host1,host2,...]

从本质上来说,上面4类命令是对HostFileManager里面includes,excludes变量的一个内存操作,然后再进行对应节点的action操作。

这里分别以-addIncludeHost/-addExcludeHost命令实现为例,这里只展示server端的代码实现:

DatanodeManager#refreshNodeWithAddHostIntoInclude方法:

  /**
   * Refresh with adding the given host string into include list.
   * @param host A single host or multiple host string.
   */
  public void refreshNodeWithAddHostIntoInclude(String host) {
    String ipAddr;
    // 1) 解析传入参数,得到指定的host信息
    String[] hosts = host.split(",");
    List<String> ipAddrList = new ArrayList<>();

    for(String h : hosts) {
      // 2)遍历host信息,得到其对应的地址,如果解析失败的情况,则跳过此host
      InetSocketAddress addr = this.hostFileManager.resolveAndAddIncludes(h);
      if (addr == null) {
        continue;
      }

      // 从地址对象里拿到ip信息,进行收集,因为NN在内部存DN信息时,是通过ip进行区分
      ipAddr = addr.getAddress() != null ?
          addr.getAddress().getHostAddress() : null;
      if (ipAddr != null) {
        ipAddrList.add(ipAddr);
      }
    }

    // 3)根据ip地址进行现有LIVE DN的过滤查找
    List<DatanodeDescriptor> targetNodeList = findTargetDatanode(ipAddrList);
    if (!targetNodeList.isEmpty()) {
      final String operationName = "refreshNodeWithAddHostIntoInclude";
      namesystem.writeLock();
      try {
        // 4) 找到目标DN节点对象后,进行include host节点的对应的属性操作,即设置Disallowed属性为false
        for (DatanodeDescriptor dd : targetNodeList) {
          dd.setDisallowed(false);
        }
        countSoftwareVersions();
      } finally {
        namesystem.writeUnlock(operationName);
      }
    } else {
      LOG.info("Refresh node with specific node," +
          " target host doesn't find in NN for host " + host);
    }
  }

this.hostFileManager的resolveAndAddIncludes方法如下:

  /**
   * Add specific host into the include host.
   * @param host Host to be added.
   */
  InetSocketAddress resolveAndAddIncludes(String host) {
    InetSocketAddress addr = parseEntry("included", "non-file", host);
    if (addr != null) {
      if (!addr.isUnresolved()) {
        synchronized (this) {
          // 如果该host地址信息已经在include集合里存在,则忽略添加
          if (includes.match(addr)) {
            LOG.warn("Target node " + host + " is already in include list," +
                " ignoring to add into include host.");
          } else {
            includes.add(addr);
          }
        }

        return addr;
      } else {
        // 如果地址解析失败,则也忽略添加
        LOG.warn("Failed to resolve the host " + host + "," +
            " ignoring to add into include host.");
      }
    }

    return null;
  }

addExcludeHost的实现和addIncludeHost实现几乎一致,区别只是后续DN的action操作不同:

  /**
   * Refresh with adding the given host string into exclude list.
   * @param host A single host or multiple host string.
   */
  public void refreshNodeWithAddHostIntoExclude(String host) {
    String ipAddr = null;
    String[] hosts = host.split(",");
    List<String> ipAddrList = new ArrayList<>();

    for (String h : hosts) {
      InetSocketAddress addr = this.hostFileManager.resolveAndAddExcludes(h);
      if (addr == null) {
        continue;
      }

      ipAddr = addr.getAddress() != null ?
          addr.getAddress().getHostAddress() : null;
      if (ipAddr != null) {
        ipAddrList.add(ipAddr);
      }
    }

    // find the node that match the target ip
    List<DatanodeDescriptor> targetNodeList = findTargetDatanode(ipAddrList);
    if (!targetNodeList.isEmpty()) {
      final String operationName = "refreshNodeWithAddHostIntoExclude";
      namesystem.writeLock();
      try {
        // 加入exclude host的对应action操作是进行此DN的下线
        for (DatanodeDescriptor dd : targetNodeList) {
          decomManager.startDecommission(dd);
        }
        countSoftwareVersions();
      } finally {
        namesystem.writeUnlock(operationName);
      }
    } else {
      LOG.info("Refresh node with specific node," +
          " target host doesn't find in NN for host " + host);
    }
  }

上面的this.hostFileManager.resolveAndAddExcludes方法如下:

  /**
   * Add specific host into the exclude host.
   * @param host Host to be added.
   */
  InetSocketAddress resolveAndAddExcludes(String host) {
    InetSocketAddress addr = parseEntry("excluded", "non-file", host);
    if (addr != null) {
      if (!addr.isUnresolved()) {
        synchronized (this) {
          // 如果exclude列表里已经有此地址时,则忽略添加
          if (excludes.match(addr)) {
            LOG.warn("Target node " + host + " is already in exclude list," +
                " ignoring to add into exclude host.");
          } else {
            // 在添加地址到exclude前,

以上是关于HDFS NN refreshNodes操作的可用性和效率的改进的主要内容,如果未能解决你的问题,请参考以下文章

HDFS的高可用(HA)--------通俗易懂的分析

zookeeper--实现NN和RM的HA

最新版hdfs中namenode的高可用

HDFS读写流程

HDFS NN,SNN,BN和HA

Namanode 高可用 主备节点切换