HDFS--RPC--delete流程速过

Posted Java不睡觉

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HDFS--RPC--delete流程速过相关的知识,希望对你有一定的参考价值。

今儿个简单过一下HDFS的delete rpc的逻辑,一起来看看delete操作的实现逻辑。
并未进行整个流程总结,留待后续理解更深时总结。

看一下NameNodeRpcServer类中delete的实现。org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer#delete

@Override // ClientProtocol
  public boolean delete(String src, boolean recursive) throws IOException {
    checkNNStartup(); //检查NN启动状态
    if (stateChangeLog.isDebugEnabled()) {
      stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
          + ", recursive=" + recursive);
    }
    namesystem.checkOperation(OperationCategory.WRITE); //Standby节点不能进行写操作
    //RetryCache用来缓存上次处理过的重试请求(通过client id + Call id唯一标识)
    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return true// Return previous response
    }

    boolean ret = false;
    try {
      //调用FSNamesystem#delete方法
      ret = namesystem.delete(src, recursive, cacheEntry != null);
    } finally {
      //保存到RetryCache中
      RetryCache.setState(cacheEntry, ret);
    }
    //增加相关metric。
    if (ret) 
      metrics.incrDeleteFileOps();
    return ret;
  }

继续往下看FSNamesystem#delete方法:

  /**
   * Remove the indicated file from namespace.
   * 
   * @see ClientProtocol#delete(String, boolean) for detailed description and 
   * description of exceptions
   */

  boolean delete(String src, boolean recursive, boolean logRetryCache)
      throws IOException 
{
    final String operationName = "delete";
    BlocksMapUpdateInfo toRemovedBlocks = null;
    checkOperation(OperationCategory.WRITE);
    final FSPermissionChecker pc = getPermissionChecker();
    writeLock();
    boolean ret = false;
    try {
      checkOperation(OperationCategory.WRITE);
      checkNameNodeSafeMode("Cannot delete " + src);
      //获得将要移除的Blocks
      toRemovedBlocks = FSDirDeleteOp.delete(
          this, pc, src, recursive, logRetryCache);
      ret = toRemovedBlocks != null;
    } catch (AccessControlException e) {
      logAuditEvent(false, operationName, src);
      throw e;
    } finally {
      writeUnlock(operationName);
    }
    getEditLog().logSync();
    if (toRemovedBlocks != null) {
      // 真正删除block的地方
      removeBlocks(toRemovedBlocks); // Incremental deletion of blocks
    }
    logAuditEvent(true, operationName, src); //记录审计日志
    return ret;
  }

我们继续看removeBlocks方法:

  /**
   * From the given list, incrementally remove the blocks from blockManager
   * Writelock is dropped and reacquired every BLOCK_DELETION_INCREMENT to
   * ensure that other waiters on the lock can get in. See HDFS-2938
   *
   * 给定要删除的block的list,从blockManager中增量移除blocks。
   * 写锁会被释放,然后每删除BLOCK_DELETION_INCREMENT个block时再重新获取一次写锁,这样确保其他等待写锁的线程能够得到执行。
   *
   * @param blocks
   *          An instance of {@link BlocksMapUpdateInfo} which contains a list
   *          of blocks that need to be removed from blocksMap
   */

  void removeBlocks(BlocksMapUpdateInfo blocks) {
    // 得到要删除的Block List;TODO
    List<BlockInfo> toDeleteList = blocks.getToDeleteList();
    Iterator<BlockInfo> iter = toDeleteList.iterator();
    // 循环List,Remove Block
    while (iter.hasNext()) {
      writeLock();
      try {
        for (int i = 0; i < blockDeletionIncrement && iter.hasNext(); i++) {
          blockManager.removeBlock(iter.next());
        }
      } finally {
        writeUnlock("removeBlocks");
      }
    }
  }
【HDFS】--RPC--delete流程速过

接着我们就进到了BlockManager#removeBlock方法:

public void removeBlock(BlockInfo block) {
    assert namesystem.hasWriteLock();
    // No need to ACK blocks that are being removed entirely
    // from the namespace, since the removal of the associated
    // file already removes them from the block map below.
    block.setNumBytes(BlockCommand.NO_ACK);
    addToInvalidates(block);
    removeBlockFromMap(block);
    // Remove the block from pendingReconstruction and neededReconstruction
    // pendingReconstruction是已经生成复制指令,待发送给DN的block队列
    // neededReconstruction是准备生成复制指令的block队列。
    PendingBlockInfo remove = pendingReconstruction.remove(block);
    if (remove != null) {
      DatanodeStorageInfo.decrementBlocksScheduled(remove.getTargets()
          .toArray(new DatanodeStorageInfo[remove.getTargets().size()]));
    }
    neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
    // 从推迟处理的block队列里删除块信息
    postponedMisreplicatedBlocks.remove(block);
  }
【HDFS】--RPC--delete流程速过

这个方法主要是将block信息加入到块删除队列(invalidates)。然后从块关系map中移除此块的信息,并从pending和needed队列中移除关于此块的信息,避免无用复制。

下面看一下BlockManager#addToInvalidates方法:

 /**
   * Adds block to list of blocks which will be invalidated on all its
   * datanodes.
   */

  private void addToInvalidates(BlockInfo storedBlock) {
    if (!isPopulatingReplQueues()) {
      return;
    }
    StringBuilder datanodes = blockLog.isDebugEnabled()
        ? new StringBuilder() : null;
    for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
      if (storage.getState() != State.NORMAL) {
        continue;
      }
      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
      final Block b = getBlockOnStorage(storedBlock, storage);
      if (b != null) {
        // 在删除队列中添加 (块信息,datanode信息)
        invalidateBlocks.add(b, node, false);
        if (datanodes != null) {
          datanodes.append(node).append(" ");
        }
      }
    }
    if (datanodes != null && datanodes.length() != 0) {
      blockLog.debug("BLOCK* addToInvalidates: {} {}", storedBlock, datanodes);
    }
  }
【HDFS】--RPC--delete流程速过

接着走进org.apache.hadoop.hdfs.server.blockmanagement.InvalidateBlocks#add方法:
这是一个synchronized方法。

接着我们看putBlocksSet方法:

就很简单了,就是判断一下是EC块还是普通块,然后给put到set中,底层调用的是Map接口的put方法(因为hashset的底层实现就是map)

添加到InvalidateBlocks队列后,就等着定时线程来读取然后下发删除指令给Datanode了。


以上是关于HDFS--RPC--delete流程速过的主要内容,如果未能解决你的问题,请参考以下文章

VSCode自定义代码片段——git命令操作一个完整流程

VSCode自定义代码片段15——git命令操作一个完整流程

VSCode自定义代码片段15——git命令操作一个完整流程

《密码编码学与网络安全》William Stalling著---学习笔记知识点速过传统密码+经典对称加密算法+经典公钥密码算法+密码学Hash函数

60天速过RHCE,Linux牛人教你技术证书全掌握三步法

Android 逆向ART 脱壳 ( DexClassLoader 脱壳 | DexClassLoader 构造函数 | 参考 Dalvik 的 DexClassLoader 类加载流程 )(代码片段