HDFS源码分析:“-put”到底做了些什么(客户端)

Posted tokendeng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HDFS源码分析:“-put”到底做了些什么(客户端)相关的知识,希望对你有一定的参考价值。

        源码分析的目的不是为了扣代码中一字一句,更不是为了背记里面的实现细节,而是为了从源码中发现一种实现的思路与框架,有个宏观的把握,当碰到某个细节时,回过去定位某个细节,再咀嚼里边的代码。我们本着这个思路去窥探HDFS底层相关实现。


当我们键入命令:

bin/hadoop fs -put /home/dcc/data/1.3g.rar /rs1/g.rar

往HDFS集群存数据时,HDFS的客户端到底做了些什么?

总体做了如下工作:

解析-put命令;

建立本地文件输入流;

建立针对HDFS集群的远程文件输出流;

在输入流与输出流中拷贝数据;


下面对每个步骤进行详细的分析。

一、解析-put命令

有关fs的处理命令统一由FsShell类处理:

  //org.apache.hadoop.fs.FsShell
  public static void main(String argv[]) throws Exception 
    FsShell shell = newShellInstance();
    int res;
    try 
      res = ToolRunner.run(shell, argv);
     finally 
      shell.close();
    
    System.exit(res);
  
上面进入shell的run()方法,进一步跟踪:

  //org.apache.hadoop.fs.FsShell
  public int run(String argv[]) throws Exception 
    // initialize FsShell
    init();

    int exitCode = -1;
    if (argv.length < 1) 
      printUsage(System.err);
     else 
      String cmd = argv[0];
      Command instance = null;
      try 
        //根据传入命令,取出对应命令的实例类。
        //跟踪调试时发现instance的类为:org.apache.hadoop.fs.shell.CopyCommands$Put@65694ee6(CopyCommands里边的内部类Put)
        instance = commandFactory.getInstance(cmd);
        if (instance == null) 
          throw new UnknownCommandException();
        
        //进入run方法。
        exitCode = instance.run(Arrays.copyOfRange(argv, 1, argv.length));
       catch (IllegalArgumentException e) 
        displayError(cmd, e.getLocalizedMessage());
        if (instance != null) 
          printInstanceUsage(System.err, instance);
        
       catch (Exception e) 
        // instance.run catches IOE, so something is REALLY wrong if here
        LOG.debug("Error", e);
        displayError(cmd, "Fatal internal error");
        e.printStackTrace(System.err);
      
    
    return exitCode;
  
上面就完成了对-put命令的解析。这个解析过程对于fs下所有命令都是相同的。
进一步跟踪代码,进入了逻辑主体的实现入口:
  //org.apache.hadoop.fs.shell.Command
  public int run(String...argv) 
    LinkedList<String> args = new LinkedList<String>(Arrays.asList(argv));
    try 
      if (isDeprecated()) 
        displayWarning(
            "DEPRECATED: Please use '"+ getReplacementCommand() + "' instead.");
      
      //处理逻辑隐藏在这两行代码里边。
      processOptions(args);
      processRawArguments(args);
     catch (IOException e) 
      displayError(e);
    
    
    return (numErrors == 0) ? exitCode : exitCodeForError();
  
上面的Commad类是所有命令的父类,Hadoop自带的命令几乎都是它的子类,如图:


进入逻辑处理的主体:

  //org.apache.hadoop.fs.shell.CopyCommands
    protected void processArguments(LinkedList<PathData> args)
    throws IOException 
      // NOTE: this logic should be better, mimics previous implementation
      if (args.size() == 1 && args.get(0).toString().equals("-")) 
        copyStreamToTarget(System.in, getTargetPath(args.get(0)));
        return;
      
      //处理完参数后,所有重要的逻辑这个方法入口的里边。
      super.processArguments(args);
    

由于Command类具有一棵巨大的子类树,为了代码共用,整个调用过程从父类到子类,再子类,又到父类,上下蠢动。单纯看源代码很难看出调用路径,借用Eclipse单步跟踪调试才能摸清整个调用过程。下面进去方法的关键入口:

  //org.apache.hadoop.fs.shell.Command
  protected void processPaths(PathData parent, PathData ... items)
  throws IOException 
    // TODO: this really should be iterative
    for (PathData item : items) 
      try 
        //对于-put命令,这个方法入口进入,就是传输数据的逻辑主体。
        processPath(item);
        if (recursive && item.stat.isDirectory()) 
          recursePath(item);
        
        //数据上传完成后,这里进去做具体的收尾工作。
        postProcessPath(item);
       catch (IOException e) 
        displayError(e);
      
    
  
进一步跟踪进入到执行拷贝数据的关键入口:

  //org.apache.hadoop.fs.shell.CommandWithDestination
  protected void processPath(PathData src, PathData dst) throws IOException 
    if (src.stat.isSymlink()) 
      // TODO: remove when FileContext is supported, this needs to either
      // copy the symlink or deref the symlink
      throw new PathOperationException(src.toString());        
     else if (src.stat.isFile()) 
      //这里进入拷贝数据流程,从src路径拷往dst。
      copyFileToTarget(src, dst);
     else if (src.stat.isDirectory() && !isRecursive()) 
      throw new PathIsDirectoryException(src.toString());
    
  
二、建立本地文件输入流

  //org.apache.hadoop.fs.shell.CommandWithDestination
 protected void copyStreamToTarget(InputStream in, PathData target)
  throws IOException 
    if (target.exists && (target.stat.isDirectory() || !overwrite)) 
      throw new PathExistsException(target.toString());
    
    TargetFileSystem targetFs = new TargetFileSystem(target.fs);
    try 
      PathData tempTarget = target.suffix("._COPYING_");
      targetFs.setWriteChecksum(writeChecksum);
      //至此,已经建立的关键的输入流。
      targetFs.writeStreamToFile(in, tempTarget);
      //当数据传输完成后,重命名(拷贝的过程中可以发现在目标目录先有个“xxx.<pre name="code" class="java">._COPYING_
的文件”)。
targetFs.rename(tempTarget, target); finally targetFs.close(); // last ditch effort to ensure temp file is removed

 
三、建立针对HDFS集群的远程文件输出流

进一步跟踪,看看底层是如何拷贝流的:

  //org.apache.hadoop.fs.shell.CommandWithDestination
    void writeStreamToFile(InputStream in, PathData target) throws IOException 
      FSDataOutputStream out = null;
      try 
        //建立输出流(到HDFS集群上的流)
        out = create(target);
        //由工具类IOUtils执行具体的拷贝工作。
        IOUtils.copyBytes(in, out, getConf(), true);
       finally 
        IOUtils.closeStream(out); // just in case copyBytes didn't
      
    
关键点就到了,至此为止已经建立了输入输流,并且开始传送数据,并未看书整个过程有任何特别,而熟知的HDFS底层复杂的数据传输逻辑隐藏在哪里的呢?

四、在输入流与输出流中拷贝数据

进一步进入IOUtils.copyBytes()方法,看看有没啥玄妙之处:

  //org.apache.hadoop.io.IOUtils
   public static void copyBytes(InputStream in, OutputStream out, int buffSize) 
    throws IOException 
    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
    //buffsize为缓冲区大小。
    byte buf[] = new byte[buffSize];
    //从输入流中读入一个缓冲区的字节。
    int bytesRead = in.read(buf);
    while (bytesRead >= 0) 
      //再把缓冲区里的数据循环写出到输出流中。
      out.write(buf, 0, bytesRead);
      if ((ps != null) && ps.checkError()) 
        throw new IOException("Unable to write to output stream.");
      
      bytesRead = in.read(buf);
    
  
进一步跟踪write()方法进去:

  //org.apache.hadoop.fs.FSOutputSummer
  public synchronized void write(byte b[], int off, int len)
      throws IOException 
    
    checkClosed();
    
    if (off < 0 || len < 0 || off > b.length - len) 
      throw new ArrayIndexOutOfBoundsException();
    
    至此,具体的写的工作落实到write1()方法上。
    for (int n=0;n<len;n+=write1(b, off+n, len-n)) 
    
  
跟踪进去write1()发现:

  //org.apache.hadoop.fs.FSOutputSummer
  private int write1(byte b[], int off, int len) throws IOException 
    if(count==0 && len>=buf.length) 
      // local buffer is empty and user data has one chunk
      // checksum and output data
      final int length = buf.length;
      sum.update(b, off, length);
      //这里是执行数据的主体逻辑,负责构建与传输源始数据与校验和数据。
      writeChecksumChunk(b, off, length, false);
      return length;
    
    
    // copy user data to local buffer
    int bytesToCopy = buf.length-count;
    bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
    sum.update(b, off, bytesToCopy);
    System.arraycopy(b, off, buf, count, bytesToCopy);
    count += bytesToCopy;
    if (count == buf.length) 
      // local buffer is full
      flushBuffer();
     
    return bytesToCopy;
  
进一步跟踪writeChecksumChunk()方法:

<span style="font-size:18px;">  //org.apache.hadoop.fs.FSOutputSummer
  private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
  throws IOException 
    int tempChecksum = (int)sum.getValue();
    if (!keep) 
      sum.reset();
    
    //取得之前计算的检验和。
    int2byte(tempChecksum, checksum);
    //进入写数据入口。
    writeChunk(b, off, len, checksum);
  </span>
进一步跟踪writeChunk()方法:

  //org.apache.hadoop.hdfs.DFSOutputStream
  //往HDFS集群写数据,最关键的逻辑主体落实到这个方法。
  protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum) 
                                                        throws IOException 
    //检查远程文件(流)是否打开。
    dfsClient.checkOpen();
    //检查是否关闭。
    checkClosed();

    int cklen = checksum.length;
    int bytesPerChecksum = this.checksum.getBytesPerChecksum(); 
    if (len > bytesPerChecksum) 
      throw new IOException("writeChunk() buffer size is " + len +
                            " is larger than supported  bytesPerChecksum " +
                            bytesPerChecksum);
    
    if (checksum.length != this.checksum.getChecksumSize()) 
      throw new IOException("writeChunk() checksum size is supposed to be " +
                            this.checksum.getChecksumSize() + 
                            " but found to be " + checksum.length);
    

    if (currentPacket == null) 
      //构建一个Packet,Packet是HDFS上传输数据的单元,有一个或多个Chunk构成。
      currentPacket = new Packet(packetSize, chunksPerPacket, 
          bytesCurBlock);
      if (DFSClient.LOG.isDebugEnabled()) 
        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
            currentPacket.seqno +
            ", src=" + src +
            ", packetSize=" + packetSize +
            ", chunksPerPacket=" + chunksPerPacket +
            ", bytesCurBlock=" + bytesCurBlock);
      
    
    //往Packet包里写校验和与数据。
    currentPacket.writeChecksum(checksum, 0, cklen);
    currentPacket.writeData(b, offset, len);
    currentPacket.numChunks++;
    bytesCurBlock += len;

    // If packet is full, enqueue it for transmission
    // 当一个Packet里保存了足够多的Chunk进入下面的发送数据流程。
    if (currentPacket.numChunks == currentPacket.maxChunks ||
        bytesCurBlock == blockSize) 
      if (DFSClient.LOG.isDebugEnabled()) 
        DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
            currentPacket.seqno +
            ", src=" + src +
            ", bytesCurBlock=" + bytesCurBlock +
            ", blockSize=" + blockSize +
            ", appendChunk=" + appendChunk);
      
      // 这里是数据发送处理的关键入口。
      waitAndQueueCurrentPacket();

      // If the reopened file did not end at chunk boundary and the above
      // write filled up its partial chunk. Tell the summer to generate full 
      // crc chunks from now on.
      if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) 
        appendChunk = false;
        resetChecksumChunk(bytesPerChecksum);
      

      if (!appendChunk) 
        int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
        computePacketChunkSize(psize, bytesPerChecksum);
      
      //
      // if encountering a block boundary, send an empty packet to 
      // indicate the end of block and reset bytesCurBlock.
      //
      if (bytesCurBlock == blockSize) 
        currentPacket = new Packet(0, 0, bytesCurBlock);
        currentPacket.lastPacketInBlock = true;
        currentPacket.syncBlock = shouldSyncBlock;
        waitAndQueueCurrentPacket();
        bytesCurBlock = 0;
        lastFlushOffset = 0;
      
    
  
进一步进入发送数据的关键入口:waitAndQueueCurrentPacket()方法

  //org.apache.hadoop.hdfs.DFSOutputStream
  private void waitAndQueueCurrentPacket() throws IOException 
    synchronized (dataQueue) 
      // If queue is full, then wait till we have enough space
      while (!closed && dataQueue.size() + ackQueue.size()  > MAX_PACKETS) 
        try 
          dataQueue.wait();
         catch (InterruptedException e) 
          // If we get interrupted while waiting to queue data, we still need to get rid
          // of the current packet. This is because we have an invariant that if
          // currentPacket gets full, it will get queued before the next writeChunk.
          //
          // Rather than wait around for space in the queue, we should instead try to
          // return to the caller as soon as possible, even though we slightly overrun
          // the MAX_PACKETS iength.
          Thread.currentThread().interrupt();
          break;
        
      
      checkClosed();
      //把当前Packet添加进去发送队列。
      queueCurrentPacket();
    
  
进一步进入queueCurrentPacket()方法:

  //org.apache.hadoop.hdfs.DFSOutputStream
  private void queueCurrentPacket() 
    //dataQueue为一个队列,存储要发送的Packet。
    synchronized (dataQueue) 
      if (currentPacket == null) return;
      //把当前Packet添加进去队列。
      dataQueue.addLast(currentPacket);
      lastQueuedSeqno = currentPacket.seqno;
      if (DFSClient.LOG.isDebugEnabled()) 
        DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
      
      //清零当前Packet,使得拷贝数据能够继续。
      currentPacket = null;
      //唤醒持有dataQueue的其他线程。
      dataQueue.notifyAll();
    
  
至此,把构建的Packet放入dataQueue队列,本文开始的四个步骤基本完成。

但到现在为止,还为看到具体发送数据到HDFS集群上的代码。

从源代码分析把数据拷贝进去dataQueue后,没看到其他操作,并且dataQueue是一个共享变量,必然自然让人联想到后台应该有其他线程负责把dataQueue里边的数据发送,搜索对dataQueue的操作,发现在DFSOutputStream有一个DataStreamer类,而此类继承自Daemon具有做后台线程的天然条件。而在DataStreamer类的run()方法里实现了后台发送数据的主要逻辑:

  //org.apache.hadoop.hdfs.DFSOutputStream
    @Override
    public void run() 
      long lastPacket = Time.now();
      //这里循环执行发送数据。
      while (!streamerClosed && dfsClient.clientRunning) 

        // if the Responder encountered an error, shutdown Responder
        if (hasError && response != null) 
          try 
            response.close();
            response.join();
            response = null;
           catch (InterruptedException  e) 
          
        

        Packet one = null;

        try 
          // process datanode IO errors if any
          boolean doSleep = false;
          if (hasError && errorIndex>=0) 
            doSleep = processDatanodeError();
          
          //同步dataQueue
          synchronized (dataQueue) 
            // wait for a packet to be sent.
            long now = Time.now();
            while ((!streamerClosed && !hasError && dfsClient.clientRunning 
                && dataQueue.size() == 0 && 
                (stage != BlockConstructionStage.DATA_STREAMING || 
                 stage == BlockConstructionStage.DATA_STREAMING && 
                 now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) 
              long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
              timeout = timeout <= 0 ? 1000 : timeout;
              timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                 timeout : 1000;
              try 
                dataQueue.wait(timeout);
               catch (InterruptedException  e) 
              
              doSleep = false;
              now = Time.now();
            
            if (streamerClosed || hasError || !dfsClient.clientRunning) 
              continue;
            
            // get packet to be sent.
            if (dataQueue.isEmpty()) 
              one = new Packet();  // heartbeat packet
             else 
              //取出一个待发送的Packet
              one = dataQueue.getFirst(); // regular data packet
            
          
          assert one != null;

          // get new block from namenode.从namenode中申请block空间。
          if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) 
            if(DFSClient.LOG.isDebugEnabled()) 
              DFSClient.LOG.debug("Allocating new block");
            
            // 获得datanode信息,并且建立到datannode的链接,以及申请block。
            nodes = nextBlockOutputStream(src);
            initDataStreaming();
           else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) 
            if(DFSClient.LOG.isDebugEnabled()) 
              DFSClient.LOG.debug("Append to block " + block);
            
            setupPipelineForAppendOrRecovery();
            initDataStreaming();
          

          long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
          if (lastByteOffsetInBlock > blockSize) 
            throw new IOException("BlockSize " + blockSize +
                " is smaller than data size. " +
                " Offset of packet in block " + 
                lastByteOffsetInBlock +
                " Aborting file " + src);
          

          if (one.lastPacketInBlock) 
            // wait for all data packets have been successfully acked
            synchronized (dataQueue) 
              while (!streamerClosed && !hasError && 
                  ackQueue.size() != 0 && dfsClient.clientRunning) 
                try 
                  // wait for acks to arrive from datanodes
                  dataQueue.wait(1000);
                 catch (InterruptedException  e) 
                
              
            
            if (streamerClosed || hasError || !dfsClient.clientRunning) 
              continue;
            
            stage = BlockConstructionStage.PIPELINE_CLOSE;
          
          
          // send the packet
          synchronized (dataQueue) 
            // move packet from dataQueue to ackQueue
            if (!one.isHeartbeatPacket()) 
              dataQueue.removeFirst();
              ackQueue.addLast(one);
              dataQueue.notifyAll();
            
          

          if (DFSClient.LOG.isDebugEnabled()) 
            DFSClient.LOG.debug("DataStreamer block " + block +
                " sending packet " + one);
          

          // write out data to remote datanode
          try 
            //把Packet数据写往datanode上的block流。
            one.writeTo(blockStream);
            blockStream.flush();   
           catch (IOException e) 
            // HDFS-3398 treat primary DN is down since client is unable to 
            // write to primary DN 
            errorIndex = 0;
            throw e;
          
          lastPacket = Time.now();
          
          if (one.isHeartbeatPacket())   //heartbeat packet
          
          
          // update bytesSent
          long tmpBytesSent = one.getLastByteOffsetBlock();
          if (bytesSent < tmpBytesSent) 
            bytesSent = tmpBytesSent;
          

          if (streamerClosed || hasError || !dfsClient.clientRunning) 
            continue;
          

          // Is this block full?
          if (one.lastPacketInBlock) 
            // wait for the close packet has been acked
            synchronized (dataQueue) 
              while (!streamerClosed && !hasError && 
                  ackQueue.size() != 0 && dfsClient.clientRunning) 
                dataQueue.wait(1000);// wait for acks to arrive from datanodes
              
            
            if (streamerClosed || hasError || !dfsClient.clientRunning) 
              continue;
            

            endBlock();
          
          if (progress != null)  progress.progress(); 

          // This is used by unit test to trigger race conditions.
          if (artificialSlowdown != 0 && dfsClient.clientRunning) 
            Thread.sleep(artificialSlowdown); 
          
         catch (Throwable e) 
          DFSClient.LOG.warn("DataStreamer Exception", e);
          if (e instanceof IOException) 
            setLastException((IOException)e);
          
          hasError = true;
          if (errorIndex == -1)  // not a datanode error
            streamerClosed = true;
          
        
      
      closeInternal();
    
进一步跟踪nextBlockOutputStream()方法,里边实现了获得datanode信息,与datanode建立连接的相关信息:

  //org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer
    private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException 
      LocatedBlock lb = null;
      DatanodeInfo[] nodes = null;
      int count = dfsClient.getConf().nBlockWriteRetry;
      boolean success = false;
      ExtendedBlock oldBlock = block;
      do 
        hasError = false;
        lastException = null;
        errorIndex = -1;
        success = false;

        long startTime = Time.now();
        DatanodeInfo[] excluded =
            excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
            .keySet()
            .toArray(new DatanodeInfo[0]);
        block = oldBlock;
        //向namenode节点申请block空间(相关元信息)。
        lb = locateFollowingBlock(startTime,
            excluded.length > 0 ? excluded : null);
        block = lb.getBlock();
        block.setNumBytes(0);
        accessToken = lb.getBlockToken();
        //获得保存此block的datanode信息。
        nodes = lb.getLocations();

        //
        // Connect to first DataNode in the list.
        //创建到第一个datanode的数据传输流。
        success = createBlockOutputStream(nodes, 0L, false);

        if (!success) 
          DFSClient.LOG.info("Abandoning " + block);
          dfsClient.namenode.abandonBlock(block, src, dfsClient.clientName);
          block = null;
          DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
          excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
        
       while (!success && --count >= 0);

      if (!success) 
        throw new IOException("Unable to create new block.");
      
      return nodes;
    
进一步跟踪“创建到datanode的block流”的那个比较中重要的createBlockOutputStream()方法:

  //org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer
    private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
        boolean recoveryFlag) 
      Status pipelineStatus = SUCCESS;
      String firstBadLink = "";
      if (DFSClient.LOG.isDebugEnabled()) 
        for (int i = 0; i < nodes.length; i++) 
          DFSClient.LOG.debug("pipeline = " + nodes[i]);
        
      

      // persist blocks on namenode on next flush
      persistBlocks.set(true);

      int refetchEncryptionKey = 1;
      while (true) 
        boolean result = false;
        DataOutputStream out = null;
        try 
          assert null == s : "Previous socket unclosed";
          assert null == blockReplyStream : "Previous blockReplyStream unclosed";
          //建立到datanode的Socket链接。
          s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
          long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
          //建立基于Socket管道Chanel的输入输出流。
          OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
          InputStream unbufIn = NetUtils.getInputStream(s);
          if (dfsClient.shouldEncryptData()) 
            iostreamPair encryptedStreams =
                DataTransferEncryptor.getEncryptedStreams(unbufOut,
                    unbufIn, dfsClient.getDataEncryptionKey());
            unbufOut = encryptedStreams.out;
            unbufIn = encryptedStreams.in;
          
          //基于Socket的流向上封装。
          out = new DataOutputStream(new BufferedOutputStream(unbufOut,
              HdfsConstants.SMALL_BUFFER_SIZE));
          blockReplyStream = new DataInputStream(unbufIn);
  
          //
          // Xmit header info to datanode
          //
  
          // send the request构建一个发送器。
          new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
              nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, 
              nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
              cachingStrategy);
  
          // receive ack for connect收取校验信息。
          BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
              PBHelper.vintPrefixed(blockReplyStream));
          pipelineStatus = resp.getStatus();
          firstBadLink = resp.getFirstBadLink();
          
          if (pipelineStatus != SUCCESS) 
            if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) 
              throw new InvalidBlockTokenException(
                  "Got access token error for connect ack with firstBadLink as "
                      + firstBadLink);
             else 
              throw new IOException("Bad connect ack with firstBadLink as "
                  + firstBadLink);
            
          
          assert null == blockStream : "Previous blockStream unclosed";
          blockStream = out;
          result =  true; // success
  
         catch (IOException ie) 
          DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
          if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) 
            DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
                + "encryption key was invalid when connecting to "
                + nodes[0] + " : " + ie);
            // The encryption key used is invalid.
            refetchEncryptionKey--;
            dfsClient.clearDataEncryptionKey();
            // Don't close the socket/exclude this node just yet. Try again with
            // a new encryption key.
            continue;
          
  
          // find the datanode that matches
          if (firstBadLink.length() != 0) 
            for (int i = 0; i < nodes.length; i++) 
              // NB: Unconditionally using the xfer addr w/o hostname
              if (firstBadLink.equals(nodes[i].getXferAddr())) 
                errorIndex = i;
                break;
              
            
           else 
            errorIndex = 0;
          
          hasError = true;
          setLastException(ie);
          result =  false;  // error
         finally 
          if (!result) 
            IOUtils.closeSocket(s);
            s = null;
            IOUtils.closeStream(out);
            out = null;
            IOUtils.closeStream(blockReplyStream);
            blockReplyStream = null;
          
        
        return result;
      
    
上面方法泛函了很多底层信息。


上面的数据发送后台线程,在DFSOutputStream初始化时就被启动:

  //org.apache.hadoop.hdfs.DFSOutputStream
  //初始化DFSOutputStream流。
  static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
      short replication, long blockSize, Progressable progress, int buffersize,
      DataChecksum checksum, String[] favoredNodes) throws IOException 
    final HdfsFileStatus stat;
    try 
      stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
          new EnumSetWritable<CreateFlag>(flag), createParent, replication,
          blockSize);
     catch(RemoteException re) 
      throw re.unwrapRemoteException(AccessControlException.class,
                                     DSQuotaExceededException.class,
                                     FileAlreadyExistsException.class,
                                     FileNotFoundException.class,
                                     ParentNotDirectoryException.class,
                                     NSQuotaExceededException.class,
                                     SafeModeException.class,
                                     UnresolvedPathException.class,
                                     SnapshotAccessControlException.class);
    
    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
        flag, progress, checksum, favoredNodes);
    //启动后台数据发送的线程。
    out.start();
    return out;
  
  
  
  //启动后台数据发送的线程的start()方法。
  private synchronized void start() 
    streamer.start();
  

至此由命令“-put”触发的整个流程就完成。上面的流程是在真实集群上单步调试得出的代码执行路径。应该上HDFS里边的类设计很合理,但每个类的调用路线比较纷乱,不通过单步调试,单纯看代码,很容易看走眼。当捋清了上面的代码执行路径,掌握他们的调用关系后,再去把握每个类的设计功用,以及相关类的层次关系。




以上是关于HDFS源码分析:“-put”到底做了些什么(客户端)的主要内容,如果未能解决你的问题,请参考以下文章

转Rollback后undo到底做了些什么?

js中的new()到底做了些什么??

Flutter从源码分析setState 的时候到底发生了什么?

pomelo源码解析--启动项目(pomelo start)

Hadoop2 使用 YARN 运行 MapReduce 的过程源码分析

软件安装过程