浅谈HDFS写数据流程的核心架构设计(上)

Posted 数据之其然

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅谈HDFS写数据流程的核心架构设计(上)相关的知识,希望对你有一定的参考价值。

读源码至今发现,HDFS的源码写得不好,在学习HDFS过程中,所以我们在需要理解写数据流程的架构设计,另为什么要建立数据管理,这是我们需要去思考的问题,从上一节我们梳理就不再概述。


接下来我们继续深入去阅读源码,在读源代码之前,可以先了解写数据流程的核心架构如下图所示:


思考

假如多个客户端同时要并发的写Hadoop HDFS上的一个文件,这个事儿能成吗?明显不可以接受啊,因为HDFS上的文件是不允许并发写的,比如并发的追加一些数据什么。所以HDFS里有一个机制,叫做文件契约机制


也就是说,同一时间只能有一个客户端获取NameNode上面一个文件的契约,然后才可以向获取契约的文件写入数据

此时如果其他客户端尝试获取文件契约的时候,就获取不到,只能干等着。

通过这个机制,可以保证同一时间只有一个客户端在写一个文件。


在获取到了文件契约之后,在写文件的过程期间,那个客户端需要开启一个线程,不停的发送请求给NameNode进行文件续约,告诉NameNode:


NameNode大哥,我还在写文件啊,你给我一直保留那个契约好吗?而NameNode内部有一个专门的后台线程,负责监控各个契约的续约时间。

如果某个契约很长时间没续约了,此时就自动过期掉这个契约,让别的客户端来写。


接下来我们可以围绕这几个问题去阅读源码,比如:

  1. 创建文件

  2. 创建契约

  3. 启动了DataStramer线程

  4. 开启了续约

  5. 契约的检查

  6. 创建packet

  7. 申请Block

  8. 建立数据管道

  9. ResponseProcessor线程

  10. PacketResponder线程

  11. 如果在写数据的过程出问题了?怎么对应的处理方式是什么呢?


在读源码,我们可以围绕这几个源码类去阅读查看,比如:

EditLogTailer、FSImage、FSImageLoader、FSDirAclOp

EditLogFileInputStream、GetJournalEditServlet

StandbyCheckpointer、TransferFsImage、FsckServlet

DistributedFileSystem、DFSOutputStream

FSNamesystem、LeaseManager

DFSOutputStream、LeaseRenewer、HdfsDataInputStream

FSOutputSummer、LocatedBlock、Sender

DataXceiver、DataXceiverServer、BlockReader、Receiver



比如,我们去阅读EditLogTailer源代码类我们需要了解它是什么类,做了什么事情?

其实,EditLogTailer是一个后台线程,启动了以后会周期性的去journalnode集群上面去读取元数据日志,然后再把这些元数据日志应用到自己的元数据里面(内存+磁盘)


它还会加载当前自己的元数据日志,另通过StandByNamenoe 获取当前的元数据日志的最后一条日志的事务ID是多少


其实还有一个重要的代码,比如:需要去journlanode上面去读取元数据,由于现在的事务id是1000,所以需要到journlanode上面去读取

 注意:读日志的时候,只需要去读取 1001后面的日志就可以。


 另外还有一个参数是用来获取Journalnode获取日志的流:

streams = editLog.selectInputStreams(lastTxnId + 10, null, false)


当它去Journalnode加载日志的代码逻辑用这样写的

editsLoaded = image.loadEdits(streams, namesystem);


还有一点需要说明一下,每隔60秒 StandByNameNode 会去Journalnode获取一下日志

浅谈HDFS写数据流程的核心架构设计(上)



我们继续阅读源代码EditLogFileInputStream类,它的log是URLLog,通过getInpustream()方法找到的。它这个源码代码类也用到设计模式叫做装饰者模式

浅谈HDFS写数据流程的核心架构设计(上)

它读取日志通过reader读取的

浅谈HDFS写数据流程的核心架构设计(上)


它还创建了HttpURLConnetcion,如果我们这儿发送的是HTTP的请求,读取的Journalndoe那儿的日志,说明journalndoe启动起来的时候肯定会有一个JournalnodeHttpServer

NameNode
DataNode
JournalNode
NameNodeRpcServer
RpcServer
JournalnodeRpcServer
NameNodeHttperServer
Httpserver
JournalnodeRpcServer
 @Override public InputStream getInputStream() throws IOException { return SecurityUtil.doAsCurrentUser( new PrivilegedExceptionAction<InputStream>() { @Override public InputStream run() throws IOException { HttpURLConnection connection;              try { //NameNode: NameNodeRpcServer NameNodeHttperServer //DataNode: RpcServer Httpserver //JournalNode: JournalnodeRpcServer JournalnodeHttpserver //TODO 真相大白,我们创建了一个HttpURLConnection对象 connection = (HttpURLConnection) connectionFactory.openConnection(url, isSpnegoEnabled); } catch (AuthenticationException e) { throw new IOException(e);              }      

它是通过对象来获取输入流

浅谈HDFS写数据流程的核心架构设计(上)


我们继续读源代码GetJournalEditServlet类,是通过journalndoe来读取数据流。

还有一个流对拷(editFileIn),这个输入流读取的是journalnode这儿的日志

 TransferFsImage.copyFileToStream(response.getOutputStream(), editFile,          editFileInthrottler);


我们继续阅读源代码StandbyCheckpointer类,需要理解StandbyCheckpointer 是一个运行在standBynamenode上的一个线程。他会周期性的对命名空间做checkpoint的操作(说白了就是把内存里面目录树的信息持久化到磁盘上面)并且会把这个份数据上传到active namenode(用来替换 active namednoe上面的fsimage)


Checkpointer的流程是这样的,我们会看到doCheckpoint(),点击进去查看

会把内存的数据写到磁盘上面,它的写方式使用的是异步线程

 //开启了一个异步的线程 ExecutorService executor = Executors.newSingleThreadExecutor(uploadThreadFactory); Future<Void> upload = executor.submit(new Callable<Void>() { @Override public Void call() throws IOException { //这个操作就要把刚刚从内存里面的元数据持持久化到磁盘上面的 那个份数据 //上传到 active的namenode上面去。 TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem.getFSImage().getStorage(), imageType, txid, canceler); return null; } });

说明:有个uploadImageFromStorage方法,它会替换全量元数据,数据量很大,可能是几十g


由于是线程,所以肯定有run方法,另外它会做判断每隔60检查以下是否需要做checkpoint


我们还需要了解满足checkpoint两个条件。

第一,比如如:数量 10000,我们上一次checkpoint 现在最新的数据差了多少数据?或者说大概的意思就是说我们现在有多少条日志没有checkpoint了。

第二,当前时间 - 上一次checkpoint的时间。说白了这个变量代表的意思就是 已经有多久没有做checkpoint了。

 final long now = monotonicNow();          //TODO checkpoint条件一 数量 10000 final long uncheckpointed = countUncheckpointedTxns(); //TODO checkpoint条件二          //当前时间 - 上一次checkpoint的时间。 final long secsSinceLast = (now - lastCheckpointTime) / 1000;

假如:

在条件一,如果距离上一次做checkpoint超过100万条日志没有做checkpoint,那么就需要做一次

条件二:如果超过一个小时没有做checkpoint了,那么也需要做一次

当然,满足条件,它也会执行checkpoint



我们在读源码,发现一个问题:在HDFS驱动场景里,比如数据量比较大

他就用httphttpserver,它是通过standBynamenode到journalnode做同步的日志操作;当然也可以通过standBYnamenode到activenamenode同步的日志操作,但是,这样可能有好几个g


在HDFS驱动场景里,比如数据量比较小,他就用rpcserver,它是通过datanode到namenode带过去的数据信息很小;还有另一种方式,可以通过namenode到journalnode。


阅读源代码TransferFsImage类,它是把输入流写到数输出流,发现

做一个上传image的操作,需要传参有6个参数:

uploadImage(url, conf, storage, nnf, txid, canceler);


它是通过http方式获取的流,不断读自己的数据

 OutputStream output = connection.getOutputStream(); //输入流肯定是自己这儿的,不断读自己的数据 FileInputStream input = new FileInputStream(imageFile);

我们还需要了解流对烤,就是通过数据网output 输出流里面去写

 try { //一个流对烤把数据网output 输出流里面去写。 copyFileToStream(output, imageFile, input, ImageServlet.getThrottler(conf), canceler); } finally { IOUtils.closeStream(input); IOUtils.closeStream(output); }


知其所以然、知其所以必然,知其然而不知其所以然;蒙惠者虽知其然,而未必知其所以然;也这是我们从学习实践中得出的深切体会!分享完毕,谢谢!


以上是关于浅谈HDFS写数据流程的核心架构设计(上)的主要内容,如果未能解决你的问题,请参考以下文章

浅谈HDFS的写流程

浅谈HDFS源码的启动与心跳流程

Hbase

大数据架构设计与数据计算流程

浅谈三层

浅谈大数据HDFS架构演变的来世今生