从设计到实现:HDFS复制流程细节 | 分布式文件系统读书笔记
Posted 中兴大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从设计到实现:HDFS复制流程细节 | 分布式文件系统读书笔记相关的知识,希望对你有一定的参考价值。
本篇以看图说话的方式记录前篇讲到的hdfs复制流程各步骤对应的hdfs代码实现。
文 | 何文鑫
上篇要点:
gfs、hdfs使用同步复制方式在数据写入时通过pipeline在多个节点复制副本。
gfs pipeline的建立由chunkserver自行选择下个最近节点。数据通过pipeline传输完毕,master选择其中一个chunkserver作为primary,指导其他secondaries以一致的顺序落盘。
hdfs中,client按照namenode给出的datanode顺序建立pipeline,数据以通过pipeline的顺序落盘,简单直接。
图中标号为下述步骤标号,列举代码基于为hadoop 3.1.2:
https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.1.2/hadoop-3.1.2-src.tar.gz
流程
1. client向nn申请添加块(block)
org.apache.hadoop.hdfs.DFSOutputStream, line: 1082
dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock,
excludedNodes, fileId, favoredNodes, allocFlags);
其中,src是该归属模块的文件名。
该函数返回LocatedBlock给client,里面存有一组dn信息,nn建议client后续块存放到上述dn节点。
2. client向dn0发送set up消息尝试建立pipeline
org.apache.hadoop.hdfs.DataStreamer, line: 1753
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS,
checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
(targetPinnings != null && targetPinnings[0]), targetPinnings,
nodeStorageIDs[0], nodeStorageIDs);
其中,nodes是第1步中nn返回的dn信息,out是client与dn0的输出流。
该函数执行完后,client会通过rpc向dn0发送set up消息,同时把dn0之后的其他dn节点信息作为参数传递,供下游dn建立后续pipeline。
3. dn0收到client发送的set up消息后,继续向下游dn发送set up消息建立pipeline
org.apache.hadoop.hdfs.server.datanode.DataXceiver, line: 821
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes,
srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy,
allowLazyPersist, false, targetPinnings,
targetStorageId, targetStorageIds);
可以看到这段代码与client的代码非常相似,主要的区别:
mirrorOut为该dn与下游dn的输出流
传递给下游dn的节点列表会减少一个节点(本节点),节点列表减为0后,pipeline完成。
4. 下游dn pipeline建立成功后向上游dn返回成功ack
org.apache.hadoop.hdfs.server.datanode.DataXceiver, line: 880
BlockOpResponseProto.newBuilder()
.setStatus(mirrorInStatus)
.setFirstBadLink(firstBadLink)
.build()
.writeDelimitedTo(replyOut);
replyOut为下游dn到上游dn的输出流。
5. set up ack返回到client
org.apache.hadoop.hdfs.DataStreamer, line: 1761
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
PBHelperClient.vintPrefixed(blockReplyStream));
client检查返回状态中没有异常,则认为pipeline建立成功。
6. pipeline建立成功后,client开始向dn0发送packet数据
创建packet
org.apache.hadoop.hdfs.DFSOutputStream, line: 445~447
currentPacket.writeChecksum(checksum, ckoff, cklen);
currentPacket.writeData(buffer, len);
currentPacket.incNumChunks();
发送packet消息
org.apache.hadoop.hdfs.DataStreamer, line: 664, 774
DFSPacket one;
...
one.writeTo(blockStream);
7. dn接收packet,并镜像给下游dn
接收packet
org.apache.hadoop.hdfs.server.datanode.BlockReceiver, line: 528
packetReceiver.receiveNextPacket(in);
packet给下游镜像
org.apache.hadoop.hdfs.server.datanode.BlockReceiver, line: 588
packetReceiver.mirrorPacketTo(mirrorOut);
8. 向上游dn返回ack
org.apache.hadoop.hdfs.server.datanode.BlockReceiver, line: 1481
sendAckUpstream(ack, expected, totalAckTimeNanos,
(pkt != null ? pkt.offsetInBlock : 0),
PipelineAck.combineHeader(datanode.getECN(), myStatus));
后续dn重复相同操作,直到最后一个dn。
9. client收到所有packet ack,确认没有失败
org.apache.hadoop.hdfs.DFSOutputStream, line: 1086, 1108, 1124~1128
ack.readFields(blockReplyStream);
...
final Status reply = PipelineAck.getStatusFromHeader(ack
.getHeaderFlag(i));
...
if (reply != SUCCESS) {
errorState.setBadNodeIndex(i); // mark bad datanode
throw new IOException("Bad response " + reply +
" for " + block + " from datanode " + targets[i]);
}
10. client发送close消息,关闭pipeline
构造close消息
org.apache.hadoop.hdfs.DFSOutputStream, line: 499
currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(),
getStreamer().getAndIncCurrentSeqno(), true);
close消息与普通数据消息格式一致,只是数据长度为0,最后一个lastPacketInBlock参数为true
11. 各dn依次收到close消息完成(finalize)块,关闭输入输出流
完成块
org.apache.hadoop.hdfs.server.datanode.BlockReceiver, line: 1477
finalizeBlock(startTime);
关闭输入流
org.apache.hadoop.hdfs.server.datanode.DataXceiver, line: 928~932
IOUtils.closeStream(mirrorOut);
IOUtils.closeStream(mirrorIn);
IOUtils.closeStream(replyOut);
IOUtils.closeSocket(mirrorSock);
IOUtils.closeStream(blockReceiver);
块复制完成。
一些遗留问题
在图1的描述中pipeline close后还会向nn发送一个消息。这过程在pipeline的关闭代码中还未看到,不知是否在块上报流程中?待后续确认,也欢迎大家给予指正。
写异常恢复流程本篇没有涉及,后续讨论。
append流程本篇没有涉及,待学习。
块落盘及块状态迁移?finalize?本篇未涉及,待学习。
数据损坏副本补齐流程,待学习。
本篇摘录了hdfs论文中描述的写入流程关键代码,并附带了行号,方便日后再次翻看。
虽然关键代码已被记录,方便代码库中翻查。但要想在大脑中形成运行态模型,依然十分困难,因为关键逻辑散落多个类中,并运行在多个进程/线程中,这些因素对最本质逻辑依然产生很大干扰。
因此,下篇会将本篇的关键逻辑(主要是client代码)进一步抽离到一个独立类中,并以顺序方式执行,最终将呈现出一个纯粹的写入流程。
(to be continued...)
「从设计到实现」系列文章:
长按二维码关注
以上是关于从设计到实现:HDFS复制流程细节 | 分布式文件系统读书笔记的主要内容,如果未能解决你的问题,请参考以下文章