HDFS读文件过程分析:读取文件的Block数据 Posted 2020-09-13 YDDMAX
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HDFS读文件过程分析:读取文件的Block数据相关的知识,希望对你有一定的参考价值。
转自http://shiyanjun.cn/archives/962.html
我们可以从java.io.InputStream类中看到,抽象出一个read方法,用来读取已经打开的InputStream实例中的字节,每次调用read方法,会读取一个字节数据,该方法抽象定义,如下所示: public abstract int read() throws IOException; Hadoop的DFSClient.DFSInputStream类实现了该抽象逻辑,如果我们清楚了如何从HDFS中读取一个文件的一个block的一个字节的原理,更加抽象的顶层只需要迭代即可获取到该文件的全部数据。 从HDFS读文件过程分析:获取文件对应的Block列表 (http://shiyanjun.cn/archives/925.html )中,我们已经获取到一个文件对应的Block列表信息,打开一个文件,接下来就要读取实际的物理块数据,我们从下面的几个方面来详细说明读取数据的过程。
Client从Datanode读取文件的一个字节
下面,我们通过分析DFSClient.DFSInputStream中实现的代码,读取HDFS上文件的内容。首先从下面的方法开始:
2
public
synchronized
int
read()
throws
IOException {
3
int
ret = read( oneByteBuf,
0
,
1
);
4
return
( ret <=
0
) ? -
1
: (oneByteBuf[
0
] &
0xff
);
上面调用read(oneByteBuf, 0, 1)读取一个字节到单字节缓冲区oneByteBuf中,具体实现见如下方法:
02
public
synchronized
int
read(
byte
buf[],
int
off,
int
len)
throws
IOException {
05
throw
new
IOException(
"Stream closed"
);
09
if
(pos < getFileLength()) {
14
currentNode = blockSeekTo(pos);
16
int
realLen = (
int
) Math.min((
long
) len, (blockEnd - pos + 1L));
17
int
result = readBuffer(buf, off, realLen);
23
throw
new
IOException(
"Unexpected EOS from the reader"
);
25
if
(stats !=
null
&& result != -
1
) {
26
stats.incrementBytesRead(result);
29
}
catch
(ChecksumException ce) {
31
}
catch
(IOException e) {
33
LOG.warn(
"DFS Read: "
+ StringUtils.stringifyException(e));
36
if
(currentNode !=
null
) { addToDeadNodes(currentNode); }
读取文件数据的一个字节,具体过程如下:
检查流对象是否处于打开状态(前面已经获取到文件对应的block列表的元数据,并打开一个InputStream对象)
从文件的第一个block开始读取,首先需要找到第一个block对应的数据块所在的Datanode,可以从缓存的block列表中查询到(如果查找不到,则会与Namenode进行一次RPC通信请求获取到)
打开一个到该读取的block所在Datanode节点的流,准备读取block数据
建立了到Datanode的连接后,读取一个字节数据到字节缓冲区中,返回读取的字节数(1个字节)
在读取的过程中,以字节为单位,通过判断某个偏移位置的字节属于哪个block(根据block元数据所限定的字节偏移范围),在根据这个block去定位某一个Datanode节点,这样就可连续地读取一个文件的全部数据(组成文件的、连续的多个block数据块)。
查找待读取的一个字节所在的Datanode节点
上面public synchronized int read(byte buf[], int off, int len) throws IOException方法,调用了blockSeekTo方法来获取,文件某个字节索引位置的数据所在的Datanode节点。其实,很容易就能想到,想要获取到数据所在的Datanode节点,一定是从block元数据中计算得到,然后根据Client缓存的block映射列表,找到block对应的Datanode列表,我们看一下blockSeekTo方法的代码实现:
01
private
synchronized
DatanodeInfo blockSeekTo(
long
target)
throws
IOException {
04
DatanodeInfo chosenNode =
null
;
07
LocatedBlock targetBlock = getBlockAt(target,
true
);
08
assert
(target==
this
.pos) :
"Wrong postion "
+ pos +
" expect "
+ target;
09
long
offsetIntoBlock = target - targetBlock.getStartOffset();
11
DNAddrPair retval = chooseDataNode(targetBlock);
12
chosenNode = retval.info;
13
InetSocketAddress targetAddr = retval.addr;
16
Block blk = targetBlock.getBlock();
17
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
18
if
(shouldTryShortCircuitRead(targetAddr)) {
20
blockReader = getLocalBlockReader(conf, src, blk, accessToken,
21
chosenNode, DFSClient.
this
.socketTimeout, offsetIntoBlock);
23
}
catch
(AccessControlException ex) {
24
LOG.warn(
"Short circuit access failed "
, ex);
26
shortCircuitLocalReads =
false
;
27
}
catch
(IOException ex) {
28
if
(refetchToken >
0
&& tokenRefetchNeeded(ex, targetAddr)) {
34
LOG.info(
"Failed to read "
+ targetBlock.getBlock()
35
+
" on local machine"
+ StringUtils.stringifyException(ex));
36
LOG.info(
"Try reading via the datanode on "
+ targetAddr);
43
s = socketFactory.createSocket();
44
LOG.debug(
"Connecting to "
+ targetAddr);
45
NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout);
46
s.setSoTimeout(socketTimeout);
47
blockReader = RemoteBlockReader.newBlockReader(s, src, blk.getBlockId(),
49
blk.getGenerationStamp(),
50
offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
51
buffersize, verifyChecksum, clientName);
53
}
catch
(IOException ex) {
54
if
(refetchToken >
0
&& tokenRefetchNeeded(ex, targetAddr)) {
58
LOG.warn(
"Failed to connect to "
+ targetAddr
59
+
", add to deadNodes and continue"
+ ex);
60
if
(LOG.isDebugEnabled()) {
61
LOG.debug(
"Connection failure"
, ex);
64
addToDeadNodes(chosenNode);
69
}
catch
(IOException iex) { }
上面代码中,主要包括如下几个要点:
在读取文件的时候,首先会从Namenode获取文件对应的block列表元数据,返回的block列表是按照Datanode的网络拓扑结构进行排序过的(本地节点优先,其次是同一机架节点),而且,Client还维护了一个dead node列表,只要此时bock对应的Datanode列表中节点不出现在dead node列表中就会被返回,用来作为读取数据的Datanode节点。
如果Client为集群Datanode节点,尝试从本地读取block
通过调用chooseDataNode方法返回一个Datanode结点,通过判断,如果该节点地址是本地地址,并且该节点上对应的block元数据信息的状态不是正在创建的状态,则满足从本地读取数据块的条件,然后会创建一个LocalBlockReader对象,直接从本地读取。在创建LocalBlockReader对象的过程中,会先从缓存中查找一个本地Datanode相关的LocalDatanodeInfo对象,该对象定义了与从本地Datanode读取数据的重要信息,以及缓存了待读取block对应的本地路径信息,可以从LocalDatanodeInfo类定义的属性来说明:
1
private
ClientDatanodeProtocol proxy =
null
;
2
private
final
Map<Block, BlockLocalPathInfo> cache;
如果缓存中存在待读取的block的相关信息,可以直接进行读取;否则,会创建一个proxy对象,以及计算待读取block的路径信息BlockLocalPathInfo,最后再加入到缓存,为后续可能的读取加速。我们看一下如果没有从缓存中找到LocalDatanodeInfo信息(尤其是BlockLocalPathInfo),则会执行如下逻辑:
2
pathinfo = proxy.getBlockLocalPathInfo(blk, token);
上面proxy为ClientDatanodeProtocol类型,Client与Datanode进行RPC通信的协议,RPC调用getBlockLocalPathInfo获取block对应的本地路径信息,可以在Datanode类中查看具体实现,如下所示:
1
BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
Datanode调用FSDataset(实现接口FSDatasetInterface)的getBlockLocalPathInfo,如下所示:
2
public
BlockLocalPathInfo getBlockLocalPathInfo(Block block)
4
File datafile = getBlockFile(block);
5
File metafile = getMetaFile(datafile, block);
6
BlockLocalPathInfo info =
new
BlockLocalPathInfo(block, datafile.getAbsolutePath(), metafile.getAbsolutePath());
接着可以直接去读取该block文件(如果需要检查校验和文件,会读取block的元数据文件metafile):
03
File blkfile =
new
File(pathinfo.getBlockPath());
04
dataIn =
new
FileInputStream(blkfile);
08
File metafile =
new
File(pathinfo.getMetaPath());
09
checksumIn =
new
FileInputStream(metafile);
12
BlockMetadataHeader header = BlockMetadataHeader.readHeader(
new
DataInputStream(checksumIn));
13
short
version = header.getVersion();
14
if
(version != FSDataset.METADATA_VERSION) {
15
LOG.warn(
"Wrong version ("
+ version +
") for metadata file for "
+ blk +
" ignoring ..."
);
17
DataChecksum checksum = header.getChecksum();
18
localBlockReader =
new
BlockReaderLocal(conf, file, blk, token, startOffset, length, pathinfo, checksum,
true
, dataIn, checksumIn);
20
localBlockReader =
new
BlockReaderLocal(conf, file, blk, token, startOffset, length, pathinfo, dataIn);
在上面代码中,返回了BlockLocalPathInfo,但是很可能在这个过程中block被删除了,在删除block的时候,Namenode会调度指派该Datanode删除该block,恰好在这个时间间隔内block对应的BlockLocalPathInfo信息已经失效(文件已经被删除),所以上面这段代码再try中会抛出异常,并在catch中捕获到IO异常,会从缓存中再清除掉失效的block到BlockLocalPathInfo的映射信息。
如果Client非集群Datanode节点,远程读取block
如果Client不是Datanode本地节点,则只能跨网络节点远程读取,首先创建Socket连接:
1
s = socketFactory.createSocket();
2
LOG.debug(
"Connecting to "
+ targetAddr);
3
NetUtils.connect(s, targetAddr, getRandomLocalInterfaceAddr(), socketTimeout);
4
s.setSoTimeout(socketTimeout);
建立Client到目标Datanode(targetAddr)的连接,然后同样也是创建一个远程BlockReader对象RemoteBlockReader来辅助读取block数据。创建RemoteBlockReader过程中,首先向目标Datanode发送RPC请求:
02
DataOutputStream out =
new
DataOutputStream(
new
BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));
05
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
06
out.write( DataTransferProtocol.OP_READ_BLOCK );
07
out.writeLong( blockId );
08
out.writeLong( genStamp );
09
out.writeLong( startOffset );
11
Text.writeString(out, clientName);
12
accessToken.write(out);
然后获取到DataInputStream对象来读取Datanode的响应信息:
1
DataInputStream in =
new
DataInputStream(
2
new
BufferedInputStream(NetUtils.getInputStream(sock), bufferSize));
最后,返回一个对象RemoteBlockReader:
1
return
new
RemoteBlockReader(file, blockId, in, checksum, verifyChecksum, startOffset, firstChunkOffset, sock);
借助BlockReader来读取block字节
我们再回到blockSeekTo方法中,待读取block所在的Datanode信息、BlockReader信息都已经具备,接着就可以从包含输入流(InputStream)对象的BlockReader中读取数据块中一个字节数据:
1
int
result = readBuffer(buf, off, realLen);
将block数据中一个字节读取到buf中,如下所示:
01
private
synchronized
int
readBuffer(
byte
buf[],
int
off,
int
len)
throws
IOException {
03
boolean
retryCurrentNode =
true
;
08
return
blockReader.read(buf, off, len);
09
}
catch
( ChecksumException ce ) {
10
LOG.warn(
"Found Checksum error for "
+ currentBlock +
" from "
+ currentNode.getName() +
" at "
+ ce.getPos());
11
reportChecksumFailure(src, currentBlock, currentNode);
13
retryCurrentNode =
false
;
14
}
catch
( IOException e ) {
15
if
(!retryCurrentNode) {
16
LOG.warn(
"Exception while reading from "
+ currentBlock +
" of "
+ src +
" from "
+ currentNode +
": "
+ StringUtils.stringifyException(e));
20
boolean
sourceFound =
false
;
21
if
(retryCurrentNode) {
26
sourceFound = seekToBlockSource(pos);
28
addToDeadNodes(currentNode);
29
sourceFound = seekToNewSource(pos);
34
retryCurrentNode =
false
;
通过BlockReaderLocal或者RemoteBlockReader来读取block数据,逻辑非常类似,主要是控制读取字节的偏移量,记录偏移量的状态信息,详细可以查看它们的源码。
DataNode节点处理读文件Block请求
我们可以在DataNode端看一下,如何处理一个读取Block的请求。如果Client与DataNode不是同一个节点,则为远程读取文件Block,首先Client需要发送一个请求头信息,代码如下所示:
02
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
03
out.write( DataTransferProtocol.OP_READ_BLOCK );
04
out.writeLong( blockId );
05
out.writeLong( genStamp );
06
out.writeLong( startOffset );
08
Text.writeString(out, clientName);
09
accessToken.write(out);
DataNode节点端通过验证数据传输版本号(DataTransferProtocol.DATA_TRANSFER_VERSION)一致以后,会判断传输操作类型,如果是读操作DataTransferProtocol.OP_READ_BLOCK,则会通过Client建立的Socket来创建一个OutputStream对象,然后通过BlockSender向Client发送Block数据,代码如下所示:
2
blockSender =
new
BlockSender(block, startOffset, length,
true
,
true
,
false
, datanode, clientTraceFmt);
3
}
catch
(IOException e) {
4
out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
8
out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
9
long
read = blockSender.sendBlock(out, baseStream,
null
);
以上是关于HDFS读文件过程分析:读取文件的Block数据的主要内容,如果未能解决你的问题,请参考以下文章
HDFS写数据流程
浅谈HDFS的读流程
好程序员大数据学习路线分享HDFS读流程
初学HDFS的读过程和写过程的分析
HDFS中的block
深入理解HDFS