HDFS NN refreshNodes操作的可用性和效率的改进
Posted Android路上的人
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HDFS NN refreshNodes操作的可用性和效率的改进相关的知识,希望对你有一定的参考价值。
前言
我们知道在HDFS里面有,存着一类白名单和黑名单的列表来控制其下允许进行注册的DN节点。这样可以防止一些外部恶意节点注册到我们的NN上来。在HDFS的概念里,这个黑白名单叫做include file和exclude file。在一般情况下,exclude file的使用范围会更管一些,因为DN的decommission下线需要将待下线机器加到此exclude file中,然后再手动执行dfsadmin的refreshNodes命令进行刷新即可。至于include file白名单,它的管理其实比较复杂。在默认情况下,include file是为空的,意味着默认所有的注册的上来的节点都是被允许的。但是这样会有严重的安全隐患,所以在注重Security环境的集群内,需要管理员每次进行include file的更新来确保当前服务的DN都是有效的。Include file/execlude file的更新需要触发NN的refreshNodes操作来生效。因此这里会存在大量refreshNodes操作的发生。最近笔者在生产环境中就遇到了NN refreshNodes的一些性能问题,本文将讲述围绕refreshNodes的性能问题以及其改进点设计实现。
NN refreshNodes的可用性以及效率问题
笔者在最近工作中遇到了由于refreshNodes失败导致的集群出现大量missing block的情况,鉴于大量missing block造成了比较大的impact。笔者于是开始对此问题进行了深入的分析。
我们从结果开始往前分析,missing block的出现是因为NN上掉了大量节点。NN掉了大量节点的时间段,发现某一台挂掉的DN log里显示了很多下面的Disallow异常:
2021-05-23 07:10:07,609 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: RemoteException in offerService
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException): Datanode denied communication with namenode because the host is not in the include-list: xx.xx.xx.xx:50010
at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.handleHeartbeat(DatanodeManager.java:1499)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.handleHeartbeat(FSNamesystem.java:4978)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.sendHeartbeat(NameNodeRpcServer.java:1539)
at org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB.sendHeartbeat(DatanodeProtocolServerSideTranslatorPB.java:118)
at org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos$DatanodeProtocolService$2.callBlockingMethod(DatanodeProtocolProtos.java:31228)
看到这个的第一反应,是这个节点不在NN的inlcude host列表里面了,然后才发生的上述拒绝错误。再往上进行分析,又发现了很多诸如“fail to resolve”这类的错误,基本断定是NN解析include host时的dns解析问题。
2021-05-23 07:09:59,030 INFO org.apache.hadoop.util.HostsFileReader: Adding a node "xx.xx.xx.xx" to the list of included hosts from /xxx/hadoop/hosts
2021-05-23 07:09:59,932 WARN org.apache.hadoop.hdfs.server.blockmanagement.HostFileManager: Failed to resolve address `xx.xx.xx.xx` in `/xxx/hadoop/hosts`. Ignoring in the included list.
而触发上面resolve行为的是dfsadmin refreshNodes命令,不过这个refreshNodes执行的耗时相当长。
2021-05-23 07:01:39,001 WARN org.apache.hadoop.ipc.Server: Slow RPC : refreshNodes took 5269MILLISECONDS to process from client Call#0 Retry#0 org.apache.hadoop.hdfs.protocol.ClientProtocol.refreshNodes from xx.xx.xx.xx
后来经过分析发现,refershNodes失败的原因是短时段内大量的dns查询导致了dns解析的延迟,这个延时在NN的代码调用层面来看到的情况,就是解析失败的情况。
在这个问题里,有很多可以值得讨论的点:
-
第一,为什么这里会有这么大量的dns查询请求?后来发现是因为集群DN数总量在持续不断变多,导致每次refreshNodes的cost越来越高。因为NN的refreshNodes行为会重刷所有的DN节点,相当于这是一个全量解析host地址的行为。
-
第二,为什么这里没有NN解析重试行为?refreshNodes执行失败了,最好能够有一个重试机制,否则失败造成的掉节点,missing block对用户影响太大。
-
第三,现有refreshNodes的效率不高,它是一种全量解析的方式,是否我们能够支持增量式地refresh node,对于大部分分状态不变的节点,NN完全没必要再对其做dns的重新解析。在增量式 refesh node的情况里,admin管理员能够指定特定的host进行node的refresh,从而使得refreshNode的操作变得非常轻量级。
针对上面的第二,第三点,我们内部对refreshNode逻辑进行了调整和改进,使其在可用性以及执行效率上得到一个号的提升。
NN refreshNodes的改造升级
针对上节部分提出的几个关键点,我们对此进行了如下两部分的改进。
refreshNodes重试机制的引入
在原有逻辑里,NN refreshNodes会触发重新读取所有的include/exclude文件中的host信息。在这个过程里,这些文件里可能会包含有上千甚至上万个节点的host信息,每次读取到一个新的host后,NN尝试会进行对其host的ip地址的解析,相关代码如下:
private static HostSet readFile(String type, String filename)
throws IOException {
HostSet res = new HostSet();
if (!filename.isEmpty()) {
HashSet<String> entrySet = new HashSet<String>();
HostsFileReader.readFileToSet(type, filename, entrySet);
for (String str : entrySet) {
// 构造InetSocketAddress地址时,会进行地址解析
InetSocketAddress addr = parseEntry(type, filename, str);
if (addr != null) {
res.add(addr);
}
}
}
return res;
}
@VisibleForTesting
static InetSocketAddress parseEntry(String type, String fn, String line) {
try {
URI uri = new URI("dummy", line, null, null, null);
int port = uri.getPort() == -1 ? 0 : uri.getPort();
InetSocketAddress addr = new InetSocketAddress(uri.getHost(), port);
if (addr.isUnresolved()) {
LOG.warn(String.format("Failed to resolve address `%s` in `%s`. " +
"Ignoring in the %s list.", line, fn, type));
return null;
}
return addr;
} catch (URISyntaxException e) {
LOG.warn(String.format("Failed to parse `%s` in `%s`. " + "Ignoring in " +
"the %s list.", line, fn, type));
}
return null;
}
因此我们这里所说的重试行为指的就是对parseEntry里面失败的情况再一次进行retry。
那么这里的retry机制要怎么实现呢?我们在这里首先引入了一个新的用于存放paseEntry阶段解析host失败的集合对象,这里叫它notInclude的集合对象,对应成功解析的include host的集合对象。
class HostFileManager {
private static final Log LOG = LogFactory.getLog(HostFileManager.class);
private HostSet includes = new HostSet();
private HostSet excludes = new HostSet();
// 读取include文件时,存放那些解析失败的host
private HostSet notIncludes = new HostSet();
...
然后对parseEntry 逻辑进行了微调,以此能够进行失败host的收集:
HostFileManager#readFile方法:
private static HostSet[] readFile(String type, String filename)
throws IOException {
HostSet[] hostSets = new HostSet[2];
HostSet res = new HostSet();
HostSet failedRes = new HostSet();
if (!filename.isEmpty()) {
HashSet<String> entrySet = new HashSet<String>();
HostsFileReader.readFileToSet(type, filename, entrySet);
for (String str : entrySet) {
InetSocketAddress addr = parseEntry(type, filename, str);
if (addr != null) {
// 根据host的地址是否能够解析,将其放入对应的host set集合里
if (addr.isUnresolved()) {
failedRes.add(addr);
} else {
res.add(addr);
}
}
}
}
hostSets[0] = res;
hostSets[1] = failedRes;
return hostSets;
}
上述步骤结束之后,HostFileManager里的notIncludes变量就会存有失败的host列表了,然后我们在后续的refreshDatanodes方法里进行重试,逻辑如下:
DatanodeManager#refreshDatanodes方法:
public void refreshDatanodes() {
Collection<DatanodeDescriptor> nodes = null;
synchronized (datanodeMap) {
nodes = datanodeMap.values();
}
List<DatanodeDescriptor> notIncluded =
new ArrayList<DatanodeDescriptor>();
List<DatanodeDescriptor> included =
new ArrayList<DatanodeDescriptor>(nodes.size());
// 根据判断是否在include列表里,进行筛选
for (DatanodeDescriptor node : nodes) {
if (hostFileManager.isIncluded(node)) {
included.add(node);
} else {
notIncluded.add(node);
}
}
// there could be DNS resolve failure causing healthy node is treated as
// not included, retry resolving to avoid the issue;
int resolveTry = 0;
final int RESOLVE_MAX = 3;
// 对not include列表里的host进行解析重试,
while ((!notIncluded.isEmpty()) && (resolveTry < RESOLVE_MAX)) {
if (LOG.isDebugEnabled()) {
LOG.debug("resolve retry #" + resolveTry);
LOG.debug("notInclude hosts: " + notIncluded);
}
Iterator<DatanodeDescriptor> iter = notIncluded.iterator();
while (iter.hasNext()) {
DatanodeDescriptor node = iter.next();
InetSocketAddress addr =
HostFileManager.resolvedAddressFromDatanodeID(node);
if (!addr.isUnresolved()) {
// 如果属于是在hostFileManager notIncludes里的解析失败情况的话,
// 则进行host的移除,然后再重新把host加入include list
if (hostFileManager.isNotIncluded(addr)) {
hostFileManager.addToIncludeHosts(node.getIpAddr());
included.add(node);
iter.remove();
} else if (hostFileManager.isIncluded(addr)) {
// 如果只是在前面调用isIncluded执行时候失败的情况,
// 则再进行一次此方法的调用,hostFileManager#isIncluded方法里面也会有一次地址解析
included.add(node);
iter.remove();
}
}
}
// 最多允许重试3次,每次睡眠一定的间隔,此间隔支持可配置化
resolveTry++;
try {
Thread.sleep(hostResolveRetryInterval);
} catch (InterruptedException ie) {}
}
// 对notIncluded、included进行相应action的take操作
final String operationName = "inner-refreshNodes";
namesystem.writeLock();
try {
for (DatanodeDescriptor node : notIncluded) {
node.setDisallowed(true); // case 2.
}
for (DatanodeDescriptor node : included) {
node.setDisallowed(false);
if (hostFileManager.isExcluded(node)) {
decomManager.startDecommission(node); // case 3.
} else {
decomManager.stopDecommission(node); // case 4.
}
}
countSoftwareVersions();
} finally {
namesystem.writeUnlock(operationName);
}
}
上面每次重试过程是支持时间可配置化,因为有时dns解析的恢复可能需要多达几分钟这样的时间,需要根据实际的使用场景进行配置,默认是一分钟,然后最多重试3次。上面的代码已经是我们内部进行过特别改造过的代码,和社区代码已有所不同,大家看明白主要修改的逻辑思路即可。
增量refreshNodes命令的实现
另外一个关于执行效率上的改进点是增量refreshNodes的实现。
针对NN维护有include/exclude两类文件的情况,我们实现了对应如下的4个新的refreshNodes命令操作,分别是加减host到include/exclude文件里面,然后后面是需要传入的host信息,可支持多host的传入。
bin/hdfs dfsadmin refreshNodes -addIncludeHost [host1,host2,...]
bin/hdfs dfsadmin refreshNodes -removeIncludeHost [host1,host2,...]
bin/hdfs dfsadmin refreshNodes -addExcludeHost [host1,host2,...]
bin/hdfs dfsadmin refreshNodes -removeExcludeHost [host1,host2,...]
从本质上来说,上面4类命令是对HostFileManager里面includes,excludes变量的一个内存操作,然后再进行对应节点的action操作。
这里分别以-addIncludeHost/-addExcludeHost命令实现为例,这里只展示server端的代码实现:
DatanodeManager#refreshNodeWithAddHostIntoInclude方法:
/**
* Refresh with adding the given host string into include list.
* @param host A single host or multiple host string.
*/
public void refreshNodeWithAddHostIntoInclude(String host) {
String ipAddr;
// 1) 解析传入参数,得到指定的host信息
String[] hosts = host.split(",");
List<String> ipAddrList = new ArrayList<>();
for(String h : hosts) {
// 2)遍历host信息,得到其对应的地址,如果解析失败的情况,则跳过此host
InetSocketAddress addr = this.hostFileManager.resolveAndAddIncludes(h);
if (addr == null) {
continue;
}
// 从地址对象里拿到ip信息,进行收集,因为NN在内部存DN信息时,是通过ip进行区分
ipAddr = addr.getAddress() != null ?
addr.getAddress().getHostAddress() : null;
if (ipAddr != null) {
ipAddrList.add(ipAddr);
}
}
// 3)根据ip地址进行现有LIVE DN的过滤查找
List<DatanodeDescriptor> targetNodeList = findTargetDatanode(ipAddrList);
if (!targetNodeList.isEmpty()) {
final String operationName = "refreshNodeWithAddHostIntoInclude";
namesystem.writeLock();
try {
// 4) 找到目标DN节点对象后,进行include host节点的对应的属性操作,即设置Disallowed属性为false
for (DatanodeDescriptor dd : targetNodeList) {
dd.setDisallowed(false);
}
countSoftwareVersions();
} finally {
namesystem.writeUnlock(operationName);
}
} else {
LOG.info("Refresh node with specific node," +
" target host doesn't find in NN for host " + host);
}
}
this.hostFileManager的resolveAndAddIncludes方法如下:
/**
* Add specific host into the include host.
* @param host Host to be added.
*/
InetSocketAddress resolveAndAddIncludes(String host) {
InetSocketAddress addr = parseEntry("included", "non-file", host);
if (addr != null) {
if (!addr.isUnresolved()) {
synchronized (this) {
// 如果该host地址信息已经在include集合里存在,则忽略添加
if (includes.match(addr)) {
LOG.warn("Target node " + host + " is already in include list," +
" ignoring to add into include host.");
} else {
includes.add(addr);
}
}
return addr;
} else {
// 如果地址解析失败,则也忽略添加
LOG.warn("Failed to resolve the host " + host + "," +
" ignoring to add into include host.");
}
}
return null;
}
addExcludeHost的实现和addIncludeHost实现几乎一致,区别只是后续DN的action操作不同:
/**
* Refresh with adding the given host string into exclude list.
* @param host A single host or multiple host string.
*/
public void refreshNodeWithAddHostIntoExclude(String host) {
String ipAddr = null;
String[] hosts = host.split(",");
List<String> ipAddrList = new ArrayList<>();
for (String h : hosts) {
InetSocketAddress addr = this.hostFileManager.resolveAndAddExcludes(h);
if (addr == null) {
continue;
}
ipAddr = addr.getAddress() != null ?
addr.getAddress().getHostAddress() : null;
if (ipAddr != null) {
ipAddrList.add(ipAddr);
}
}
// find the node that match the target ip
List<DatanodeDescriptor> targetNodeList = findTargetDatanode(ipAddrList);
if (!targetNodeList.isEmpty()) {
final String operationName = "refreshNodeWithAddHostIntoExclude";
namesystem.writeLock();
try {
// 加入exclude host的对应action操作是进行此DN的下线
for (DatanodeDescriptor dd : targetNodeList) {
decomManager.startDecommission(dd);
}
countSoftwareVersions();
} finally {
namesystem.writeUnlock(operationName);
}
} else {
LOG.info("Refresh node with specific node," +
" target host doesn't find in NN for host " + host);
}
}
上面的this.hostFileManager.resolveAndAddExcludes方法如下:
/**
* Add specific host into the exclude host.
* @param host Host to be added.
*/
InetSocketAddress resolveAndAddExcludes(String host) {
InetSocketAddress addr = parseEntry("excluded", "non-file", host);
if (addr != null) {
if (!addr.isUnresolved()) {
synchronized (this) {
// 如果exclude列表里已经有此地址时,则忽略添加
if (excludes.match(addr)) {
LOG.warn("Target node " + host + " is already in exclude list," +
" ignoring to add into exclude host.");
} else {
// 在添加地址到exclude前,以上是关于HDFS NN refreshNodes操作的可用性和效率的改进的主要内容,如果未能解决你的问题,请参考以下文章