HDFS读写机制剖析
Posted 同程艺龙技术中心
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HDFS读写机制剖析相关的知识,希望对你有一定的参考价值。
0x01简介
HDFS是一个分布式文件系统,在HDFS上写文件的过程与我们平时使用的单机文件系统非常不同。传统的文件系统是允许对一个文件并发写入的,只是如果不同步的话,文件内容会乱掉。而HDFS不允许并发写,并且通过文件租约机制保障这一点。下面我们来结合代码讲述下HDFS主要的读写过程。
0x02
读数据机制
DistributedFileSystem.open(path) 将创建一个FSDataInputStream对象,分装的是DFSClient.open产生的DFSInputStream对象。构造DFSInputStream对象的时候,会连接Namenode,进行初始化文件Block的信息。在初始化文件Block信息的时候,namenode返回所有blocks的信息,且根据client所处的物理位置进行排序。下图准确描述了这个过程[1]:
如果返回的blocks,最后一个block未正确关闭,即出现了locations不存在,但block的len不为0的情况,则会进行有限次的重试。如果是其他情况的未关闭,则会从block里面记录的locations对应的datanode获取block的实际写入大小,调用的是getReplicaVisibleLength(ExtendedBlock)方法。如果这个时候Datanode计入的generationStamp比Namenode当前的小,将选择下个datanode。获取失败,则抛出IOException,读取以失败告终。与此相关的类图如下:
根据调用的Read方法,dfsClient有不同的表现,分为从头读取和跳转读取。从头读基于ReaderStrategy执行doRead方法读取数据。目前有两种ReaderStrategy:ByteArrayStrategy和ByteBufferStrategy。这种情况的其他流程如下:
根据当前记录的position,查找文件处于哪个block。连接到该block对应的datanode上创建Peer,再构造BlockReader对象。BlockReader里面构造了一个Sender对象,发送BlockReaderOperation在Datanode的DataXceiver线程中创建一个发送数据的BlockSender对象。
BlockReader对象以chunkSize获取BlockSender发送的数据及checkSum。发送的数据不大于block可读取的量。
如果这个过程中遇到非token、key问题IOException,会将datanode加入到deadNodes中,再选取别的datanode重试。
跳转读的时候,根据dfs.client.hedged.read.threadpool.size的值是否为0,会有“Hedged Read”与常规Read。Hedged Read主要使用了ExecutorCompletionService,解决部分datanode读写慢而拖累client的问题。其他流程与别的读取只有文件起始位置区别。
0x03
写数据机制
相对于读的 FSDataInputStream, 写的时候使用的是DFSOutputStream。 FileSystem.create(Path) 后,生成一个FSDataOutputStream对象,该对象wrap了一个由DFSClient生成的DFSOutputStream对象。
为提高写的效率,客户端写的数据会缓存起来再批次发送。数据会以一个个数据包(DFSPacket)的形式发送,每个数据包默认是64K。每个数据包又是以一个个数据块(chunk)组成的,每个数据块是512字节,并且有校验和确保数据正确性。默认的校验(dfs.checksum.type,默认CRC32C)需要占用4个字节(见DataChecksum类),也就是说每个chunk实际占用516个字节。一个packet最多存储65536/516=127个chunk。所以,一个packet的实际大小只有127*516=65532字节,其中只有65024个字节是真正的数据。这个计算逻辑见DFSOutputStream:
1private void computePacketChunkSize(int psize, int csize) {
2 int chunkSize = csize + checksum.getChecksumSize();
3 chunksPerPacket = Math.max(psize/chunkSize, 1);
4 packetSize = chunkSize*chunksPerPacket;
5 if (DFSClient.LOG.isDebugEnabled()) {
6 DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
7 ", chunkSize=" + chunkSize +
8 ", chunksPerPacket=" + chunksPerPacket +
9 ", packetSize=" + packetSize);
10 }
11}
参与写的datanode内部维护了两个队列,dataQueue和ackQueue。从字面来理解就是,一个作为数据的发送队列,一个作为相应ack的队列。下面的场景就是典型的生产者和消费者模式。当前要发送的数据积累到一个packet的时候,会放入dataQueue队列中,消费者datastreamer 线程从dataqueue里把数据拉出来,发送到三个副本所在节点组成的data pipline中的第一个datanode, 将packet从dataQueue中移出,放入ackQueue里面。ackQueue对应的消费者responseProcessor会将成功收到ack的packet从ackQueue中移出,标志一个packet发送的完成。如果发送过程中出现错误,所有未收到ack的packet将会被移出ackQueue,重新加入dataQueue,等待再次被发送。主要过程如图:
当一个block被写满的时候,dfsClient需要向namenode申请新的block, 通过调用addBlock方式来实现。申请block的时候要带上上一个block的信息,因为namenode需要维护同一个文件的不同block之间的顺序关系。同时还可以提供下一个block所在节点偏向,excludeNodes和favoredNode。
0x04
总结
本文大致讲述了HDFS 文件读写的处理流程和细节。由于HDFS的设计理念是design for failure,所以在实际生产运用中当HDFS面对网络和节点错误的情况下也能够保证数据写入的持久性和一致性, 对于一些常见异常模式都有与之对应的恢复过程,如lease recovery、block recovery、 pipeline recovery。这些恢复过程由于涉及到Replica状态较多,我们会在后续的一些文章中继续探讨。
[1] Tom White. Hadoop The Definitive Guide 4th Edition.
[2] http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign
以上是关于HDFS读写机制剖析的主要内容,如果未能解决你的问题,请参考以下文章