从设计到实现:HDFS复制流程细节 | 分布式文件系统读书笔记

Posted 中兴大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从设计到实现:HDFS复制流程细节 | 分布式文件系统读书笔记相关的知识,希望对你有一定的参考价值。

本篇以看图说话的方式记录前篇讲到的hdfs复制流程各步骤对应的hdfs代码实现。

文 | 何文鑫

上篇要点:

  • gfs、hdfs使用同步复制方式在数据写入时通过pipeline在多个节点复制副本。

  • gfs pipeline的建立由chunkserver自行选择下个最近节点。数据通过pipeline传输完毕,master选择其中一个chunkserver作为primary,指导其他secondaries以一致的顺序落盘。

  • hdfs中,client按照namenode给出的datanode顺序建立pipeline,数据以通过pipeline的顺序落盘,简单直接。


从设计到实现:HDFS复制流程细节 | 分布式文件系统读书笔记(3)

图中标号为下述步骤标号,列举代码基于为hadoop 3.1.2:

https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.1.2/hadoop-3.1.2-src.tar.gz

流程

1. client向nn申请添加块(block)

org.apache.hadoop.hdfs.DFSOutputStream, line: 1082

dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock, excludedNodes, fileId, favoredNodes, allocFlags);

其中,src是该归属模块的文件名。

该函数返回LocatedBlock给client,里面存有一组dn信息,nn建议client后续块存放到上述dn节点。

2. client向dn0发送set up消息尝试建立pipeline

org.apache.hadoop.hdfs.DataStreamer, line: 1753

new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken, dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, nodes.length, block.getNumBytes(), bytesSent, newGS, checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile, (targetPinnings != null && targetPinnings[0]), targetPinnings, nodeStorageIDs[0], nodeStorageIDs);

其中,nodes是第1步中nn返回的dn信息,out是client与dn0的输出流。

该函数执行完后,client会通过rpc向dn0发送set up消息,同时把dn0之后的其他dn节点信息作为参数传递,供下游dn建立后续pipeline。

3. dn0收到client发送的set up消息后,继续向下游dn发送set up消息建立pipeline

org.apache.hadoop.hdfs.server.datanode.DataXceiver, line: 821

new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0], blockToken, clientname, targets, targetStorageTypes, srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum, cachingStrategy, allowLazyPersist, false, targetPinnings, targetStorageId, targetStorageIds);

可以看到这段代码与client的代码非常相似,主要的区别

  • mirrorOut为该dn与下游dn的输出流

  • 传递给下游dn的节点列表会减少一个节点(本节点),节点列表减为0后,pipeline完成。

4. 下游dn pipeline建立成功后向上游dn返回成功ack

org.apache.hadoop.hdfs.server.datanode.DataXceiver, line: 880

BlockOpResponseProto.newBuilder() .setStatus(mirrorInStatus) .setFirstBadLink(firstBadLink) .build() .writeDelimitedTo(replyOut);

replyOut为下游dn到上游dn的输出流。

5. set up ack返回到client

org.apache.hadoop.hdfs.DataStreamer, line: 1761

BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( PBHelperClient.vintPrefixed(blockReplyStream));

client检查返回状态中没有异常,则认为pipeline建立成功。

6. pipeline建立成功后,client开始向dn0发送packet数据

创建packet

org.apache.hadoop.hdfs.DFSOutputStream, line: 445~447

currentPacket.writeChecksum(checksum, ckoff, cklen);currentPacket.writeData(buffer, len);currentPacket.incNumChunks();

发送packet消息

org.apache.hadoop.hdfs.DataStreamer, line: 664, 774

DFSPacket one;...one.writeTo(blockStream);

7. dn接收packet,并镜像给下游dn

接收packet

org.apache.hadoop.hdfs.server.datanode.BlockReceiver, line: 528

 
   
   
 
packetReceiver.receiveNextPacket(in);

packet给下游镜像

org.apache.hadoop.hdfs.server.datanode.BlockReceiver, line: 588

 
   
   
 
packetReceiver.mirrorPacketTo(mirrorOut);

8. 向上游dn返回ack

org.apache.hadoop.hdfs.server.datanode.BlockReceiver, line: 1481

sendAckUpstream(ack, expected, totalAckTimeNanos, (pkt != null ? pkt.offsetInBlock : 0), PipelineAck.combineHeader(datanode.getECN(), myStatus));

后续dn重复相同操作,直到最后一个dn。

9. client收到所有packet ack,确认没有失败

org.apache.hadoop.hdfs.DFSOutputStream, line: 1086, 1108, 1124~1128

ack.readFields(blockReplyStream);...final Status reply = PipelineAck.getStatusFromHeader(ack .getHeaderFlag(i));...if (reply != SUCCESS) { errorState.setBadNodeIndex(i); // mark bad datanode throw new IOException("Bad response " + reply + " for " + block + " from datanode " + targets[i]);}

10. client发送close消息,关闭pipeline

构造close消息

org.apache.hadoop.hdfs.DFSOutputStream, line: 499

currentPacket = createPacket(0, 0, getStreamer().getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), true);

close消息与普通数据消息格式一致,只是数据长度为0,最后一个lastPacketInBlock参数为true

11. 各dn依次收到close消息完成(finalize)块,关闭输入输出流

完成块

org.apache.hadoop.hdfs.server.datanode.BlockReceiver, line: 1477

 
   
   
 

finalizeBlock(startTime);

关闭输入流

org.apache.hadoop.hdfs.server.datanode.DataXceiver, line: 928~932

IOUtils.closeStream(mirrorOut);IOUtils.closeStream(mirrorIn);IOUtils.closeStream(replyOut);IOUtils.closeSocket(mirrorSock);IOUtils.closeStream(blockReceiver);

块复制完成

一些遗留问题

  • 在图1的描述中pipeline close后还会向nn发送一个消息。这过程在pipeline的关闭代码中还未看到,不知是否在块上报流程中?待后续确认,也欢迎大家给予指正。

  • 写异常恢复流程本篇没有涉及,后续讨论。

  • append流程本篇没有涉及,待学习。

  • 块落盘及块状态迁移?finalize?本篇未涉及,待学习。

  • 数据损坏副本补齐流程,待学习。


本篇摘录了hdfs论文中描述的写入流程关键代码,并附带了行号,方便日后再次翻看。

虽然关键代码已被记录,方便代码库中翻查。但要想在大脑中形成运行态模型,依然十分困难,因为关键逻辑散落多个类中,并运行在多个进程/线程中,这些因素对最本质逻辑依然产生很大干扰。

因此,下篇会将本篇的关键逻辑(主要是client代码)进一步抽离到一个独立类中,并以顺序方式执行,最终将呈现出一个纯粹的写入流程。

(to be continued...)



「从设计到实现」系列文章:





 

大数据时代的思考和洞察

长按二维码关注

以上是关于从设计到实现:HDFS复制流程细节 | 分布式文件系统读书笔记的主要内容,如果未能解决你的问题,请参考以下文章

从设计到实现:GFS和HDFS在复制上的设计及异同比较 | 分布式文件系统读书笔记

HDFS特点:

HDFS设计思想元数据简单JAVAAPI操作HDFS

HDFS设计思想元数据简单JAVAAPI操作HDFS

资源 | 大数据Hadoop入门到实战视频教程

Hadoop之HDFS架构设计