HDFS RPC 性能分析
Posted 民生运维人
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HDFS RPC 性能分析相关的知识,希望对你有一定的参考价值。
1 前言
HDFS诞生于2006年,以其高并发、高吞吐和PB级的存储能力成为大数据生态的基础。谈到HDFS的高吞吐能力,必须要聊一下HDFS的RPC,本文将带大家来探讨一下HDFS的RPC处理机制和性能优化分析。
2 NameNode处理客户端读写请求
2.1 RPC Server的实现
NameNode作为RPC的服务端,主要通过NameNodeRPCServer实例来接收并处理RPC请求,在NameNode的启动的过程对其进行初始化,并指定NameNode与客户端的通信接口协议、socket信息以及最大连接数(handlerCount)等,HDFS的RPC通信基于protobuf消息格式,可以提升数据的传输效率。
this.clientRpcServer = new RPC.Builder(conf)
.setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService).setBindAddress(bindHost)
.setPort(rpcAddr.getPort()).setNumHandlers(handlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager()).build();
NameNodeRpcServer继承自RPC.Server类,该类是一个典型的RPC Server端的实现,在初始化时会启动handlerCount个线程来响应RPC连接请求:
public synchronized void start() {
responder.start();
listener.start();
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start();
}
}
RPC.Server启动后持续监听来自客户端的RPC请求并放入callQueue队列中,handler线程则不断从callQueue中提取RPC请求来执行具体的处理逻辑。
private class Handler extends Thread {
public Handler(int instanceNumber) {
this.setDaemon(true);
this.setName("IPC Server handler "+ instanceNumber + " on " + port);
}
@Override
public void run() {
LOG.debug(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
ByteArrayOutputStream buf =
new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
while (running) {
TraceScope traceScope = null;
try {
final Call call = callQueue.take(); // pop the queue; maybe blocked here
... ...
2.2 HDFS写入流程
客户端对NameNode的RPC请求主要用于对HDFS文件进行数据读写,下面我们从NameNode RPC负载的角度来分析一次HDFS文件的写入流程:
1. 客户端通过NameNodeRPCServer申请Block
1)NameNode为客户端提供可以用于写入该Block的目标DataNode列表
2)创建Block对象
2. 更新NameNode内存中的blocksMap和文件INode信息。
3. 加同步锁,更新txid,并将editlog信息写入内存缓冲池,然后释放同步锁。
private long beginTransaction() {
synchronized (this) {
...
// wait if an automatic sync is scheduled
waitIfAutoSyncScheduled();
long start = beginTransaction();
op.setTransactionId(txid);
try {
editLogStream.write(op);
} catch (IOException ex) {
// All journals failed, it is handled in logSync.
} finally {
op.reset();
}
... ...
}
4. 将editLog信息刷写到本地磁盘和JournalNode节点上的editLog文件。
5. 创建数据传输链路,并将数据通过RPC调用发送至各个目标DataNode
2.3 性能优化分析
从以上流程来看,NameNode的RPC性能优化可以参考如下几点:
1. 减少小文件写入
我们发现HDFS写入过程其实是一个比较长的链路,其中加全局同步锁、editLog磁盘刷写以及网络数据传输这些步骤都不是非常高效的,HDFS的数据写入并不适合用于频繁写入小文件,因此优化应用端代码逻辑,从而减少不必要的小文件写入将有利于提升整个HDFS集群的处理性能。
2. 调整RPC连接数
如果集群的RPC请求处理比较繁忙,应适当调大RPC服务端的最大连接数,避免大量的连接等待。HDFS中可以通过dfs.namenode.handler.count参数进行配置,社区对于该参数值的建议是20*log2(clustersize),具体的数值也要根据实际集群的运行情况具体设置。
3. 关注editlog写入性能
1)针对本地磁盘上的editlog写入,要尽量为editlog分配的独立的数据盘,避免磁盘IO压力大导致写入性能受到影响。
2)针对向JournalNode节点的editlog并发异步写入性能:
-
同样,需要为journalnode分配较为独立的数据盘,如果将写入editlog的数据盘和datanode读写数据盘混用则可能造成editlog写入缓慢,导致editlog写入JournalNode缓慢甚至超时。
-
JournalNode的JVM堆内存设置要合理,避免频繁GC造成响应缓慢。
-
参考滴滴公司的editlog异步写入的特性:
Hadoop社区从2.8.3开始引入了异步editlog写入功能(HDFS-7964),通过配置dfs.namenode.edits.asynclogging 为true来实现editlog的异步写入。
考虑极端情况下如果NameNode发送异常crash的时候正在进行数据块的写入,但是由于editlog写入性能较差导致尚未写入所有的操作记录,而此时有些block已经成功写到DataNode上的情况,则可能出现在NameNode重新启动时发现这些block在DataNode中有而在NameNode上没有,但是HDFS的Block汇报机制可以保证在后续的汇报处理中发现如果该DataNode的写入状态为FINALIZED,则可以将该block加入到NameNode的blocksmap中,因此笔者认为该方案理论上来讲可以保证block信息的最终一致性,针对需要进一步进行RPC性能调优的情况下可以考虑开启此选项。
// Add replica if appropriate. If the replica was previously corrupt
// but now okay, it might need to be updated.
if (reportedState == ReplicaState.FINALIZED
&& (storedBlock.findStorageInfo(storageInfo) == -1 ||
corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
toAdd.add(storedBlock);
}
3 NameNode响应DataNode RPC请求
NameNode除了响应来自客户端的RPC请求,还需要处理DataNode向NameNode发送的心跳和Block汇报等来自服务器端的RPC请求。
1. 定期心跳发送
DataNode定期(默认3秒)向NameNode发送心跳信息一方面是为了向NameNode报告DataNode的状态信息,另一方面NameNode通过响应DataNode的心跳来向DataNode下达操作命令。
2. Block增量和全量汇报
DataNode定期向NameNode进行全量块汇报(默认6小时)和增量块汇报(默认300秒),来帮助NameNode构建完整的BlocksMap信息。
因此,DataNode向NameNode发送RPC请求的性能优化可以从以下几个方面来考虑:
1. RPC拆分
由于整个集群的DataNode会频繁地向NameNode进行心跳反馈,如果集群规模比较大,Block数比较多的情况下,NameNode的处理压力将随之增大。如果进行了RPC拆分,则在NameNode启动时还会再启动一个用于处理服务器端RPC请求的NameNodeRPCServer实例serviceRpcServer,serviceRpcServer可以有效分流NameNode的RPC压力:
void start() {
clientRpcServer.start();
if (serviceRpcServer != null) {
serviceRpcServer.start();
}
}
}
2. 适当降低块汇报的频率
可以通过适当拉长数据写入后进行汇报的时间间隔来减轻NameNode处理块汇报的压力,HDFS默认是在Block写入后立刻向NameNode进行汇报,可以通过配置参数:dfs.blockreport.incremental.intervalMsec 来实现延迟增量块汇报,参考 HDFS-9710,社区版本从2.9.0开始引入该特性。
4 DataNode响应客户端的RPC请求
DataNode所接收到的RPC请求则主要来自NameNode的操作指令和数据节点间的数据传输请求。
在DataNode启动过程中将会启动ipcServer用于创建DataNode与客户端之间的socket连接,并启动DataXceiverServer线程来不断的获取socket上的RPC请求,如果当前的处理线程数小于maxXceiverCount则启动一个DataXceiver线程来处理Block的读写、复制等操作。
while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
try {
peer = peerServer.accept();
// Make sure the xceiver count is not exceeded
int curXceiverCount = datanode.getXceiverCount();
if (curXceiverCount > maxXceiverCount) {
throw new IOException("Xceiver count " + curXceiverCount
+ " exceeds the limit of concurrent xcievers: "
+ maxXceiverCount);
}
new Daemon(datanode.threadGroup,
DataXceiver.create(peer, datanode, this))
.start();
}
DataNode响应客户端数据读写RPC请求的逻辑架构图如下:
综上,从DataNode作为RPC Server端的角度来看可以从如下几点来考虑RPC性能的优化:
-
合理分配DataNode的JVM堆内存大小,避免频繁产生GC;并且需要关注DataNode所在服务器的计算资源和网络带宽的利用率,确保DataNode能够正常响应数据读写请求。 -
合理配置DataNode的handler数和dataxceiver线程数最大值,如果配置的数值较小,则可能导致写入请求延迟的问题。
5 HDFS联邦
随着集群文件和Block对象的不断增长,NameNode必然将面临扩展性问题,通过HDFS联邦机制可以将NameNode划分为多个namespace,并由各个namespace分别管理一部分元数据,从而降低单个NameNode的处理压力。
但是采用联邦也带来了集群管理上的负担,并且只能暂时缓解文件数快速增长所产生的集群压力,因此我们需要继续关注集群所管理的文件对象数,持续进行数据周期管理,并通过应用层面的优化来避免在HDFS上写入大量小文件。
6 OZone解决方案
为了解决HDFS对于集群文件规模的限制,apache社区于2018年推出了ozone项目(https://hadoop.apache.org/ozone),并于今年3月份推出0.5.0-beta版本,ozone是基于对象存储的的架构设计,可管理到上百亿的数据对象,兼容当前的HDFS RPC协议,上层应用基本无需修改,同时也可以支持k8s作为持久化存储,我们后续将关注该项目的进展。
作者简介:
焦媛,2011年加入民生银行,现负责Hadoop平台运维和工具开发,HDFS和Spark的技术支持及源码研究工作。
以上是关于HDFS RPC 性能分析的主要内容,如果未能解决你的问题,请参考以下文章