HDFS文件误删怎么办,一招教你恢复回来,再也不用担心删库跑路了
Posted Vicky_Tang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HDFS文件误删怎么办,一招教你恢复回来,再也不用担心删库跑路了相关的知识,希望对你有一定的参考价值。
本文基于 Hadoop3.1.2版本讲解
HDFS 文件删除过程
下面是hdfs删除路径的方法,源码路径org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete()
/**
* Remove the indicated file from namespace.
*
* @see ClientProtocol#delete(String, boolean) for detailed description and
* description of exceptions
*/
boolean delete(String src, boolean recursive, boolean logRetryCache)
throws IOException {
waitForLoadingFSImage();
BlocksMapUpdateInfo toRemovedBlocks = null;
writeLock();
boolean ret = false;
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot delete " + src);
toRemovedBlocks = FSDirDeleteOp.delete( // @1
this, src, recursive, logRetryCache);
ret = toRemovedBlocks != null;
} catch (AccessControlException e) {
logAuditEvent(false, "delete", src);
throw e;
} finally {
writeUnlock();
}
getEditLog().logSync();
if (toRemovedBlocks != null) {
removeBlocks(toRemovedBlocks); // @2
}
logAuditEvent(true, "delete", src);
return ret;
}
代码@1里面做了三件比较重要的事
①从 NameNode 维护的的目录树里面删除路径,这也是为什么执行删除操作之后就无法再通过hdfs dfs -ls xxx 或其它 api 方式再查看到路径的根本原因(拒绝需要被删除的文件的外部访问)
②找出被删路径关联的 block 信息,每个文件包含多个 block 块,分布在各个 DataNode,此时并未真正物理删除 DataNode 上物理磁盘上的block块
③记录删除日志到editlog(这一步也很重要,甚至是后面恢复数据的关键)
代码@2把将要删除的block信息添加到org.apache.hadoop.hdfs.server.blockmanagement.BlockManager 里面维护的 InvalidateBlocks 对象中,InvalidateBlocks 专门用于保存等待删除的数据块副本
以上步骤并未涉及真正的物理删除的操作
BlockManager
BlockManager 管理了hdfs block 的生命周期并且维护在 Hadoop 集群中的块相关的信息,包括快的上报、复制、删除、监控、标记等等一系列功能。
BlockManager 中有个方法 invalidateWorkForOneNode() 专门用于定时删除 InvalidateBlocks 中存储的待删除的快,此方法会在 NameNode 启动时在 BlockManager 的内部线程类ReplicationMonitor 定时轮循把要删除的块放入 DatanodeDescriptor 中的逻辑,方法的调用路径如下:
org.apache.hadoop.hdfs.server.namenode.NameNode#initialize(Configuration conf)
org.apache.hadoop.hdfs.server.namenode.NameNode#startCommonServices(Configuration conf)
org.apache.hadoop.hdfs.server.namenode.FSNamesystem#startCommonServices(Configuration conf, HAContext haContext)
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager#activate(Configuration conf)
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.ReplicationMonitor#run()
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager#computeDatanodeWork()
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager#computeDatanodeWorkcomputeInvalidateWork(int nodesToProcess)
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager#invalidateWorkForOneNode(DatanodeInfo dn)
org.apache.hadoop.hdfs.server.blockmanagement.InvalidateBlocks#invalidateWork(final DatanodeDescriptor dn)
BlockManager 维护了 InvalidateBlocks,存放了待删除的 block,BlockManager 在 NameNode 启动时会单独启动一个线程,定时把要删除的块信息放入 InvalidateBlocks 中,每次会从InvalidateBlocks 队列中为每个 DataNode 取出 blockInvalidateLimit(由配置项dfs.block.invalidate.limit,默认1000)个块逻辑在 BlockManager.computeInvalidateWork() 方法里会把要删除的块信息放入 DatanodeDescriptor 中的 invalidateBlocks 数组,DatanodeManager 再通过 DataNode 与NameNode 心跳时,构建删除块的指令集,NameNode 再把指令下发给DataNode,心跳由 DatanodeProtocol 调用,方法的调用路径如下:
org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol#sendHeartbeat()
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer#sendHeartbeat()
org.apache.hadoop.hdfs.server.namenode.FSNamesystem#handleHeartbeat()
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager#handleHeartbeat()
DatanodeManager.handleHeartbeat() 中构建删除的指令给 DataNode,待 NameNode 发送的代码如下:
/** Handle heartbeat from datanodes. */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {
final DatanodeDescriptor nodeinfo;
try {
nodeinfo = getDatanode(nodeReg);
} catch (UnregisteredNodeException e) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
// Check if this datanode should actually be shutdown instead.
if (nodeinfo != null && nodeinfo.isDisallowed()) {
setDatanodeDead(nodeinfo);
throw new DisallowedDatanodeException(nodeinfo);
}
if (nodeinfo == null || !nodeinfo.isRegistered()) {
return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);
// If we are in safemode, do not send back any recovery / replication
// requests. Don't even drain the existing queue of work.
if (namesystem.isInSafeMode()) {
return new DatanodeCommand[0];
}
// block recovery command
final BlockRecoveryCommand brCommand = getBlockRecoveryCommand(blockPoolId,
nodeinfo);
if (brCommand != null) {
return new DatanodeCommand[]{brCommand};
}
final List<DatanodeCommand> cmds = new ArrayList<>();
// Allocate _approximately_ maxTransfers pending tasks to DataNode.
// NN chooses pending tasks based on the ratio between the lengths of
// replication and erasure-coded block queues.
int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks();
int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
int totalBlocks = totalReplicateBlocks + totalECBlocks;
if (totalBlocks > 0) {
int numReplicationTasks = (int) Math.ceil(
(double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
int numECTasks = (int) Math.ceil(
(double) (totalECBlocks * maxTransfers) / totalBlocks);
if (LOG.isDebugEnabled()) {
LOG.debug("Pending replication tasks: " + numReplicationTasks
+ " erasure-coded tasks: " + numECTasks);
}
// check pending replication tasks
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
numReplicationTasks);
if (pendingList != null && !pendingList.isEmpty()) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
pendingList));
}
// check pending erasure coding tasks
List<BlockECReconstructionInfo> pendingECList = nodeinfo
.getErasureCodeCommand(numECTasks);
if (pendingECList != null && !pendingECList.isEmpty()) {
cmds.add(new BlockECReconstructionCommand(
DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
}
}
// check block invalidation
Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
if (blks != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId,
blks));
}
// cache commands
addCacheCommands(blockPoolId, nodeinfo, cmds);
// key update command
blockManager.addKeyUpdateCommand(cmds, nodeinfo);
// check for balancer bandwidth update
if (nodeinfo.getBalancerBandwidth() > 0) {
cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
// set back to 0 to indicate that datanode has been sent the new value
nodeinfo.setBalancerBandwidth(0);
}
if (slowPeerTracker != null) {
final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
if (!slowPeersMap.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("DataNode " + nodeReg + " reported slow peers: " +
slowPeersMap);
}
for (String slowNodeId : slowPeersMap.keySet()) {
slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
}
}
}
if (slowDiskTracker != null) {
if (!slowDisks.getSlowDisks().isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("DataNode " + nodeReg + " reported slow disks: " +
slowDisks.getSlowDisks());
}
slowDiskTracker.addSlowDiskReport(nodeReg.getIpcAddr(false), slowDisks);
}
}
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
return new DatanodeCommand[0];
}
定时轮循+limit 1000个块删除的特性决定了hdfs删除数据并不会立即真正的执行物理删除,并且一次删除的数量也有限,所以出现误删操作需要立即停止HDFS,虽然有的数据在轮循中已被删除,所以事发后停止HDFS集群越早,被删的数据越少,损失越小!
EditLog
EditLog记录了hdfs操作的每一条日志记录,包括当然包括删除,我们所熟知的文件操作类型只有增、删、改,但是在 HDFS 的领域里,远远不止这些操作,我们看看 EditLog 操作类型的枚举类org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
/**
* Op codes for edits file
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public enum FSEditLogOpCodes {
// last op code in file
OP_ADD ((byte) 0, AddOp.class),
// deprecated operation
OP_RENAME_OLD ((byte) 1, RenameOldOp.class),
OP_DELETE ((byte) 2, DeleteOp.class),
OP_MKDIR ((byte) 3, MkdirOp.class),
OP_SET_REPLICATION ((byte) 4, SetReplicationOp.class),
@Deprecated OP_DATANODE_ADD ((byte) 5), // obsolete
@Deprecated OP_DATANODE_REMOVE((byte) 6), // obsolete
OP_SET_PERMISSIONS ((byte) 7, SetPermissionsOp.class),
OP_SET_OWNER ((byte) 8, SetOwnerOp.class),
OP_CLOSE ((byte) 9, CloseOp.class),
OP_SET_GENSTAMP_V1 ((byte) 10, SetGenstampV1Op.class),
OP_SET_NS_QUOTA ((byte) 11, SetNSQuotaOp.class), // obsolete
OP_CLEAR_NS_QUOTA ((byte) 12, ClearNSQuotaOp.class), // obsolete
OP_TIMES ((byte) 13, TimesOp.class), // set atime, mtime
OP_SET_QUOTA ((byte) 14, SetQuotaOp.class),
// filecontext rename
OP_RENAME ((byte) 15, RenameOp.class),
// concat files
OP_CONCAT_DELETE ((byte) 16, ConcatDeleteOp.class),
OP_SYMLINK ((byte) 17, SymlinkOp.class),
OP_GET_DELEGATION_TOKEN ((byte) 18, GetDelegationTokenOp.class),
OP_RENEW_DELEGATION_TOKEN ((byte) 19, RenewDelegationTokenOp.class),
OP_CANCEL_DELEGATION_TOKEN ((byte) 20, CancelDelegationTokenOp.class),
OP_UPDATE_MASTER_KEY ((byte) 21, UpdateMasterKeyOp.class),
OP_REASSIGN_LEASE ((byte) 22, ReassignLeaseOp.class),
OP_END_LOG_SEGMENT ((byte) 23, EndLogSegmentOp.class),
OP_START_LOG_SEGMENT ((byte) 24, StartLogSegmentOp.class),
OP_UPDATE_BLOCKS ((byte) 25, UpdateBlocksOp.class),
OP_CREATE_SNAPSHOT ((byte) 26, CreateSnapshotOp.class),
OP_DELETE_SNAPSHOT ((byte) 27, DeleteSnapshotOp.class),
OP_RENAME_SNAPSHOT ((byte) 28, RenameSnapshotOp.class),
OP_ALLOW_SNAPSHOT ((byte) 29, AllowSnapshotOp.class),
OP_DISALLOW_SNAPSHOT ((byte) 30, DisallowSnapshotOp.class),
OP_SET_GENSTAMP_V2 ((byte) 31, SetGenstampV2Op.class),
OP_ALLOCATE_BLOCK_ID ((byte) 32, AllocateBlockIdOp.class),
OP_ADD_BLOCK ((byte) 33, AddBlockOp.class),
OP_ADD_CACHE_DIRECTIVE ((byte) 34, AddCacheDirectiveInfoOp.class),
OP_REMOVE_CACHE_DIRECTIVE ((byte) 35, RemoveCacheDirectiveInfoOp.class),
OP_ADD_CACHE_POOL ((byte) 36, AddCachePoolOp.class),
OP_MODIFY_CACHE_POOL ((byte) 37, ModifyCachePoolOp.class),
OP_REMOVE_CACHE_POOL ((byte) 38, RemoveCachePoolOp.class),
OP_MODIFY_CACHE_DIRECTIVE ((byte) 39, ModifyCacheDirectiveInfoOp.class),
OP_SET_ACL ((byte) 40, SetAclOp.class),
OP_ROLLING_UPGRADE_START ((byte) 41, RollingUpgradeStartOp.class),
OP_ROLLING_UPGRADE_FINALIZE ((byte) 42, RollingUpgradeFinalizeOp.class),
OP_SET_XATTR ((byte) 43, SetXAttrOp.class),
OP_REMOVE_XATTR ((byte) 44, RemoveXAttrOp.class),
OP_SET_STORAGE_POLICY ((byte) 45, SetStoragePolicyOp.class),
OP_TRUNCATE ((byte) 46, TruncateOp.class),
OP_APPEND ((byte) 47, AppendOp.class),
OP_SET_QUOTA_BY_STORAGETYPE ((byte) 48, SetQuotaByStorageTypeOp.class),
OP_ADD_ERASURE_CODING_POLICY ((byte) 49, AddErasureCodingPolicyOp.class),
OP_ENABLE_ERASURE_CODING_POLICY((byte) 50, EnableErasureCodingPolicyOp.class),
OP_DISABLE_ERASURE_CODING_POLICY((byte) 51,
DisableErasureCodingPolicyOp.class),
OP_REMOVE_ERASURE_CODING_POLICY((byte) 52, RemoveErasureCodingPolicyOp.class),
// Note that the current range of the valid OP code is 0~127
OP_INVALID ((byte) -1);
private final byte opCode;
private final Class<? extends FSEditLogOp> opClass;
/**
* Constructor
*
* @param opCode byte value of constructed enum
*/
FSEditLogOpCodes(byte opCode) {
this(opCode, null);
}
FSEditLogOpCodes(byte opCode, Class<? extends FSEditLogOp> opClass) {
this.opCode = opCode;
this.opClass = opClass;
}
/**
* return the byte value of the enum
*
* @return the byte value of the enum
*/
public byte getOpCode() {
return opCode;
}
public Class<? extends FSEditLogOp> getOpClass() {
return opClass;
}
private static final FSEditLogOpCodes[] VALUES;
static {
byte max = 0;
for (FSEditLogOpCodes code : FSEditLogOpCodes.values()) {
if (code.getOpCode() > max) {
max = code.getOpCode();
}
}
VALUES = new FSEditLogOpCodes[max + 1];
for (FSEditLogOpCodes code : FSEditLogOpCodes.values()) {
if (code.getOpCode() >= 0) {
VALUES[code.getOpCode()] = code;
}
}
}
/**
* Converts byte to FSEditLogOpCodes enum value
*
* @param opCode get enum for this opCode
* @return enum with byte value of opCode
*/
public static FSEditLogOpCodes fromByte(byte opCode) {
if (opCode >= 0 && opCode < VALUES.length) {
return VALUES[opCode];
}
return opCode == -1 ? OP_INVALID : null;
}
}
总计54种操作类型!打破了人们印象中文件只有增删改读的几种操作。在hadoop的配置参数dfs.namenode.name.dir
可以找到路径
这里EditLog文件是序列化后的二进制文件不能直接查看,hdfs自带了解析的命令,可以解析成xml明文格式
hdfs oev -i edits_0000000000035854978-0000000000035906741 -o edits.xml
对 hdfs 的每一个操作都会记录一串 RECORD,RECORD 里面不同的操作包含的字段属性也不同,但是所有操作都具备的属性是 OPCODE,对应上面的枚举类org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes
中的操作
hdfs元数据的加载
hdfs 启动时,NameNode 会加载 Fsimage,Fsimage 记录了 hdfs 现有的全量的路径信息,启动过程中仅仅加载 Fsimage?这句话不完全正确!启动的同时,还会加载未被合并成 fsimage 的EditLog。关于 fsimage 具体细节这里不展开。举个栗子:
假设Hadoop 3分钟checkpoint一次生成Fsimage文件,EditLog 1分钟生成一个文件,下面是依次生成的文件:
fsimage_1
editlog_1
editlog_2
editlog_3
fsimage_2
editlog_4
editlog_5
当NameNode启动时,会加载后缀时间戳最大的那个fsimage文件和它后面产生的editlog文件,也就是会加载fsimage_2、editlog_4、editlog_5进NameNode内存。
恢复方法
假设我们执行 hdfs dfs -rmr xxx 命令的操作记录在了 editlog_5 上面,那么,重启 NameNode 后,我们查看hdfs无法再查看到xxx路径,如果我们把fsimage_2删掉,NameNode则会加载fsimage_1、editlog_1、editlog_2,此时的元数据里面xxx还未被删除,如果此时DataNode未物理删除block,则数据可以恢复,但是editlog_4、editlog_5对应的hdfs操作会丢失。有没有更好的方法呢?
方案一:
删掉 fsimage_2,从上一次 checkpoint 的地方也就是 fsimage_1 恢复,我们集群的实际配置,是一个小时生成一次 fsimage 文件,也就是说,这种恢复方案会导致近一小时 hdfs 新增的文件全部丢失,这一个小时不知道发生了多少事情,可想而知的后果是恢复之后一堆报错,显然不是最好的方案
方案二:
修改editlog_5,把删除xxx
那条操作改成其它安全的操作类型,这样重启NameNode后,又可以看到这个路径。
步骤:
-
关闭HDFS集群
-
解析editlog
找到删除操作时间点范围内所属的editlog文件,解析
hdfs oev -i edits_0000000000000000336-0000000000000000414 -o edits.xml
查看editlog.xml,执行删除操作的日志已经记录在里面了
- 替换删除操作
把OP_DELETE操作替换成比较安全的操作,例如
<RECORD>
<OPCODE>OP_DELETE</OPCODE>
<DATA>
<TXID>374</TXID>
<LENGTH>0</LENGTH>
<PATH>/hbase/oldWALs/test%2C16020%2C1630456996531.1630464212306</PATH>
<TIMESTAMP>1630724840046</TIMESTAMP>
<RPC_CLIENTID>d1e2ae59-ba0c-4385-87d4-4da3d9ec019b</RPC_CLIENTID>
<RPC_CALLID>1049</RPC_CALLID>
</DATA>
</RECORD>
注意:只能改变OPCODE,其他不能修改!!!
-
反解析成editlog
# 反解析更改后的xml文件成editlog
hdfs oev -i editlog.xml -o edits_0000000000000000336-0000000000000000414.tmp -p binary
# 重命名掉之前的editlog
mv edits_0000000000000000336-0000000000000000414 edits_0000000000000000336-0000000000000000414.bak
# 替换反解析后的editlog
mv edits_0000000000000000336-0000000000000000414.tmp edits_0000000000000000336-0000000000000000414
- 使用 scp 将修改后的 editlog 同步到其他 JournalNode 节点
- 重启 HDFS,完成文件恢复
以上是关于HDFS文件误删怎么办,一招教你恢复回来,再也不用担心删库跑路了的主要内容,如果未能解决你的问题,请参考以下文章