从DFSOutputStream的pipeline写机制到Streamer线程泄漏问题
Posted Android路上的人
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从DFSOutputStream的pipeline写机制到Streamer线程泄漏问题相关的知识,希望对你有一定的参考价值。
前言
之前一段时间写了篇文章DataNode数据处理中心DataXceiver从大的方向了解了下datanode读写操作的过程.但是并没有具体细粒度的去关注读写操作中的细节以及可能存在的问题,本篇文章算是对这方面的一个补充吧.尽管本文所涉及的范围面看起来很窄,但是所呈现出来的结果一定会让你有所收获的.
DFSOutputStream写数据以及周边相关类,变量
本文主要阐述的datanode写数据的过程,而写数据过程中,第一个联系到的就是DFSOutputStream对象类.但其实这只是其中的一个大类,内部还包括了与其内部对象类的各种交互,协同的操作.下面花简短的篇幅介绍这几个类.
DataStreamer
数据流类,这是数据写操作时调用的主要类,DFSOutputStream的start()方法调用的就是dataStreamer的线程run方法,DFSOutputStream的主操作都是依靠内部对象类dataStreamer完成实现,可以说,这二者的联系最为紧密.
ResponseProcessor
ResponseProcessor类是DataStreamer中的内部类,主要作用是接收pipeline中datanode的ack回复,它是一个线程类.给出源码中的注释:
//
// Processes responses from the datanodes. A packet is removed
// from the ackQueue when its response arrives.
//
DFSPacket
数据包类,在DataStreamer和DFSOutputStream中都是用的这个类进行数据的传输的,给出源码中的注释:
/****************************************************************
* DFSPacket is used by DataStreamer and DFSOutputStream.
* DFSOutputStream generates packets and then ask DatStreamer
* to send them to datanodes.
****************************************************************/
除了以上3个大类需要了解之外,还有几个变量同样需要重视,因为这些变量会在后面的分析中经常出现.
1.dataQueue(List<DFSPacket>)
待发送数据包列表
2.ackQueue(List<DFSPacket>)
数据包回复列表,数据包发送成功后,dfsPacket将会从dataQueue移到ackQueue中.
3.pipeline
pipeline是一个经常看见的名词,中文翻译的意思是"管道",但是这个词我在网上也搜了相关的更好的解释,稍稍比较好理解的方式是"流水线模型",也有些人把它与设计模式中的责任链模式相挂钩,所以这个词用中文翻译总是不能很好的表达他的原意,在后面的篇幅中还会继续提到.
DataStreamer数据流对象
了解写数据的具体细节,需要首先了解DataStreamer的实现机理,因为DFSOutputStream的主操作无非是调用了dataStreamer的内部方法.DataStreamer源码中的注释很好的解释了DataStreamer所做的事,学习DataStreamer可以从阅读他的注释开始.
/*********************************************************************
*
* The DataStreamer class is responsible for sending data packets to the
* datanodes in the pipeline. It retrieves a new blockid and block locations
* from the namenode, and starts streaming packets to the pipeline of
* Datanodes. Every packet has a sequence number associated with
* it. When all the packets for a block are sent out and acks for each
* if them are received, the DataStreamer closes the current block.
*
* The DataStreamer thread picks up packets from the dataQueue, sends it to
* the first datanode in the pipeline and moves it from the dataQueue to the
* ackQueue. The ResponseProcessor receives acks from the datanodes. When an
* successful ack for a packet is received from all datanodes, the
* ResponseProcessor removes the corresponding packet from the ackQueue.
*
* In case of error, all outstanding packets are moved from ackQueue. A new
* pipeline is setup by eliminating the bad datanode from the original
* pipeline. The DataStreamer now starts sending packets from the dataQueue.
*
*********************************************************************/
如果看不懂这么多的英文,没有关系,我特地对其进行了翻译,帮助大家理解:
DataStreamer对象类是负责发送data packets数据包到pipeline中的各个datanode中.
它会从namenode中寻求一个新的blockId和block的位置信息,然后开始以流式的方式在pipeline
的datanode中进行packet数据包的传输.每个包有属于它自己的一个数字序列号.当属于一个block
块的所有的数据包发生完毕并且对应的ack回复都被接收到了, 则表明此次的block写入完成,dataStreamer
将会关闭当前block块.
DataStreamer线程从dataQueue中选取packets数据包,发送此数据包给pipeline中的首个datanode,
然后移动此数据从dataQueue列表到ackQueue.ResponseProcessor会从各个datanode中接收ack回复.
当对于一个packet的成功的ack回复被所有的datanode接收到了,ResponseProcessor将会从ackQueue列
表中移除相应的packet包.
当出现错误的时候,所有的未完成的packet数据包将会从ackQueue中移除掉.一个新的
pipeline会被重新建立,新建立的pipeline会除掉坏的datanode.DataStreamer会从dataQueue
中重新发送数据包
OK,读完官方注释,想必或多或少已经对其中的机理有所了解.下图是我做的一张结构简图:
这张图对应的程序逻辑在run()方法中,首先在while循环中会获取一个dataPacket数据包:
one = dataQueue.getFirst(); // regular data packet
然后在接下来的操作中会出现packet的转移
// send the packet
SpanId spanId = SpanId.INVALID;
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
if (scope != null) {
spanId = scope.getSpanId();
scope.detach();
one.setTraceScope(scope);
}
scope = null;
dataQueue.removeFirst();
ackQueue.addLast(one);
dataQueue.notifyAll();
}
}
然后发送数据到远程datanode节点
// write out data to remote datanode
try (TraceScope ignored = dfsClient.getTracer().
newScope("DataStreamer#writeTo", spanId)) {
one.writeTo(blockStream);
blockStream.flush();
} catch (IOException e) {
...
dataStreamer发送完数据包之后,responseProcessor进程会收到来自datanode的ack回复,如果对于一个block块,收到了pipeline中datanode所有的ack回复信息,则代表这个block块发送完成了.pipeline的datanode的构建分为2种情形,代表着2种情形的数据传输
BlockConstructionStage.PIPELINE_SETUP_CREATE
BlockConstructionStage.PIPELINE_SETUP_APPEND
第一种情况在新分配块的时候进行的,从namenode上获取新的blockId和位置,然后连接上第一个datanode.
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
if (LOG.isDebugEnabled()) {
LOG.debug("Allocating new block: " + this);
}
setPipeline(nextBlockOutputStream());
initDataStreaming();
}
/**
* Open a DataStreamer to a DataNode so that it can be written to.
* This happens when a file is created and each time a new block is allocated.
* Must get block ID and the IDs of the destinations from the namenode.
* Returns the list of target datanodes.
*/
protected LocatedBlock nextBlockOutputStream() throws IOException {
...
//
// Connect to first DataNode in the list.
//
success = createBlockOutputStream(nodes, storageTypes, 0L, false);
...
pipeline的第一阶段可以用下图表示
以上是关于从DFSOutputStream的pipeline写机制到Streamer线程泄漏问题的主要内容,如果未能解决你的问题,请参考以下文章
从 sklearn.pipeline.Pipeline 获取转换器结果
从 FeatureUnion + Pipeline 中获取功能名称