HDFS Source Code Analysis: What Exactly Does "-put" Do? (Client Side)
Posted by tokendeng
The point of reading source code is not to pick apart every word, and certainly not to memorize implementation details. It is to extract the overall approach and framework behind an implementation and build a macro-level picture, so that when a particular detail matters you can go back, locate it, and then chew on the code there. That is the spirit in which we will look into the lower layers of HDFS.
When we type the command:
bin/hadoop fs -put /home/dcc/data/1.3g.rar /rs1/g.rar
to store data on an HDFS cluster, what exactly does the HDFS client do?
Overall, it performs the following work:
1. Parse the -put command;
2. Set up an input stream on the local file;
3. Set up an output stream for the remote file on the HDFS cluster;
4. Copy data from the input stream to the output stream.
Each of these steps is analyzed in detail below.
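Before stepping into the shell code, it helps to keep in mind that, at the API level, -put boils down to roughly the following. This is a minimal sketch using the public FileSystem API, not the code path the shell actually takes; the paths are the ones from the example command, the configuration is assumed to come from core-site.xml, and error handling is omitted:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PutSketch {
  public static void main(String[] args) throws Exception {
    // Assumes fs.defaultFS in core-site.xml points at the target HDFS cluster.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    try {
      // Roughly what "-put /home/dcc/data/1.3g.rar /rs1/g.rar" amounts to:
      // read a local file and write it out to the cluster.
      fs.copyFromLocalFile(new Path("/home/dcc/data/1.3g.rar"),
                           new Path("/rs1/g.rar"));
    } finally {
      fs.close();
    }
  }
}
The rest of this article traces what hides behind such a copy on the client side.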
1. Parsing the -put Command
All fs commands are handled by the FsShell class:
//org.apache.hadoop.fs.FsShell
public static void main(String argv[]) throws Exception {
  FsShell shell = newShellInstance();
  int res;
  try {
    res = ToolRunner.run(shell, argv);
  } finally {
    shell.close();
  }
  System.exit(res);
}
From here we enter the shell's run() method; tracing further:
//org.apache.hadoop.fs.FsShell
public int run(String argv[]) throws Exception {
  // initialize FsShell
  init();
  int exitCode = -1;
  if (argv.length < 1) {
    printUsage(System.err);
  } else {
    String cmd = argv[0];
    Command instance = null;
    try {
      // Look up the Command instance registered for the given command name.
      // When stepping through with a debugger, instance turns out to be
      // org.apache.hadoop.fs.shell.CopyCommands$Put@65694ee6
      // (the inner class Put of CopyCommands).
      instance = commandFactory.getInstance(cmd);
      if (instance == null) {
        throw new UnknownCommandException();
      }
      // Enter the command's run() method.
      exitCode = instance.run(Arrays.copyOfRange(argv, 1, argv.length));
    } catch (IllegalArgumentException e) {
      displayError(cmd, e.getLocalizedMessage());
      if (instance != null) {
        printInstanceUsage(System.err, instance);
      }
    } catch (Exception e) {
      // instance.run catches IOE, so something is REALLY wrong if here
      LOG.debug("Error", e);
      displayError(cmd, "Fatal internal error");
      e.printStackTrace(System.err);
    }
  }
  return exitCode;
}
That completes the parsing of the -put command. This parsing flow is the same for every fs command.
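As an aside, because FsShell implements Tool, the same dispatch can be driven programmatically. A hedged sketch (the paths are the placeholders from the example command):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.util.ToolRunner;

public class FsShellDriver {
  public static void main(String[] args) throws Exception {
    // ToolRunner parses the generic options and then calls FsShell.run(),
    // which is exactly the entry point traced above.
    int rc = ToolRunner.run(new Configuration(),
                            new FsShell(),
                            new String[] {"-put", "/home/dcc/data/1.3g.rar", "/rs1/g.rar"});
    System.exit(rc);
  }
}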
Tracing further, we reach the entry point of the main logic:
//org.apache.hadoop.fs.shell.Command
public int run(String...argv) {
  LinkedList<String> args = new LinkedList<String>(Arrays.asList(argv));
  try {
    if (isDeprecated()) {
      displayWarning(
          "DEPRECATED: Please use '"+ getReplacementCommand() + "' instead.");
    }
    // The real processing logic is hidden behind these two calls.
    processOptions(args);
    processRawArguments(args);
  } catch (IOException e) {
    displayError(e);
  }
  return (numErrors == 0) ? exitCode : exitCodeForError();
}
The Command class above is the parent of all commands; nearly every command shipped with Hadoop is one of its subclasses (the class hierarchy figure from the original post is not reproduced here).
Entering the main processing logic:
//org.apache.hadoop.fs.shell.CopyCommands
protected void processArguments(LinkedList<PathData> args)
    throws IOException {
  // NOTE: this logic should be better, mimics previous implementation
  if (args.size() == 1 && args.get(0).toString().equals("-")) {
    copyStreamToTarget(System.in, getTargetPath(args.get(0)));
    return;
  }
  // Once the arguments have been handled, all the important logic lives
  // behind this call.
  super.processArguments(args);
}
Because Command sits atop a large tree of subclasses, and code is shared across that tree, the call path bounces from the parent class into a subclass, into another subclass, and back up into the parent. Reading the source alone makes the route hard to follow; only single-stepping in Eclipse makes the whole call chain clear.
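That bouncing is essentially the template-method pattern: the parent fixes the skeleton (run → processOptions → processArguments → processPaths → processPath) and subclasses override individual steps. A stripped-down illustration of that shape (toy classes, not the real Hadoop ones):
// A toy model of the Command call structure, not the real Hadoop classes.
abstract class ToyCommand {
  // The parent class fixes the overall skeleton...
  public final int run(String... args) {
    processOptions(args);   // usually overridden by the subclass
    processArguments(args); // may call back into parent helpers
    return 0;
  }
  protected void processOptions(String... args) { }
  protected abstract void processArguments(String... args);
}

class ToyPut extends ToyCommand {
  // ...and the subclass fills in the step that actually moves data.
  @Override
  protected void processArguments(String... args) {
    System.out.println("copying " + args[0] + " to " + args[1]);
  }
}
Back in the real code, the next key entry point is Command.processPaths():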
//org.apache.hadoop.fs.shell.Command
protected void processPaths(PathData parent, PathData ... items)
    throws IOException {
  // TODO: this really should be iterative
  for (PathData item : items) {
    try {
      // For -put, entering this call is the main body of the data transfer.
      processPath(item);
      if (recursive && item.stat.isDirectory()) {
        recursePath(item);
      }
      // Once the upload is done, this performs the clean-up work.
      postProcessPath(item);
    } catch (IOException e) {
      displayError(e);
    }
  }
}
Tracing further, we reach the key entry point where the data copy is carried out:
//org.apache.hadoop.fs.shell.CommandWithDestination
protected void processPath(PathData src, PathData dst) throws IOException {
  if (src.stat.isSymlink()) {
    // TODO: remove when FileContext is supported, this needs to either
    // copy the symlink or deref the symlink
    throw new PathOperationException(src.toString());
  } else if (src.stat.isFile()) {
    // Enter the copy flow: copy the data from the src path to dst.
    copyFileToTarget(src, dst);
  } else if (src.stat.isDirectory() && !isRecursive()) {
    throw new PathIsDirectoryException(src.toString());
  }
}
2. Setting Up the Local File Input Stream
//org.apache.hadoop.fs.shell.CommandWithDestination
protected void copyStreamToTarget(InputStream in, PathData target)
    throws IOException {
  if (target.exists && (target.stat.isDirectory() || !overwrite)) {
    throw new PathExistsException(target.toString());
  }
  TargetFileSystem targetFs = new TargetFileSystem(target.fs);
  try {
    PathData tempTarget = target.suffix("._COPYING_");
    targetFs.setWriteChecksum(writeChecksum);
    // At this point the key stream has been set up; the data is written
    // to a temporary file on the target file system.
    targetFs.writeStreamToFile(in, tempTarget);
    // Once the transfer completes, rename the temp file to its final name.
    // (While the copy is running you can see an "xxx._COPYING_" file in
    // the target directory.)
    targetFs.rename(tempTarget, target);
  } finally {
    targetFs.close(); // last ditch effort to ensure temp file is removed
  }
}
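The "._COPYING_" suffix seen above is the classic write-to-a-temporary-name-then-rename idiom, so readers never observe a half-written target file. A hedged sketch of the same idea using the plain FileSystem API (the method name and buffer size are illustrative, not Hadoop's internals):
import java.io.InputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class TempThenRename {
  // Write to target + "._COPYING_" first, then rename to the final target.
  static void writeViaTemp(FileSystem fs, InputStream in, Path target) throws Exception {
    Path temp = new Path(target.toString() + "._COPYING_");
    FSDataOutputStream out = fs.create(temp, true);
    try {
      IOUtils.copyBytes(in, out, 4096, true); // closes both streams when done
    } finally {
      IOUtils.closeStream(out); // just in case copyBytes didn't
    }
    if (!fs.rename(temp, target)) {
      throw new java.io.IOException("rename to " + target + " failed");
    }
  }
}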
3. Setting Up the Remote File Output Stream to the HDFS Cluster
Tracing further, let's see how the stream is copied underneath:
//org.apache.hadoop.fs.shell.CommandWithDestination
void writeStreamToFile(InputStream in, PathData target) throws IOException {
  FSDataOutputStream out = null;
  try {
    // Create the output stream (the stream to the HDFS cluster).
    out = create(target);
    // The utility class IOUtils performs the actual copy.
    IOUtils.copyBytes(in, out, getConf(), true);
  } finally {
    IOUtils.closeStream(out); // just in case copyBytes didn't
  }
}
Here is the crux: at this point both streams have been set up and data has started to flow, yet nothing in the process has looked special so far. So where is the complicated data-transfer logic HDFS is known for hiding?
4. Copying Data Between the Input and Output Streams
Let's step into IOUtils.copyBytes() and see whether anything interesting happens there:
//org.apache.hadoop.io.IOUtils
public static void copyBytes(InputStream in, OutputStream out, int buffSize)
    throws IOException {
  PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
  // buffSize is the size of the copy buffer.
  byte buf[] = new byte[buffSize];
  // Read one buffer's worth of bytes from the input stream.
  int bytesRead = in.read(buf);
  while (bytesRead >= 0) {
    // Write the buffered bytes back out to the output stream, in a loop.
    out.write(buf, 0, bytesRead);
    if ((ps != null) && ps.checkError()) {
      throw new IOException("Unable to write to output stream.");
    }
    bytesRead = in.read(buf);
  }
}
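IOUtils.copyBytes is a general-purpose helper that works for any pair of streams. For instance, a hedged example that copies a local file into an HDFS file directly (the paths are the placeholders from the example command):
import java.io.FileInputStream;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class CopyBytesExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    InputStream in = new FileInputStream("/home/dcc/data/1.3g.rar"); // local source
    FSDataOutputStream out = fs.create(new Path("/rs1/g.rar"));      // remote sink
    // 4096-byte copy buffer; the final 'true' closes both streams when done.
    IOUtils.copyBytes(in, out, 4096, true);
  }
}
The interesting part, then, is what out.write() does, because out here wraps an HDFS output stream.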
Tracing into the write() method:
//org.apache.hadoop.fs.FSOutputSummer
public synchronized void write(byte b[], int off, int len)
    throws IOException {
  checkClosed();
  if (off < 0 || len < 0 || off > b.length - len) {
    throw new ArrayIndexOutOfBoundsException();
  }
  // From here, the actual writing is delegated to write1().
  for (int n = 0; n < len; n += write1(b, off + n, len - n)) {
  }
}
Stepping into write1(), we find:
//org.apache.hadoop.fs.FSOutputSummer
private int write1(byte b[], int off, int len) throws IOException {
  if (count == 0 && len >= buf.length) {
    // local buffer is empty and user data has one chunk
    // checksum and output data
    final int length = buf.length;
    sum.update(b, off, length);
    // This is the main write path: it builds and sends both the raw data
    // and its checksum.
    writeChecksumChunk(b, off, length, false);
    return length;
  }

  // copy user data to local buffer
  int bytesToCopy = buf.length - count;
  bytesToCopy = (len < bytesToCopy) ? len : bytesToCopy;
  sum.update(b, off, bytesToCopy);
  System.arraycopy(b, off, buf, count, bytesToCopy);
  count += bytesToCopy;
  if (count == buf.length) {
    // local buffer is full
    flushBuffer();
  }
  return bytesToCopy;
}
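FSOutputSummer's job is simply this: accumulate user bytes until one checksum chunk is full, compute the checksum, and hand (data, checksum) downstream. A standalone sketch of that pattern using the JDK's CRC32; the 512-byte chunk size is an assumption mirroring the usual dfs.bytes-per-checksum default, and the ByteArrayOutputStream stands in for the packet:
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.CRC32;

public class ChunkedChecksummer {
  private static final int BYTES_PER_CHECKSUM = 512; // assumed, mirrors the HDFS default
  private final byte[] chunk = new byte[BYTES_PER_CHECKSUM];
  private int count = 0;
  private final CRC32 crc = new CRC32();
  private final ByteArrayOutputStream sink = new ByteArrayOutputStream(); // stand-in for the packet

  public void write(byte[] b, int off, int len) throws IOException {
    while (len > 0) {
      int toCopy = Math.min(BYTES_PER_CHECKSUM - count, len);
      System.arraycopy(b, off, chunk, count, toCopy);
      count += toCopy;
      off += toCopy;
      len -= toCopy;
      if (count == BYTES_PER_CHECKSUM) {
        flushChunk(); // chunk is full: checksum it and push it downstream
      }
    }
  }

  private void flushChunk() throws IOException {
    crc.reset();
    crc.update(chunk, 0, count);
    long checksum = crc.getValue();
    // In HDFS, the 4-byte checksum and the chunk data both go into the current Packet.
    sink.write((int) (checksum >>> 24));
    sink.write((int) (checksum >>> 16));
    sink.write((int) (checksum >>> 8));
    sink.write((int) checksum);
    sink.write(chunk, 0, count);
    count = 0;
  }
}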
Tracing further into writeChecksumChunk():
//org.apache.hadoop.fs.FSOutputSummer
private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
    throws IOException {
  int tempChecksum = (int)sum.getValue();
  if (!keep) {
    sum.reset();
  }
  // Convert the checksum computed so far into bytes.
  int2byte(tempChecksum, checksum);
  // Enter the entry point for writing the data.
  writeChunk(b, off, len, checksum);
}
Tracing further into writeChunk():
//org.apache.hadoop.hdfs.DFSOutputStream
// Writing data to the HDFS cluster ultimately comes down to this method.
protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)
    throws IOException {
  // Check that the remote file (stream) is still open.
  dfsClient.checkOpen();
  // Check that this stream has not been closed.
  checkClosed();

  int cklen = checksum.length;
  int bytesPerChecksum = this.checksum.getBytesPerChecksum();
  if (len > bytesPerChecksum) {
    throw new IOException("writeChunk() buffer size is " + len +
                          " is larger than supported bytesPerChecksum " +
                          bytesPerChecksum);
  }
  if (checksum.length != this.checksum.getChecksumSize()) {
    throw new IOException("writeChunk() checksum size is supposed to be " +
                          this.checksum.getChecksumSize() +
                          " but found to be " + checksum.length);
  }

  if (currentPacket == null) {
    // Build a Packet. The Packet is HDFS's unit of transfer and is made up
    // of one or more chunks.
    currentPacket = new Packet(packetSize, chunksPerPacket,
        bytesCurBlock);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
          currentPacket.seqno +
          ", src=" + src +
          ", packetSize=" + packetSize +
          ", chunksPerPacket=" + chunksPerPacket +
          ", bytesCurBlock=" + bytesCurBlock);
    }
  }

  // Write the checksum and the data into the packet.
  currentPacket.writeChecksum(checksum, 0, cklen);
  currentPacket.writeData(b, offset, len);
  currentPacket.numChunks++;
  bytesCurBlock += len;

  // If packet is full, enqueue it for transmission
  // Once the packet holds enough chunks (or the block is full), enter the
  // send flow below.
  if (currentPacket.numChunks == currentPacket.maxChunks ||
      bytesCurBlock == blockSize) {
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
          currentPacket.seqno +
          ", src=" + src +
          ", bytesCurBlock=" + bytesCurBlock +
          ", blockSize=" + blockSize +
          ", appendChunk=" + appendChunk);
    }
    // This is the key entry point for sending the data.
    waitAndQueueCurrentPacket();

    // If the reopened file did not end at chunk boundary and the above
    // write filled up its partial chunk. Tell the summer to generate full
    // crc chunks from now on.
    if (appendChunk && bytesCurBlock % bytesPerChecksum == 0) {
      appendChunk = false;
      resetChecksumChunk(bytesPerChecksum);
    }

    if (!appendChunk) {
      int psize = Math.min((int)(blockSize - bytesCurBlock), dfsClient.getConf().writePacketSize);
      computePacketChunkSize(psize, bytesPerChecksum);
    }
    //
    // if encountering a block boundary, send an empty packet to
    // indicate the end of block and reset bytesCurBlock.
    //
    if (bytesCurBlock == blockSize) {
      currentPacket = new Packet(0, 0, bytesCurBlock);
      currentPacket.lastPacketInBlock = true;
      currentPacket.syncBlock = shouldSyncBlock;
      waitAndQueueCurrentPacket();
      bytesCurBlock = 0;
      lastFlushOffset = 0;
    }
  }
}
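To make the chunk/packet relationship concrete, here is a hedged back-of-the-envelope computation in the spirit of computePacketChunkSize(), assuming the usual defaults of 512 bytes per checksum (dfs.bytes-per-checksum), 4-byte CRC checksums, and a 64 KB write packet (dfs.client-write-packet-size); the real client also accounts for the packet header, so its numbers are slightly smaller:
public class PacketMath {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;        // assumed default dfs.bytes-per-checksum
    int checksumSize = 4;              // a CRC checksum is 4 bytes
    int writePacketSize = 64 * 1024;   // assumed default dfs.client-write-packet-size

    // Each chunk carried in a packet costs data bytes plus checksum bytes.
    int chunkSize = bytesPerChecksum + checksumSize;                // 516
    int chunksPerPacket = Math.max(writePacketSize / chunkSize, 1); // ~127, ignoring the packet header
    int packetPayload = chunksPerPacket * bytesPerChecksum;         // ~65,024 bytes of user data per packet

    System.out.println("chunksPerPacket = " + chunksPerPacket);
    System.out.println("user bytes per packet = " + packetPayload);
    // A 128 MB block therefore needs on the order of 2,000 packets.
    System.out.println("packets per 128MB block ≈ " + (128L * 1024 * 1024 / packetPayload));
  }
}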
Next, the key entry point for sending data: waitAndQueueCurrentPacket().
//org.apache.hadoop.hdfs.DFSOutputStream
private void waitAndQueueCurrentPacket() throws IOException {
  synchronized (dataQueue) {
    // If queue is full, then wait till we have enough space
    while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
      try {
        dataQueue.wait();
      } catch (InterruptedException e) {
        // If we get interrupted while waiting to queue data, we still need to get rid
        // of the current packet. This is because we have an invariant that if
        // currentPacket gets full, it will get queued before the next writeChunk.
        //
        // Rather than wait around for space in the queue, we should instead try to
        // return to the caller as soon as possible, even though we slightly overrun
        // the MAX_PACKETS length.
        Thread.currentThread().interrupt();
        break;
      }
    }
    checkClosed();
    // Add the current packet to the send queue.
    queueCurrentPacket();
  }
}
Stepping into queueCurrentPacket():
//org.apache.hadoop.hdfs.DFSOutputStream
private void queueCurrentPacket() {
  // dataQueue is the queue holding the packets waiting to be sent.
  synchronized (dataQueue) {
    if (currentPacket == null) return;
    // Append the current packet to the queue.
    dataQueue.addLast(currentPacket);
    lastQueuedSeqno = currentPacket.seqno;
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("Queued packet " + currentPacket.seqno);
    }
    // Reset currentPacket to null so that the copy can continue.
    currentPacket = null;
    // Wake up any other threads waiting on dataQueue.
    dataQueue.notifyAll();
  }
}
At this point the packet we built has been placed on the dataQueue, and the four steps listed at the beginning are essentially complete.
But so far we still have not seen the code that actually sends data to the HDFS cluster.
After the data has been copied into dataQueue, nothing else in this path touches it. Since dataQueue is a shared variable, it is natural to suspect that some background thread drains it and sends its contents. Searching for uses of dataQueue turns up the DataStreamer class inside DFSOutputStream; it extends Daemon, making it a natural background thread, and its run() method implements the main background sending logic.
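In other words, writeChunk() is the producer and DataStreamer is the consumer, coordinated through dataQueue/ackQueue with wait()/notifyAll() and a cap on outstanding packets. A self-contained sketch of that coordination pattern (a simplification, not the real DataStreamer; the MAX_PACKETS value is an assumption):
import java.util.LinkedList;

public class StreamerSketch {
  private static final int MAX_PACKETS = 80;                       // cap on queued + unacked packets (assumed)
  private final LinkedList<byte[]> dataQueue = new LinkedList<>(); // packets waiting to be sent
  private final LinkedList<byte[]> ackQueue = new LinkedList<>();  // packets sent, awaiting acks

  // Producer side: roughly what waitAndQueueCurrentPacket()/queueCurrentPacket() amount to.
  public void queuePacket(byte[] packet) throws InterruptedException {
    synchronized (dataQueue) {
      while (dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
        dataQueue.wait();                 // back-pressure on the writer
      }
      dataQueue.addLast(packet);
      dataQueue.notifyAll();              // wake the streamer thread
    }
  }

  // Consumer side: the skeleton of DataStreamer.run().
  public void streamerLoop() throws InterruptedException {
    while (true) {
      byte[] one;
      synchronized (dataQueue) {
        while (dataQueue.isEmpty()) {
          dataQueue.wait(1000);           // the real code sends a heartbeat packet instead
        }
        one = dataQueue.removeFirst();    // move packet from dataQueue to ackQueue
        ackQueue.addLast(one);
        dataQueue.notifyAll();
      }
      send(one);                          // write the packet to the datanode pipeline
    }
  }

  private void send(byte[] packet) { /* network I/O would go here */ }
}
With that skeleton in mind, here is the actual DataStreamer.run():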
//org.apache.hadoop.hdfs.DFSOutputStream
@Override
public void run() {
  long lastPacket = Time.now();
  // Keep looping here, sending data.
  while (!streamerClosed && dfsClient.clientRunning) {

    // if the Responder encountered an error, shutdown Responder
    if (hasError && response != null) {
      try {
        response.close();
        response.join();
        response = null;
      } catch (InterruptedException e) {
      }
    }

    Packet one = null;

    try {
      // process datanode IO errors if any
      boolean doSleep = false;
      if (hasError && errorIndex >= 0) {
        doSleep = processDatanodeError();
      }

      // Synchronize on dataQueue.
      synchronized (dataQueue) {
        // wait for a packet to be sent.
        long now = Time.now();
        while ((!streamerClosed && !hasError && dfsClient.clientRunning
            && dataQueue.size() == 0 &&
            (stage != BlockConstructionStage.DATA_STREAMING ||
             stage == BlockConstructionStage.DATA_STREAMING &&
             now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
          long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
          timeout = timeout <= 0 ? 1000 : timeout;
          timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
              timeout : 1000;
          try {
            dataQueue.wait(timeout);
          } catch (InterruptedException e) {
          }
          doSleep = false;
          now = Time.now();
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        // get packet to be sent.
        if (dataQueue.isEmpty()) {
          one = new Packet(); // heartbeat packet
        } else {
          // Take the next packet that is waiting to be sent.
          one = dataQueue.getFirst(); // regular data packet
        }
      }
      assert one != null;

      // get new block from namenode: ask the namenode to allocate block space.
      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Allocating new block");
        }
        // Obtain the datanode info, connect to the datanodes, and allocate the block.
        nodes = nextBlockOutputStream(src);
        initDataStreaming();
      } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Append to block " + block);
        }
        setupPipelineForAppendOrRecovery();
        initDataStreaming();
      }

      long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
      if (lastByteOffsetInBlock > blockSize) {
        throw new IOException("BlockSize " + blockSize +
            " is smaller than data size. " +
            " Offset of packet in block " +
            lastByteOffsetInBlock +
            " Aborting file " + src);
      }

      if (one.lastPacketInBlock) {
        // wait for all data packets have been successfully acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError &&
              ackQueue.size() != 0 && dfsClient.clientRunning) {
            try {
              // wait for acks to arrive from datanodes
              dataQueue.wait(1000);
            } catch (InterruptedException e) {
            }
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        stage = BlockConstructionStage.PIPELINE_CLOSE;
      }

      // send the packet
      synchronized (dataQueue) {
        // move packet from dataQueue to ackQueue
        if (!one.isHeartbeatPacket()) {
          dataQueue.removeFirst();
          ackQueue.addLast(one);
          dataQueue.notifyAll();
        }
      }

      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DataStreamer block " + block +
            " sending packet " + one);
      }

      // write out data to remote datanode
      try {
        // Write the packet to the block stream on the datanode.
        one.writeTo(blockStream);
        blockStream.flush();
      } catch (IOException e) {
        // HDFS-3398 treat primary DN is down since client is unable to
        // write to primary DN
        errorIndex = 0;
        throw e;
      }
      lastPacket = Time.now();

      if (one.isHeartbeatPacket()) { // heartbeat packet
      }

      // update bytesSent
      long tmpBytesSent = one.getLastByteOffsetBlock();
      if (bytesSent < tmpBytesSent) {
        bytesSent = tmpBytesSent;
      }

      if (streamerClosed || hasError || !dfsClient.clientRunning) {
        continue;
      }

      // Is this block full?
      if (one.lastPacketInBlock) {
        // wait for the close packet has been acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError &&
              ackQueue.size() != 0 && dfsClient.clientRunning) {
            dataQueue.wait(1000); // wait for acks to arrive from datanodes
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        endBlock();
      }
      if (progress != null) { progress.progress(); }

      // This is used by unit test to trigger race conditions.
      if (artificialSlowdown != 0 && dfsClient.clientRunning) {
        Thread.sleep(artificialSlowdown);
      }
    } catch (Throwable e) {
      DFSClient.LOG.warn("DataStreamer Exception", e);
      if (e instanceof IOException) {
        setLastException((IOException)e);
      }
      hasError = true;
      if (errorIndex == -1) { // not a datanode error
        streamerClosed = true;
      }
    }
  }
  closeInternal();
}
Tracing further into nextBlockOutputStream(), which obtains the datanode information and establishes the connection to the datanodes:
//org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer
private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
  LocatedBlock lb = null;
  DatanodeInfo[] nodes = null;
  int count = dfsClient.getConf().nBlockWriteRetry;
  boolean success = false;
  ExtendedBlock oldBlock = block;
  do {
    hasError = false;
    lastException = null;
    errorIndex = -1;
    success = false;

    long startTime = Time.now();
    DatanodeInfo[] excluded =
        excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
        .keySet()
        .toArray(new DatanodeInfo[0]);
    block = oldBlock;
    // Ask the namenode to allocate block space (and the related metadata).
    lb = locateFollowingBlock(startTime,
        excluded.length > 0 ? excluded : null);
    block = lb.getBlock();
    block.setNumBytes(0);
    accessToken = lb.getBlockToken();
    // The datanodes that will hold this block.
    nodes = lb.getLocations();

    //
    // Connect to first DataNode in the list.
    // Create the data transfer stream to the first datanode.
    //
    success = createBlockOutputStream(nodes, 0L, false);

    if (!success) {
      DFSClient.LOG.info("Abandoning " + block);
      dfsClient.namenode.abandonBlock(block, src, dfsClient.clientName);
      block = null;
      DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
      excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
    }
  } while (!success && --count >= 0);

  if (!success) {
    throw new IOException("Unable to create new block.");
  }
  return nodes;
}
Tracing further into the rather important createBlockOutputStream() method, which creates the block stream to the datanode:
//org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer
private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
    boolean recoveryFlag) {
  Status pipelineStatus = SUCCESS;
  String firstBadLink = "";
  if (DFSClient.LOG.isDebugEnabled()) {
    for (int i = 0; i < nodes.length; i++) {
      DFSClient.LOG.debug("pipeline = " + nodes[i]);
    }
  }

  // persist blocks on namenode on next flush
  persistBlocks.set(true);

  int refetchEncryptionKey = 1;
  while (true) {
    boolean result = false;
    DataOutputStream out = null;
    try {
      assert null == s : "Previous socket unclosed";
      assert null == blockReplyStream : "Previous blockReplyStream unclosed";
      // Open the socket connection to the first datanode.
      s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
      long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);

      // Wrap the socket channel in input/output streams.
      OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
      InputStream unbufIn = NetUtils.getInputStream(s);
      if (dfsClient.shouldEncryptData()) {
        IOStreamPair encryptedStreams =
            DataTransferEncryptor.getEncryptedStreams(unbufOut,
                unbufIn, dfsClient.getDataEncryptionKey());
        unbufOut = encryptedStreams.out;
        unbufIn = encryptedStreams.in;
      }
      // Layer buffered/data streams on top of the socket streams.
      out = new DataOutputStream(new BufferedOutputStream(unbufOut,
          HdfsConstants.SMALL_BUFFER_SIZE));
      blockReplyStream = new DataInputStream(unbufIn);

      //
      // Xmit header info to datanode
      //
      // send the request: build a Sender and issue the writeBlock operation.
      new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
          nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
          nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
          cachingStrategy);

      // receive ack for connect: read back the response.
      BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
          PBHelper.vintPrefixed(blockReplyStream));
      pipelineStatus = resp.getStatus();
      firstBadLink = resp.getFirstBadLink();

      if (pipelineStatus != SUCCESS) {
        if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
          throw new InvalidBlockTokenException(
              "Got access token error for connect ack with firstBadLink as "
              + firstBadLink);
        } else {
          throw new IOException("Bad connect ack with firstBadLink as "
              + firstBadLink);
        }
      }
      assert null == blockStream : "Previous blockStream unclosed";
      blockStream = out;
      result = true; // success

    } catch (IOException ie) {
      DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
      if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
        DFSClient.LOG.info("Will fetch a new encryption key and retry, "
            + "encryption key was invalid when connecting to "
            + nodes[0] + " : " + ie);
        // The encryption key used is invalid.
        refetchEncryptionKey--;
        dfsClient.clearDataEncryptionKey();
        // Don't close the socket/exclude this node just yet. Try again with
        // a new encryption key.
        continue;
      }

      // find the datanode that matches
      if (firstBadLink.length() != 0) {
        for (int i = 0; i < nodes.length; i++) {
          // NB: Unconditionally using the xfer addr w/o hostname
          if (firstBadLink.equals(nodes[i].getXferAddr())) {
            errorIndex = i;
            break;
          }
        }
      } else {
        errorIndex = 0;
      }
      hasError = true;
      setLastException(ie);
      result = false; // error
    } finally {
      if (!result) {
        IOUtils.closeSocket(s);
        s = null;
        IOUtils.closeStream(out);
        out = null;
        IOUtils.closeStream(blockReplyStream);
        blockReplyStream = null;
      }
    }
    return result;
  }
}
The method above packs in a great deal of low-level detail.
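The stream layering it performs (socket → timeout-aware stream → BufferedOutputStream → DataOutputStream) is a standard java.net/java.io pattern. A hedged, Hadoop-free sketch of the same layering; the host name, port, timeouts, and buffer size are placeholders:
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PipelineSocketSketch {
  public static void main(String[] args) throws Exception {
    Socket s = new Socket();
    // Connect to the first datanode of the pipeline (placeholder address).
    s.connect(new InetSocketAddress("datanode1.example.com", 50010), 60_000);
    s.setSoTimeout(60_000); // read timeout, analogous to the timeouts used above

    // Layer buffered and data streams on top of the raw socket streams,
    // just as createBlockOutputStream() wraps unbufOut/unbufIn.
    DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(s.getOutputStream(), 512));
    DataInputStream in = new DataInputStream(s.getInputStream());

    // The real client would now write the operation header (Sender.writeBlock)
    // on 'out' and read the BlockOpResponseProto ack from 'in'.
    out.flush();
    s.close();
  }
}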
The background sending thread discussed above is started when the DFSOutputStream is initialized:
//org.apache.hadoop.hdfs.DFSOutputStream
// Create and initialize a DFSOutputStream.
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress, int buffersize,
    DataChecksum checksum, String[] favoredNodes) throws IOException {
  final HdfsFileStatus stat;
  try {
    stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
        new EnumSetWritable<CreateFlag>(flag), createParent, replication,
        blockSize);
  } catch (RemoteException re) {
    throw re.unwrapRemoteException(AccessControlException.class,
        DSQuotaExceededException.class,
        FileAlreadyExistsException.class,
        FileNotFoundException.class,
        ParentNotDirectoryException.class,
        NSQuotaExceededException.class,
        SafeModeException.class,
        UnresolvedPathException.class,
        SnapshotAccessControlException.class);
  }
  final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
      flag, progress, checksum, favoredNodes);
  // Start the background data-sending thread.
  out.start();
  return out;
}

// The start() method that launches the background sending thread.
private synchronized void start() {
  streamer.start();
}
This completes the whole flow triggered by the "-put" command. The execution path above was obtained by single-stepping through the code on a real cluster. The class design inside HDFS is quite reasonable, but the call routes between classes are tangled; without step-by-step debugging it is easy to misread them from the source alone. Once the execution path above is clear and the call relationships are understood, it becomes much easier to grasp what each class is for and how the related classes are layered.