HDFS RPC 性能分析

Posted 民生运维人

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HDFS RPC 性能分析相关的知识,希望对你有一定的参考价值。

1 前言

HDFS诞生于2006年,以其高并发、高吞吐和PB级的存储能力成为大数据生态的基础。谈到HDFS的高吞吐能力,必须要聊一下HDFS的RPC,本文将带大家来探讨一下HDFS的RPC处理机制和性能优化分析。

2 NameNode处理客户端读写请求

2.1 RPC Server的实现

NameNode作为RPC的服务端,主要通过NameNodeRPCServer实例来接收并处理RPC请求,在NameNode的启动的过程对其进行初始化,并指定NameNode与客户端的通信接口协议、socket信息以及最大连接数(handlerCount)等,HDFS的RPC通信基于protobuf消息格式,可以提升数据的传输效率。

this.clientRpcServer = new RPC.Builder(conf)
    .setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
    .setInstance(clientNNPbService).setBindAddress(bindHost)
    .setPort(rpcAddr.getPort()).setNumHandlers(handlerCount)
    .setVerbose(false)
    .setSecretManager(namesystem.getDelegationTokenSecretManager()).build();

NameNodeRpcServer继承自RPC.Server类,该类是一个典型的RPC Server端的实现,在初始化时会启动handlerCount个线程来响应RPC连接请求:

public synchronized void start() {
  responder.start();
  listener.start();
  handlers = new Handler[handlerCount];
  for (int i = 0; i < handlerCount; i++) {
    handlers[i] = new Handler(i);
    handlers[i].start();
  }
}

RPC.Server启动后持续监听来自客户端的RPC请求并放入callQueue队列中,handler线程则不断从callQueue中提取RPC请求来执行具体的处理逻辑。

private class Handler extends Thread {
    public Handler(int instanceNumber) {
      this.setDaemon(true);
      this.setName("IPC Server handler "+ instanceNumber + " on " + port);
    }

    @Override
    public void run() {
      LOG.debug(Thread.currentThread().getName() + ": starting");
      SERVER.set(Server.this);
      ByteArrayOutputStream buf = 
        new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
      while (running) {
        TraceScope traceScope = null;
        try {
          final Call call = callQueue.take(); // pop the queue; maybe blocked here
    
    ... ...

2.2 HDFS写入流程

客户端对NameNode的RPC请求主要用于对HDFS文件进行数据读写,下面我们从NameNode RPC负载的角度来分析一次HDFS文件的写入流程:

1. 客户端通过NameNodeRPCServer申请Block

1)NameNode为客户端提供可以用于写入该Block的目标DataNode列表

2)创建Block对象

2. 更新NameNode内存中的blocksMap和文件INode信息。

3. 加同步锁,更新txid,并将editlog信息写入内存缓冲池,然后释放同步锁。

private long beginTransaction() {
 synchronized (this) {
   ...
  // wait if an automatic sync is scheduled
  waitIfAutoSyncScheduled();
  long start = beginTransaction();
  op.setTransactionId(txid);
  try {
    editLogStream.write(op);
  } catch (IOException ex) {
    // All journals failed, it is handled in logSync.
  } finally {
    op.reset();
  }
  ... ...
}

4. 将editLog信息刷写到本地磁盘和JournalNode节点上的editLog文件。

5. 创建数据传输链路,并将数据通过RPC调用发送至各个目标DataNode

2.3 性能优化分析

从以上流程来看,NameNode的RPC性能优化可以参考如下几点:

1. 减少小文件写入

我们发现HDFS写入过程其实是一个比较长的链路,其中加全局同步锁、editLog磁盘刷写以及网络数据传输这些步骤都不是非常高效的,HDFS的数据写入并不适合用于频繁写入小文件,因此优化应用端代码逻辑,从而减少不必要的小文件写入将有利于提升整个HDFS集群的处理性能。

2. 调整RPC连接数

如果集群的RPC请求处理比较繁忙,应适当调大RPC服务端的最大连接数,避免大量的连接等待。HDFS中可以通过dfs.namenode.handler.count参数进行配置,社区对于该参数值的建议是20*log2(clustersize),具体的数值也要根据实际集群的运行情况具体设置。

3. 关注editlog写入性能

1)针对本地磁盘上的editlog写入,要尽量为editlog分配的独立的数据盘,避免磁盘IO压力大导致写入性能受到影响。

2)针对向JournalNode节点的editlog并发异步写入性能:

  • 同样,需要为journalnode分配较为独立的数据盘,如果将写入editlog的数据盘和datanode读写数据盘混用则可能造成editlog写入缓慢,导致editlog写入JournalNode缓慢甚至超时。

  • JournalNode的JVM堆内存设置要合理,避免频繁GC造成响应缓慢。

  • 参考滴滴公司的editlog异步写入的特性:

    Hadoop社区从2.8.3开始引入了异步editlog写入功能(HDFS-7964),通过配置dfs.namenode.edits.asynclogging 为true来实现editlog的异步写入。

    考虑极端情况下如果NameNode发送异常crash的时候正在进行数据块的写入,但是由于editlog写入性能较差导致尚未写入所有的操作记录,而此时有些block已经成功写到DataNode上的情况,则可能出现在NameNode重新启动时发现这些block在DataNode中有而在NameNode上没有,但是HDFS的Block汇报机制可以保证在后续的汇报处理中发现如果该DataNode的写入状态为FINALIZED,则可以将该block加入到NameNode的blocksmap中,因此笔者认为该方案理论上来讲可以保证block信息的最终一致性,针对需要进一步进行RPC性能调优的情况下可以考虑开启此选项。

// Add replica if appropriate. If the replica was previously corrupt
// but now okay, it might need to be updated.
if (reportedState == ReplicaState.FINALIZED
    && (storedBlock.findStorageInfo(storageInfo) == -1 ||
        corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
  toAdd.add(storedBlock);
}

3 NameNode响应DataNode RPC请求

NameNode除了响应来自客户端的RPC请求,还需要处理DataNode向NameNode发送的心跳和Block汇报等来自服务器端的RPC请求。

1. 定期心跳发送

DataNode定期(默认3秒)向NameNode发送心跳信息一方面是为了向NameNode报告DataNode的状态信息,另一方面NameNode通过响应DataNode的心跳来向DataNode下达操作命令。

2. Block增量和全量汇报

DataNode定期向NameNode进行全量块汇报(默认6小时)和增量块汇报(默认300秒),来帮助NameNode构建完整的BlocksMap信息。

因此,DataNode向NameNode发送RPC请求的性能优化可以从以下几个方面来考虑:

1. RPC拆分

由于整个集群的DataNode会频繁地向NameNode进行心跳反馈,如果集群规模比较大,Block数比较多的情况下,NameNode的处理压力将随之增大。如果进行了RPC拆分,则在NameNode启动时还会再启动一个用于处理服务器端RPC请求的NameNodeRPCServer实例serviceRpcServer,serviceRpcServer可以有效分流NameNode的RPC压力:

void start() {
  clientRpcServer.start();
  if (serviceRpcServer != null) {
    serviceRpcServer.start();      
  }
}
}

2. 适当降低块汇报的频率

可以通过适当拉长数据写入后进行汇报的时间间隔来减轻NameNode处理块汇报的压力,HDFS默认是在Block写入后立刻向NameNode进行汇报,可以通过配置参数:dfs.blockreport.incremental.intervalMsec 来实现延迟增量块汇报,参考 HDFS-9710,社区版本从2.9.0开始引入该特性。

4 DataNode响应客户端的RPC请求

DataNode所接收到的RPC请求则主要来自NameNode的操作指令和数据节点间的数据传输请求。

在DataNode启动过程中将会启动ipcServer用于创建DataNode与客户端之间的socket连接,并启动DataXceiverServer线程来不断的获取socket上的RPC请求,如果当前的处理线程数小于maxXceiverCount则启动一个DataXceiver线程来处理Block的读写、复制等操作。

while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
  try {
    peer = peerServer.accept();
    // Make sure the xceiver count is not exceeded
    int curXceiverCount = datanode.getXceiverCount();
    if (curXceiverCount > maxXceiverCount) {
      throw new IOException("Xceiver count " + curXceiverCount
          + " exceeds the limit of concurrent xcievers: "
          + maxXceiverCount);
    }
    new Daemon(datanode.threadGroup,
        DataXceiver.create(peer, datanode, this))
        .start();
  } 

DataNode响应客户端数据读写RPC请求的逻辑架构图如下:


综上,从DataNode作为RPC Server端的角度来看可以从如下几点来考虑RPC性能的优化:

  • 合理分配DataNode的JVM堆内存大小,避免频繁产生GC;并且需要关注DataNode所在服务器的计算资源和网络带宽的利用率,确保DataNode能够正常响应数据读写请求。
  • 合理配置DataNode的handler数和dataxceiver线程数最大值,如果配置的数值较小,则可能导致写入请求延迟的问题。

5 HDFS联邦

随着集群文件和Block对象的不断增长,NameNode必然将面临扩展性问题,通过HDFS联邦机制可以将NameNode划分为多个namespace,并由各个namespace分别管理一部分元数据,从而降低单个NameNode的处理压力。

但是采用联邦也带来了集群管理上的负担,并且只能暂时缓解文件数快速增长所产生的集群压力,因此我们需要继续关注集群所管理的文件对象数,持续进行数据周期管理,并通过应用层面的优化来避免在HDFS上写入大量小文件。

6 OZone解决方案

为了解决HDFS对于集群文件规模的限制,apache社区于2018年推出了ozone项目(https://hadoop.apache.org/ozone),并于今年3月份推出0.5.0-beta版本,ozone是基于对象存储的的架构设计,可管理到上百亿的数据对象,兼容当前的HDFS RPC协议,上层应用基本无需修改,同时也可以支持k8s作为持久化存储,我们后续将关注该项目的进展。



作者简介:

焦媛,2011年加入民生银行,现负责Hadoop平台运维和工具开发,HDFS和Spark的技术支持及源码研究工作。




以上是关于HDFS RPC 性能分析的主要内容,如果未能解决你的问题,请参考以下文章

HDFS QJM机制分析

Hadoop HDFS编程 API入门系列之RPC版本2

Hadoop HDFS编程 API入门系列之RPC版本1

基于Alluxio的HDFS多集群统一入口的实现

eBay 大数据平台的 HDFS 性能优化实践

大数据学习之HDFS的工作机制07