HDFS Multiple Standby原理分析
Posted Android路上的人
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HDFS Multiple Standby原理分析相关的知识,希望对你有一定的参考价值。
文章目录
前言
HDFS在早期实现HA时,是标准的一主一备的服务模式,主的叫Active NameNode,备的叫Standby NameNode。Standby/Active NN间可以互相切换以此达到服务高可用的目的。但是这种双节点的HA模式是否能够满足更高的高可用性的要求呢?在标准的HA模式下,其实只有1个Standby的NN作为bak来使用。假设在极端情况下,Active和Stanby同时出现crash的情况(这种概率在实际生产环境中也是有几率发生的),那么此时HDFS集群将处于不能服务的状态。因此,我们是否能够实现一种具有更高HA特性的服务模式呢,比如1个Active Service+多Standby的运行模式?倘若我们有了多Standby的支持,毫无疑问将会大大提高NN服务的高可用性。目前社区在3.x版本中已经实现了此功能,相关JIRA HDFS-6440(Support more than 2 NameNodes)。本文笔者对此功能做一个简单的原理分析,在了解了其原理实现后,能够帮助我们更好地去使用这个功能特性。
HDFS Multiple Standby的实现要素
首先我们要知道实现HDFS Multiple Standby的前提是基于HDFS原有的HA实现之上的。它只是将原有的Active-Standby模式扩展为Active-多Standby模式。
从Single Standby到Multiple Standby的关系转变上,有一个核心的点是不变的:Active和Standby服务之间的交互行为。简单来说,就是原本Active和Standby NN服务之间的交互通信理应是不变的。在HA核心代码实现上我们并不需要做额外的改动。
因此在这里面,我们要重点关注以下Active、Standby NN之间的交互行为:
- NN Bootstrap行为
- Standby NN的checkpoint upload到Active NN的行为
- Active/Standby NN间的failover行为
- Standby NN向ActiveNN发起的log roll操作行为
上述四类操作在原有1对1的Active/Standby模式时,是比较简单的单向直接调用行为。但是在多Standby时,这个时候target会变成多个,而且这些 target的状态事先也不确定,可能是Standby也可能是Active。这部分也是多Standby实现的一个重点也是难点。
下面我们结合实际场景代码做逐一分析。
Multiple Standby实现分析
此小节部分我们主要对上面提到的4个交互行为做具体分析。
Bootstrap行为
在单Standby模式里,Standby NN的直接向另外一个Active NN发起download image的行为即可。但是在多Standby时,操作行为转变为:只需寻找到一个正在服务的NN进行bootstrp即可,不管这个NN是真正的Active还是Standby。因为bootstrap行为只是去NN的metadata文件数据。
相关代码如下:
BootstrapStandby.java
private int doRun() throws IOException {
// find the active NN
NamenodeProtocol proxy = null;
NamespaceInfo nsInfo = null;
boolean isUpgradeFinalized = false;
RemoteNameNodeInfo proxyInfo = null;
for (int i = 0; i < remoteNNs.size(); i++) {
proxyInfo = remoteNNs.get(i);
InetSocketAddress otherIpcAddress = proxyInfo.getIpcAddress();
proxy = createNNProtocolProxy(otherIpcAddress);
try {
// Get the namespace from any active NN. If you just formatted the primary NN and are
// bootstrapping the other NNs from that layout, it will only contact the single NN.
// However, if there cluster is already running and you are adding a NN later (e.g.
// replacing a failed NN), then this will bootstrap from any node in the cluster.
nsInfo = proxy.versionRequest();
isUpgradeFinalized = proxy.isUpgradeFinalized();
// 寻找到一个可用的NN即可
break;
} catch (IOException ioe) {
LOG.warn("Unable to fetch namespace information from remote NN at " + otherIpcAddress
+ ": " + ioe.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Full exception trace", ioe);
}
}
}
...
// download the fsimage from active namenode
int download = downloadImage(storage, proxy, proxyInfo);
if (download != 0) {
return download;
}
// finish the upgrade: rename previous.tmp to previous
...
return 0;
}
Standby NN的checkpoint upload行为
我们知道在HDFS HA机制里,Standby会定期checkpoint出一个新的image,然后upload到Active NN中。那么在多Standby NN的情况下,需要解决下面2点主要问题:
- Standby NN如何找到谁是真正的Active NN,然后再进行image的upload。
- 找到谁是真正的Active NN后,各个Standby如何协调进行image的upload,这里面会存在潜在的冲突问题。
对于第一个问题,社区的解决思路是假定所有的NN都是潜在的Active NN(里面肯定包括Standby NN)。然后检查每个NN上次发送成功image的状态信息,这个状态信息主要记录以下2点信息:
1)是否是Active节点,这个是基于上次image upload结果发现的,如果后面发生了failover,这个属性值会被更新。
2)上次image upload成功后的时间。
检查的条件:要么target NN是Active节点,要么或者是距上次image upload时间超过指定阈值范围内的即可。相关代码如下:
StandbyCheckpointer.java
private void doCheckpoint() throws InterruptedException, IOException {
assert canceler != null;
final long txid;
final NameNodeFile imageType;
...
// checkpoint操作生成新的image文件
img.saveNamespace(namesystem, imageType, canceler);
txid = img.getStorage().getMostRecentCheckpointTxId();
assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
thisCheckpointTxId + " but instead saved at txid=" + txid;
...
// Upload the saved checkpoint back to the active
// Do this in a separate thread to avoid blocking transition to active, but don't allow more
// than the expected number of tasks to run or queue up
// See HDFS-4816
ExecutorService executor = new ThreadPoolExecutor(0, activeNNAddresses.size(), 100,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
uploadThreadFactory);
// for right now, just match the upload to the nn address by convention. There is no need to
// directly tie them together by adding a pair class.
HashMap<String, Future<TransferFsImage.TransferResult>> uploads =
new HashMap<>();
for (final URL activeNNAddress : activeNNAddresses) {
// Upload image if at least 1 of 2 following conditions met:
// 1. has been quiet for long enough, try to contact the node.
// 2. this standby IS the primary checkpointer of target NN.
String addressString = activeNNAddress.toString();
assert checkpointReceivers.containsKey(addressString);
// 1)获取目标NN上次checkpoint的状态
CheckpointReceiverEntry receiverEntry =
checkpointReceivers.get(addressString);
long secsSinceLastUpload =
TimeUnit.MILLISECONDS.toSeconds(
monotonicNow() - receiverEntry.getLastUploadTime());
// 2)如果目标NN是Active或者上次checkpoint时间超出阈值外,则应该upload checkpoint到此NN
boolean shouldUpload = receiverEntry.isPrimary() ||
secsSinceLastUpload >= checkpointConf.getQuietPeriod();
if (shouldUpload) {
// 3)提交执行upload image到指定NN的任务
Future<TransferFsImage.TransferResult> upload =
executor.submit(new Callable<TransferFsImage.TransferResult>() {
@Override
public TransferFsImage.TransferResult call()
throws IOException, InterruptedException {
CheckpointFaultInjector.getInstance().duringUploadInProgess();
return TransferFsImage.uploadImageFromStorage(activeNNAddress,
conf, namesystem.getFSImage().getStorage(), imageType, txid,
canceler);
}
});
uploads.put(addressString, upload);
}
}
...
}
然后在获取upload结果时进行相应target NN的receive结果更新。
for (Map.Entry<String, Future<TransferFsImage.TransferResult>> entry :
uploads.entrySet()) {
String url = entry.getKey();
Future<TransferFsImage.TransferResult> upload = entry.getValue();
try {
// TODO should there be some smarts here about retries nodes that
// are not the active NN?
CheckpointReceiverEntry receiverEntry = checkpointReceivers.get(url);
TransferFsImage.TransferResult uploadResult = upload.get();
// 4.a)获取image upload结果,如果执行成功更新对应checkpoint receiver entry的状态:
// 1)标记receiver的Primary为true,意为此为当前真正的Active的节点 2)更新时间
if (uploadResult == TransferFsImage.TransferResult.SUCCESS) {
receiverEntry.setLastUploadTime(monotonicNow());
receiverEntry.setIsPrimary(true);
} else {
// Getting here means image upload is explicitly rejected
// by the other node. This could happen if:
// 1. the other is also a standby, or
// 2. the other is active, but already accepted another
// newer image, or
// 3. the other is active but has a recent enough image.
// All these are valid cases, just log for information.
LOG.info("Image upload rejected by the other NameNode: {}",
uploadResult);
// 4.b)否则将receiver的primary属性设置为false
receiverEntry.setIsPrimary(false);
}
} catch (ExecutionException e) {
// Even if exception happens, still proceeds to next NN url.
// so that fail to upload to previous NN does not cause the
// remaining NN not getting the fsImage.
ioes.add(new IOException("Exception during image upload", e));
} catch (InterruptedException e) {
ie = e;
break;
}
}
从上面代码我们可以看到,这里对每个target NN更新了最新receive image的结果,同时也能够识别出哪个是真正的Active节点(通过Primary属性)。因为还是存在有NN failover到其它NN的可能性,所以这里Standby NN还是向所有的其它NN发起了image的upload请求。如果target是Standby NN的话,这里会接受到一个错误的返回结果,也就实际不会发生upload image到Standby的情况了。
如果target NN是真正的Active NN,那么是否可能会发生多Standby NN频繁upload image到Active NN的情况呢?毕竟每个Standby NN都会做这样的checkpoint逻辑。要解决这个问题,我们需要在Active NN接收方这边做一些额外的判断了。
相关代码如下:
ImageServlet.java
protected void doPut(final HttpServletRequest request,
final HttpServletResponse response) throws ServletException, IOException {
try {
...
UserGroupInformation.getCurrentUser().doAs(
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
// 1) 获取当前NN状态,判断是否是Active的NN,这里target的目标是Active NN
// 如果当前是Standby NN收到请求了,则返回的是错误结果
HAServiceProtocol.HAServiceState state = NameNodeHttpServer
.getNameNodeStateFromContext(getServletContext());
if (state != HAServiceProtocol.HAServiceState.ACTIVE &&
state != HAServiceProtocol.HAServiceState.OBSERVER) {
// we need a different response type here so the client can differentiate this
// from the failure to upload due to (1) security, or (2) other checkpoints already
// present
sendError(response, HttpServletResponse.SC_EXPECTATION_FAILED,
"Nameode "+request.getLocalAddr()+" is currently not in a state which can "
+ "accept uploads of new fsimages. State: "+state);
return null;
}
final long txid = parsedParams.getTxId();
String remoteAddr = request.getRemoteAddr();
ImageUploadRequest imageRequest = new ImageUploadRequest(txid, remoteAddr);
final NameNodeFile nnf = parsedParams.getNameNodeFile();
// 2)判断当前是否有别的upload image正在被执行
SortedSet<ImageUploadRequest> larger = currentlyDownloadingCheckpoints.tailSet(imageRequest);
if (larger.size() > 0) {
sendError(response, HttpServletResponse.SC_CONFLICT,
"Another checkpointer is already in the process of uploading a" +
" checkpoint made up to transaction ID " + larger.last());
return null;
}
//make sure no one else has started uploading one
if (!currentlyDownloadingCheckpoints.add(imageRequest)) {
sendError(response, HttpServletResponse.SC_CONFLICT,
"Either current namenode is checkpointing or another"
+ " checkpointer is already in the process of "
+ "uploading a checkpoint made at transaction ID "
+ txid);
return null;
}
...
// 3) 计算NN上次checkpoint距离目前的时间间隔以及Transaction数的差值,
// 然后做阈值的检查,避免过于频繁的image的download
if (checkRecentImageEnable &&
NameNodeFile.IMAGE.equals(parsedParams.getNameNodeFile()) &&
timeDelta < checkpointPeriod &&
txid - lastCheckpointTxid < checkpointTxnCount) {
// only when at least one of two conditions are met we accept
// a new fsImage
// 1. most recent image's txid is too far behind
// 2. last checkpoint time was too old
String message = "Rejecting a fsimage due to small time delta "
+ "and txnid delta. Time since previous checkpoint is "
+ timeDelta + " expecting at least " + checkpointPeriod
+ " txnid delta since previous checkpoint is " +
(txid - lastCheckpointTxid) + " expecting at least "
+ checkpointTxnCount;
LOG.info(message);
sendError(response, HttpServletResponse.SC_CONFLICT, message);
return null;
}
try {
...
InputStream stream = request.getInputStream();
try {
long start = monotonicNow();
//4) 当前NN进行image的download
MD5Hash downloadImageDigest = TransferFsImage
.handleUploadImageRequest(request, txid,
nnImage.getStorage(), stream,
parsedParams.getFileSize(), getThrottler(conf));
nnImage.saveDigestAndRenameCheckpointImage(nnf, txid,
downloadImageDigest);
...
} finally {
// 5)NN image下载完成,从请求list中移除当前equest
currentlyDownloadingCheckpoints.remove(imageRequest);
stream.close();
}
} finally {
nnImage.removeFromCheckpointing(txid);
}
return null;
}
});
} catch (Throwable t) {
...
}
}
从上面代码我们可以看到,Active NN的这部分逻辑会进行upload行为的时间和阈值的检查,避免过于频繁的去下载来自Standby NN的image。
Active/Standby NN间的failover行为
HA服务间的failover行为是另一个需要特别改动的地方,目前HDFS的主从切换依赖的是ZKFC的机制做的。它的一个简单原理是谁率先能够抢到zk的锁并在上面创建相应的znode节点,那么此节点就成为当前的Active服务节点。如果这个znode节点的状态发生改变了,将会触发相应新的Active的选举产生。
我们先来看下原本的failover过程是如何实现的,这样能够方便我们理解多Standby情况的failover实现。
ZKFailoverController.java
/**
* Coordinate a graceful failover. This proceeds in several phases:
* 1) Pre-flight checks: ensure that the local node is healthy, and
* thus a candidate for failover.
* 2) Determine the current active node. If it is the local node, no
* need to failover - return success.
* 3) Ask that node to yield from the election for a number of seconds.
* 4) Allow the normal election path to run in other threads. Wait until
* we either become unhealthy or we see an election attempt recorded by
* the normal code path.
* 5) Allow the old active to rejoin the election, so a future
* failback is possible.
*/
private void doGracefulFailover()
throws ServiceFailedException, IOException, InterruptedException {
int timeout = FailoverController.getGracefulFenceTimeout(conf) * 2;
// Phase 1: pre-flight checks
checkEligibleForFailover();
// 1) 获取当前Active地址信息,即老Active信息
HAServiceTarget oldActive = getCurrentActive();
if (oldActive == null) {
// No node is currently active. So, if we aren't already
// active ourselves by means of a normal election, then there's
// probably something preventing us from becoming active.
throw new ServiceFailedException(
"No other node is currently active.");
}
// 如果目标target节点已经是active了,则不执行后面的failover逻辑
if (oldActive.getAddress().equals(localTarget.getAddress())) {
LOG.info("Local node " + localTarget + " is already active. " +
"No need to failover. Returning success.");
return;
}
// 2)让当前Active服务在指定时间里退出其Active服务角色,并在此时间之内其不能进行Active服务的竞争获取
LOG.info("Asking " + oldActive + " to cede its active state for " +
timeout + "ms");
ZKFCProtocol oldZkfc = oldActive.getZKFCProxy(conf, timeout);
oldZkfc.cedeActive(timeout);
// 3)等待当前节点成功变为Active服务,因为没有别的服务竞争Active的选举了
ActiveAttemptRecord attempt = waitForActiveAttempt(timeout + 60000);
if (attempt == null) {
// We didn't even make an attempt to become active.
synchronized(this) {
if (lastHealthState != State.SERVICE_HEALTHY) {
throw new ServiceFailedException("Unable to become active. " +
"Service became unhealthy while trying to failover.");
}
}
throw new ServiceFailedException("Unable to become active. " +
"Local node did not get an opportunity to do so from ZooKeeper, " +
"or the local node took too long to transition to active.");
}
// 4)恢复原Active节点的选举权
oldZkfc.cedeActive(-1);
if (attempt.succeeded) {
LOG.info("Successfully became active. " + attempt.status);
} else {
// Propagate failure
String msg = "Failed to become active. " + attempt.status;
throw new ServiceFailedException(msg);
}
}
上面的过程归纳来说是通过短暂剥夺其它服务的Active选举权来完成目标节点Active身份的成功获取。在多Standby的实现中,这套逻辑同样能够完美地适用,只是目标剥夺选举权的对象从一个Standby NN变为多个Standby NN。
相关代码如下,还是ZKFailoverController.java这个类:
private void doGracefulFailover()
throws ServiceFailedException, IOException, InterruptedException {
int timeout = FailoverController.getGracefulFenceTimeout(conf) * 2;
// Phase 1: pre-flight checks
checkEligibleForFailover();
// 1.a)获取当前active节点信息
HAServiceTarget oldActive = getCurrentActive();
if (oldActive == null) {
// No node is currently active. So, if we aren't already
// active ourselves by means of a normal election, then there's
// probably something preventing us from becoming active.
throw new ServiceFailedException(
"No other node is currently active.");
}
// 1.b)判断如果当前节点已经是active了,则执行返回
if (oldActive.getAddress().equals(localTarget.getAddress())) {
LOG.info("Local node " + localTarget + " is already active. " +
"No need to failover. Returning success.");
return;
}以上是关于HDFS Multiple Standby原理分析的主要内容,如果未能解决你的问题,请参考以下文章