An Analysis of How HDFS Multiple Standby Works

Posted by Android路上的人

Editor's note: this article was compiled by the editors of 小常识网 (cha138.com). It introduces the principles behind HDFS Multiple Standby, and we hope you find it useful.

Introduction


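HDFS's early HA implementation used a standard one-active, one-standby model. The primary is called the Active NameNode and the backup is called the Standby NameNode. The two can switch roles, and this keeps the service highly available.

Can this two-node model meet stricter availability requirements? In standard HA there is only one Standby NN acting as a backup. In the extreme case where the Active and the Standby crash at the same time, the HDFS cluster can no longer serve requests. This does happen occasionally in production.

A stronger model would run one Active service plus several Standbys. With multiple Standbys, NN availability improves considerably. The community implemented this feature in the 3.x releases, tracked as JIRA HDFS-6440 (Support more than 2 NameNodes). This article briefly analyzes how the feature works, because understanding the implementation helps us use it well.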

Key Elements of the HDFS Multiple Standby Implementation


First, note that HDFS Multiple Standby is built on top of the existing HDFS HA implementation. It extends the Active-Standby model to an Active-plus-multiple-Standby model.

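One core point does not change when moving from a single Standby to multiple Standbys: the interaction between an Active and a Standby service. Each Active/Standby exchange works as before, so the pairwise interaction itself does not need to be redesigned in the core HA code.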

We therefore focus on the following interactions between Active and Standby NNs:

  • NN bootstrap
  • Standby NN uploading checkpoints to the Active NN
  • Failover between Active and Standby NNs
  • Standby NN triggering log rolls on the Active NN

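In the original one-to-one Active/Standby model, each of these four operations is a simple one-way direct call. With multiple Standbys, however, each operation has several possible targets. Their states are not known in advance, so any target may be a Standby or the Active. Handling this uncertainty is the central point of the multi-Standby implementation, and also its main difficulty.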

Below we analyze each interaction against the relevant code.

Analysis of the Multiple Standby Implementation


This section analyzes the four interactions listed above in detail.

Bootstrap Behavior


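In single-Standby mode, the Standby NN downloads the image directly from the other NN, which is the Active. With multiple Standbys, the Standby only needs to find any NN that is currently serving and bootstrap from it. It does not matter whether that NN is the Active or a Standby, because bootstrap only fetches the NN's metadata files.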

The relevant code is as follows:
BootstrapStandby.java

  private int doRun() throws IOException {
    // find the active NN
    NamenodeProtocol proxy = null;
    NamespaceInfo nsInfo = null;
    boolean isUpgradeFinalized = false;
    RemoteNameNodeInfo proxyInfo = null;
    for (int i = 0; i < remoteNNs.size(); i++) {
      proxyInfo = remoteNNs.get(i);
      InetSocketAddress otherIpcAddress = proxyInfo.getIpcAddress();
      proxy = createNNProtocolProxy(otherIpcAddress);
      try {
        // Get the namespace from any active NN. If you just formatted the primary NN and are
        // bootstrapping the other NNs from that layout, it will only contact the single NN.
        // However, if the cluster is already running and you are adding a NN later (e.g.
        // replacing a failed NN), then this will bootstrap from any node in the cluster.
        nsInfo = proxy.versionRequest();
        isUpgradeFinalized = proxy.isUpgradeFinalized();
        // Any reachable NN is good enough
        break;
      } catch (IOException ioe) {
        LOG.warn("Unable to fetch namespace information from remote NN at " + otherIpcAddress
            + ": " + ioe.getMessage());
        if (LOG.isDebugEnabled()) {
          LOG.debug("Full exception trace", ioe);
        }
      }
    }

    ...

    // download the fsimage from active namenode
    int download = downloadImage(storage, proxy, proxyInfo);
    if (download != 0) {
      return download;
    }

    // finish the upgrade: rename previous.tmp to previous
    ...
    return 0;
  }
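
The selection loop in doRun() reduces to "take the first NameNode that answers". The following is a minimal sketch of that pattern, not the Hadoop code. Probe, Candidate, and BootstrapSourceFinder are hypothetical names. Each probe stands in for the versionRequest() RPC.

```java
import java.io.IOException;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for an RPC such as versionRequest():
// returns normally if the remote NN answered, throws IOException otherwise.
@FunctionalInterface
interface Probe {
  void call() throws IOException;
}

// Hypothetical pairing of an NN address with its probe.
final class Candidate {
  final String address;
  final Probe probe;

  Candidate(String address, Probe probe) {
    this.address = address;
    this.probe = probe;
  }
}

final class BootstrapSourceFinder {
  private BootstrapSourceFinder() {}

  // Returns the address of the first NN that answers. Its HA state
  // (Active or Standby) is irrelevant: bootstrap only needs metadata.
  static Optional<String> findAnyReachable(List<Candidate> candidates) {
    for (Candidate c : candidates) {
      try {
        c.probe.call();
        return Optional.of(c.address);
      } catch (IOException e) {
        // Like doRun(), log the failure and move on to the next NN.
        System.err.println("Unable to reach " + c.address + ": " + e.getMessage());
      }
    }
    return Optional.empty();
  }
}
```

The loop stops at the first success. As the source comment notes, this lets a replacement NN join a running cluster by bootstrapping from any node that responds.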

Standby NN Checkpoint Upload Behavior


In the HDFS HA mechanism, the Standby periodically checkpoints a new image and uploads it to the Active NN. With multiple Standby NNs, two main problems must be solved:

  • How a Standby NN finds out which NN is actually the Active before uploading its image.
  • How the Standbys coordinate their uploads once the Active is known, since their uploads can conflict with one another.

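For the first problem, the community treats every other NN as a potential Active NN, which necessarily includes the Standby NNs. For each NN, it tracks the state of the last image upload. This state records two things: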

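1) Whether the NN is the Active (the primary flag). This is inferred from the result of the last image upload and is updated if a failover happens later.
2) The time of the last successful image upload.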

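The upload condition has two parts, and either one is enough. The target NN may be marked as the Active (primary). Alternatively, at least the quiet period (checkpointConf.getQuietPeriod()) may have passed since the last successful upload to it. The relevant code is as follows: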

StandbyCheckpointer.java

  private void doCheckpoint() throws InterruptedException, IOException {
    assert canceler != null;
    final long txid;
    final NameNodeFile imageType;
      ...
      // The checkpoint writes out a new image file
      img.saveNamespace(namesystem, imageType, canceler);
      txid = img.getStorage().getMostRecentCheckpointTxId();
      assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
          thisCheckpointTxId + " but instead saved at txid=" + txid;

      ...

    // Upload the saved checkpoint back to the active
    // Do this in a separate thread to avoid blocking transition to active, but don't allow more
    // than the expected number of tasks to run or queue up
    // See HDFS-4816
    ExecutorService executor = new ThreadPoolExecutor(0, activeNNAddresses.size(), 100,
        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
        uploadThreadFactory);
    // for right now, just match the upload to the nn address by convention. There is no need to
    // directly tie them together by adding a pair class.
    HashMap<String, Future<TransferFsImage.TransferResult>> uploads =
        new HashMap<>();
    for (final URL activeNNAddress : activeNNAddresses) {
      // Upload image if at least 1 of 2 following conditions met:
      // 1. has been quiet for long enough, try to contact the node.
      // 2. this standby IS the primary checkpointer of target NN.
      String addressString = activeNNAddress.toString();
      assert checkpointReceivers.containsKey(addressString);
      // 1) Look up the checkpoint state recorded for the target NN
      CheckpointReceiverEntry receiverEntry =
          checkpointReceivers.get(addressString);
      long secsSinceLastUpload =
          TimeUnit.MILLISECONDS.toSeconds(
              monotonicNow() - receiverEntry.getLastUploadTime());
      // 2) Upload if the target NN is the Active (primary) or has been quiet for at least the quiet period
      boolean shouldUpload = receiverEntry.isPrimary() ||
          secsSinceLastUpload >= checkpointConf.getQuietPeriod();
      if (shouldUpload) {
        // 3) Submit a task that uploads the image to this NN
        Future<TransferFsImage.TransferResult> upload =
            executor.submit(new Callable<TransferFsImage.TransferResult>() {
              @Override
              public TransferFsImage.TransferResult call()
                  throws IOException, InterruptedException {
                CheckpointFaultInjector.getInstance().duringUploadInProgess();
                return TransferFsImage.uploadImageFromStorage(activeNNAddress,
                    conf, namesystem.getFSImage().getStorage(), imageType, txid,
                    canceler);
              }
            });
        uploads.put(addressString, upload);
      }
    }
    ...
  }
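
The shouldUpload rule can be isolated as a pure function. In this sketch, ReceiverState and UploadPolicy are hypothetical simplifications of CheckpointReceiverEntry. The quiet period is passed in directly rather than read from checkpointConf.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified version of CheckpointReceiverEntry.
final class ReceiverState {
  final boolean isPrimary;      // did this NN accept our last upload?
  final long lastUploadTimeMs;  // monotonic time of the last successful upload

  ReceiverState(boolean isPrimary, long lastUploadTimeMs) {
    this.isPrimary = isPrimary;
    this.lastUploadTimeMs = lastUploadTimeMs;
  }
}

final class UploadPolicy {
  private UploadPolicy() {}

  // Upload if the target is believed to be the Active (primary), or if we
  // have not uploaded to it for at least quietPeriodSecs.
  static boolean shouldUpload(ReceiverState entry, long nowMs, long quietPeriodSecs) {
    long secsSinceLastUpload =
        TimeUnit.MILLISECONDS.toSeconds(nowMs - entry.lastUploadTimeMs);
    return entry.isPrimary || secsSinceLastUpload >= quietPeriodSecs;
  }
}
```

The quiet-period branch lets a Standby discover a new Active after a failover. A node that rejected earlier uploads is still retried, only less often.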

When the upload results are collected, the receiver state of each target NN is updated accordingly:

    for (Map.Entry<String, Future<TransferFsImage.TransferResult>> entry :
        uploads.entrySet()) {
      String url = entry.getKey();
      Future<TransferFsImage.TransferResult> upload = entry.getValue();
      try {
        // TODO should there be some smarts here about retries nodes that
        //  are not the active NN?
        CheckpointReceiverEntry receiverEntry = checkpointReceivers.get(url);
        TransferFsImage.TransferResult uploadResult = upload.get();
        // 4.a) Get the image upload result. On success, update the matching checkpoint receiver entry:
        //  1) mark the receiver as primary, i.e. the current real Active NN; 2) update the upload time
        if (uploadResult == TransferFsImage.TransferResult.SUCCESS) {
          receiverEntry.setLastUploadTime(monotonicNow());
          receiverEntry.setIsPrimary(true);
        } else {
          // Getting here means image upload is explicitly rejected
          // by the other node. This could happen if:
          // 1. the other is also a standby, or
          // 2. the other is active, but already accepted another
          // newer image, or
          // 3. the other is active but has a recent enough image.
          // All these are valid cases, just log for information.
          LOG.info("Image upload rejected by the other NameNode: {}",
              uploadResult);
          // 4.b) Otherwise, set the receiver's primary flag to false
          receiverEntry.setIsPrimary(false);
        }
      } catch (ExecutionException e) {
        // Even if exception happens, still proceeds to next NN url.
        // so that fail to upload to previous NN does not cause the
        // remaining NN not getting the fsImage.
        ioes.add(new IOException("Exception during image upload", e));
      } catch (InterruptedException e) {
        ie = e;
        break;
      }
    }

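The code above updates the receiver state of each target NN with the result of its latest upload. Through the primary flag, this also identifies which NN is currently the real Active. A failover to another NN can happen at any time, so the Standby NN does not upload only to the known Active. It also keeps sending upload requests to the other NNs, subject to the quiet period. A Standby target rejects such a request and returns an error result, so no image is ever actually uploaded to a Standby.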
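
The bookkeeping in step 4 can be sketched as follows. Outcome, Receiver, and ReceiverBook are hypothetical. The real code treats every non-SUCCESS TransferResult the same way, so this sketch collapses those results into a single REJECTED value.

```java
import java.util.Map;

// Hypothetical, simplified upload outcome.
enum Outcome { SUCCESS, REJECTED }

// Hypothetical per-target state, as in CheckpointReceiverEntry.
final class Receiver {
  boolean isPrimary;
  long lastUploadTimeMs;
}

final class ReceiverBook {
  private ReceiverBook() {}

  // Applies one round of upload results to the per-target state.
  static void record(Map<String, Receiver> receivers,
                     Map<String, Outcome> results, long nowMs) {
    for (Map.Entry<String, Outcome> e : results.entrySet()) {
      Receiver r = receivers.get(e.getKey());
      if (e.getValue() == Outcome.SUCCESS) {
        // The target accepted the image, so treat it as the Active.
        r.lastUploadTimeMs = nowMs;
        r.isPrimary = true;
      } else {
        // Rejected: a Standby, or an Active that already has a recent image.
        r.isPrimary = false;
      }
    }
  }
}
```

After a failover, the old Active starts rejecting uploads and loses its primary flag. The new Active gains the flag on its first accepted upload.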

If the target NN is the real Active, several Standby NNs might upload images to it too often, since every Standby runs the same checkpoint logic. Preventing this requires extra checks on the receiving side, the Active NN.

The relevant code is as follows:
ImageServlet.java

  protected void doPut(final HttpServletRequest request,
      final HttpServletResponse response) throws ServletException, IOException {
    try {
      ...

      UserGroupInformation.getCurrentUser().doAs(
          new PrivilegedExceptionAction<Void>() {

            @Override
            public Void run() throws Exception {
              // 1) Get the current NN's state. Uploads are meant for the Active NN (an Observer NN is also accepted here).
              //  If a Standby NN receives the request, it returns an error
              HAServiceProtocol.HAServiceState state = NameNodeHttpServer
                  .getNameNodeStateFromContext(getServletContext());
              if (state != HAServiceProtocol.HAServiceState.ACTIVE &&
                  state != HAServiceProtocol.HAServiceState.OBSERVER) {
                // we need a different response type here so the client can differentiate this
                // from the failure to upload due to (1) security, or (2) other checkpoints already
                // present
                sendError(response, HttpServletResponse.SC_EXPECTATION_FAILED,
                    "Nameode "+request.getLocalAddr()+" is currently not in a state which can "
                        + "accept uploads of new fsimages. State: "+state);
                return null;
              }

              final long txid = parsedParams.getTxId();
              String remoteAddr = request.getRemoteAddr();
              ImageUploadRequest imageRequest = new ImageUploadRequest(txid, remoteAddr);

              final NameNodeFile nnf = parsedParams.getNameNodeFile();

              // 2) Check whether another image upload is already in progress
              SortedSet<ImageUploadRequest> larger = currentlyDownloadingCheckpoints.tailSet(imageRequest);
              if (larger.size() > 0) {
                sendError(response, HttpServletResponse.SC_CONFLICT,
                    "Another checkpointer is already in the process of uploading a" +
                        " checkpoint made up to transaction ID " + larger.last());
                return null;
              }

              //make sure no one else has started uploading one
              if (!currentlyDownloadingCheckpoints.add(imageRequest)) {
                sendError(response, HttpServletResponse.SC_CONFLICT,
                    "Either current namenode is checkpointing or another"
                        + " checkpointer is already in the process of "
                        + "uploading a checkpoint made at transaction ID "
                        + txid);
                return null;
              }

              ...

              // 3) Compute the time and transaction-count deltas since the NN's last checkpoint,
              // then check them against thresholds to avoid downloading images too often
              if (checkRecentImageEnable &&
                  NameNodeFile.IMAGE.equals(parsedParams.getNameNodeFile()) &&
                  timeDelta < checkpointPeriod &&
                  txid - lastCheckpointTxid < checkpointTxnCount) {
                // only when at least one of two conditions are met we accept
                // a new fsImage
                // 1. most recent image's txid is too far behind
                // 2. last checkpoint time was too old
                String message = "Rejecting a fsimage due to small time delta "
                    + "and txnid delta. Time since previous checkpoint is "
                    + timeDelta + " expecting at least " + checkpointPeriod
                    + " txnid delta since previous checkpoint is " +
                    (txid - lastCheckpointTxid) + " expecting at least "
                    + checkpointTxnCount;
                LOG.info(message);
                sendError(response, HttpServletResponse.SC_CONFLICT, message);
                return null;
              }

              try {
                ...

                InputStream stream = request.getInputStream();
                try {
                  long start = monotonicNow();
                  // 4) The current NN downloads the image
                  MD5Hash downloadImageDigest = TransferFsImage
                      .handleUploadImageRequest(request, txid,
                          nnImage.getStorage(), stream,
                          parsedParams.getFileSize(), getThrottler(conf));
                  nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
                      downloadImageDigest);
                  ...
                } finally {
                  // 5) The download has finished; remove this request from the in-progress set
                  currentlyDownloadingCheckpoints.remove(imageRequest);

                  stream.close();
                }
              } finally {
                nnImage.removeFromCheckpointing(txid);
              }
              return null;
            }

          });
    } catch (Throwable t) {
      ...
    }
  }

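The code above shows that the Active NN checks each upload against uploads already in progress and against time and transaction-count thresholds. This keeps it from downloading images from Standby NNs too often.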
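
The Active side's admission checks can be modelled with a sorted set of in-flight transaction IDs. This is a simplified sketch, not the ImageServlet logic itself. CheckpointAdmission is hypothetical and times are plain seconds. The checkRecentImageEnable and image-type conditions are left out. Unlike the servlet, the sketch runs the threshold check before registering the request as in progress.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical model of the Active NN's checks before accepting an fsimage.
final class CheckpointAdmission {
  enum Decision { ACCEPT, CONFLICT_IN_PROGRESS, TOO_RECENT }

  private final NavigableSet<Long> inProgressTxids = new TreeSet<>();
  private final long checkpointPeriodSecs;
  private final long checkpointTxnCount;
  private long lastCheckpointTimeSecs;
  private long lastCheckpointTxid;

  CheckpointAdmission(long checkpointPeriodSecs, long checkpointTxnCount,
                      long lastCheckpointTimeSecs, long lastCheckpointTxid) {
    this.checkpointPeriodSecs = checkpointPeriodSecs;
    this.checkpointTxnCount = checkpointTxnCount;
    this.lastCheckpointTimeSecs = lastCheckpointTimeSecs;
    this.lastCheckpointTxid = lastCheckpointTxid;
  }

  synchronized Decision tryBegin(long txid, long nowSecs) {
    // Like tailSet(imageRequest): reject if an upload at the same or a
    // newer txid is already being downloaded.
    if (!inProgressTxids.tailSet(txid, true).isEmpty()) {
      return Decision.CONFLICT_IN_PROGRESS;
    }
    // Accept only if the last checkpoint is old enough OR far enough behind.
    long timeDelta = nowSecs - lastCheckpointTimeSecs;
    if (timeDelta < checkpointPeriodSecs
        && txid - lastCheckpointTxid < checkpointTxnCount) {
      return Decision.TOO_RECENT;
    }
    inProgressTxids.add(txid);
    return Decision.ACCEPT;
  }

  // Mirrors the finally block: always drop the request from the in-progress
  // set, and record the checkpoint only if the download succeeded.
  synchronized void finish(long txid, boolean succeeded, long nowSecs) {
    inProgressTxids.remove(txid);
    if (succeeded) {
      lastCheckpointTimeSecs = nowSecs;
      lastCheckpointTxid = txid;
    }
  }
}
```

With several Standbys, the in-progress check rejects any upload whose txid is not newer than one already being downloaded. Once a fresh image has landed, the threshold check rejects later uploads.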

Failover Between Active and Standby NNs


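Failover between the HA services is another area that needed specific changes. HDFS active/standby switching relies on the ZKFC (ZKFailoverController) mechanism. Put simply, whichever node first grabs the lock in ZooKeeper, by creating the corresponding znode, becomes the current Active. If the state of that znode changes, a new Active election is triggered.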
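
The lock-based election can be pictured with a toy model, sketched below without ZooKeeper. A ConcurrentHashMap entry stands in for the lock znode, and putIfAbsent stands in for an atomic znode create. The release method stands in for the ephemeral znode disappearing when its owner's session ends. ToyElector and the lock path are illustrative assumptions, and ZooKeeper watches are not modelled.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Toy model of ZKFC-style leader election; not a ZooKeeper client.
final class ToyElector {
  // Illustrative lock path; the real one depends on configuration.
  static final String LOCK_PATH = "/hadoop-ha/mycluster/ActiveStandbyElectorLock";

  private final ConcurrentMap<String, String> znodes = new ConcurrentHashMap<>();

  // Atomic "create if absent": only the first caller wins the lock.
  boolean tryBecomeActive(String nodeId) {
    return znodes.putIfAbsent(LOCK_PATH, nodeId) == null;
  }

  // Returns the current lock holder, or null if nobody holds it.
  String currentActive() {
    return znodes.get(LOCK_PATH);
  }

  // Removes the lock only if nodeId holds it, like an ephemeral znode
  // vanishing when its owner's session expires.
  void release(String nodeId) {
    znodes.remove(LOCK_PATH, nodeId);
  }
}
```

Nothing in this model depends on the number of contenders. The hard part of a graceful failover is making sure the intended node is the one that wins.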

Let us first look at how the original failover process is implemented. This makes the multi-Standby failover easier to understand.

ZKFailoverController.java

  /**
   * Coordinate a graceful failover. This proceeds in several phases:
   * 1) Pre-flight checks: ensure that the local node is healthy, and
   * thus a candidate for failover.
   * 2) Determine the current active node. If it is the local node, no
   * need to failover - return success.
   * 3) Ask that node to yield from the election for a number of seconds.
   * 4) Allow the normal election path to run in other threads. Wait until
   * we either become unhealthy or we see an election attempt recorded by
   * the normal code path.
   * 5) Allow the old active to rejoin the election, so a future
   * failback is possible.
   */
  private void doGracefulFailover()
      throws ServiceFailedException, IOException, InterruptedException {
    int timeout = FailoverController.getGracefulFenceTimeout(conf) * 2;
    
    // Phase 1: pre-flight checks
    checkEligibleForFailover();
    
    // 1) Get the current Active's address, i.e. the old Active
    HAServiceTarget oldActive = getCurrentActive();
    if (oldActive == null) {
      // No node is currently active. So, if we aren't already
      // active ourselves by means of a normal election, then there's
      // probably something preventing us from becoming active.
      throw new ServiceFailedException(
          "No other node is currently active.");
    }
    
    // If the local node is already the Active, skip the rest of the failover logic
    if (oldActive.getAddress().equals(localTarget.getAddress())) {
      LOG.info("Local node " + localTarget + " is already active. " +
          "No need to failover. Returning success.");
      return;
    }
    
    // 2) Ask the current Active to give up its Active role for the given time; during that window it may not compete in the election
    LOG.info("Asking " + oldActive + " to cede its active state for " +
        timeout + "ms");
    ZKFCProtocol oldZkfc = oldActive.getZKFCProxy(conf, timeout);
    oldZkfc.cedeActive(timeout);

    // 3) Wait for the local node to become Active; no other node is competing in the election now
    ActiveAttemptRecord attempt = waitForActiveAttempt(timeout + 60000);
    
    if (attempt == null) {
      // We didn't even make an attempt to become active.
      synchronized(this) {
        if (lastHealthState != State.SERVICE_HEALTHY) {
          throw new ServiceFailedException("Unable to become active. " +
            "Service became unhealthy while trying to failover.");          
        }
      }
      
      throw new ServiceFailedException("Unable to become active. " +
          "Local node did not get an opportunity to do so from ZooKeeper, " +
          "or the local node took too long to transition to active.");
    }

    // 4) Restore the old Active's right to take part in the election
    oldZkfc.cedeActive(-1);
    
    if (attempt.succeeded) {
      LOG.info("Successfully became active. " + attempt.status);
    } else {
      // Propagate failure
      String msg = "Failed to become active. " + attempt.status;
      throw new ServiceFailedException(msg);
    }
  }

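In short, the target node becomes Active by temporarily removing the other node's right to take part in the Active election. The same logic applies unchanged with multiple Standbys. The difference is which nodes must give up their election rights. With two NNs this is only the old Active. With more NNs it is every other NN: the old Active plus the remaining Standbys.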
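
The generalized flow can be sketched in three steps. The local node asks every other node to cede, waits to win the election, and then lets everyone rejoin. CedeTarget and MultiNodeFailover are hypothetical. cedeActive(int) is modelled on the ZKFCProtocol call shown above but declared without its checked exceptions. A BooleanSupplier stands in for waitForActiveAttempt. Restoring election rights in a finally block is a choice of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a remote ZKFC; the real ZKFCProtocol call
// also throws checked exceptions, omitted here for brevity.
@FunctionalInterface
interface CedeTarget {
  void cedeActive(int millisToCede);
}

final class MultiNodeFailover {
  private MultiNodeFailover() {}

  // others: every NN except the local one (old Active plus other Standbys).
  static boolean failoverToLocal(List<CedeTarget> others,
                                 BooleanSupplier localBecameActive,
                                 int timeoutMs) {
    List<CedeTarget> ceded = new ArrayList<>();
    try {
      // Take every other node out of the election for timeoutMs.
      for (CedeTarget t : others) {
        t.cedeActive(timeoutMs);
        ceded.add(t);
      }
      // With no competitors left, the local node should win the election.
      return localBecameActive.getAsBoolean();
    } finally {
      // Let every node that ceded rejoin, so a later failback is possible.
      for (CedeTarget t : ceded) {
        t.cedeActive(-1);
      }
    }
  }
}
```

Compared with the two-node version, only the set of ceding nodes changes. The wait-for-election step is the same.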

The relevant code, again from ZKFailoverController.java:

  private void doGracefulFailover()
      throws ServiceFailedException, IOException, InterruptedException {
    int timeout = FailoverController.getGracefulFenceTimeout(conf) * 2;
    
    // Phase 1: pre-flight checks
    checkEligibleForFailover();
    
    // 1.a) Get the current Active node
    HAServiceTarget oldActive = getCurrentActive();
    if (oldActive == null) {
      // No node is currently active. So, if we aren't already
      // active ourselves by means of a normal election, then there's
      // probably something preventing us from becoming active.
      throw new ServiceFailedException(
          "No other node is currently active.");
    }
    
    // 1.b) If the local node is already the Active, return
    if (oldActive.getAddress().equals(localTarget.getAddress())) {
      LOG.info("Local node " + localTarget + " is already active. " +
          "No need to failover. Returning success.");
      return;
    }
    ...
  }
