[Hadoop] HDFS delegation tokens explained in one article
Posted by 九师兄
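1. Overview
Reposted, with additions, from the article "一文讲透hdfs的delegation token" (HDFS delegation tokens explained in one article). I have been looking into this topic recently, so this is also a learning exercise.
1.1 Motivation
I have recently been working on Kerberos authentication for Flink. I configured the authentication settings correctly in the Flink configuration file. My sink is a RichSinkFunction, and inside it I build the client myself with new KafkaProducer. Do I still need to pass the authentication settings to the client I create, or is that unnecessary?
I then found an article that said the following:
So I want to understand what a token is. How does it make it possible to authenticate only once, and why does that not work in my case? The Kafka producer I create in the open method still has to authenticate.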
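1.2 Background
A while ago I summarized token authentication in Hadoop and the tokens used while a YARN job runs, and both write-ups mentioned delegation tokens. Recently I ran into a related problem: a Flink job that had been running for more than seven days failed because of a fault on its host machine. The failure triggered retries, but several consecutive retries also failed, and the job's logs were never aggregated, so the cause of the failure could not be analyzed. The root cause turned out to be related to delegation tokens, so this article summarizes the underlying principles.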
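2. Principles
2.1. What is a delegation token
First, a brief explanation of why delegation tokens are needed. Once Kerberos is enabled, services must authenticate with the KDC and obtain the corresponding ticket before they interact. A single YARN job may spawn many task containers, and each of them may access HDFS. If every container had to obtain a ticket before accessing HDFS, the KDC would easily become a performance bottleneck. Delegation tokens exist to eliminate this unnecessary authentication work.
2.2. How delegation tokens are used while a job is submitted and run
The delegation token flow during job submission and execution is as follows:
1) First, after the RM starts, it creates an internal service thread dedicated to renewing tokens.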
// ResourceManager.java
protected void serviceInit(Configuration configuration) throws Exception {
    ...
    if (UserGroupInformation.isSecurityEnabled()) {
        delegationTokenRenewer = createDelegationTokenRenewer();
        rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
    }
    ...
}

protected DelegationTokenRenewer createDelegationTokenRenewer() {
    return new DelegationTokenRenewer();
}
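2) The client requests a delegation token
Before submitting a job, the client usually has to upload resource files to HDFS (including the jars needed at runtime). During this step it requests a delegation token from the NN and puts it into the job's launch context, then sends the job submission request to the RM (the request carries the launch context).
The following code fragments are from Flink on YARN job submission: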
// flink YarnClusterDescriptor.java
private ApplicationReport startAppMaster(...) {
    // With Kerberos enabled, obtain the tokens
    if (UserGroupInformation.isSecurityEnabled()) {
        // set HDFS delegation tokens when security is enabled
        LOG.info("Adding delegation token to the AM container.");
        List<Path> yarnAccessList =
                ConfigUtils.decodeListFromConfig(
                        configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
        Utils.setTokensFor(
                amContainer,
                ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
                yarnConfiguration);
    }
    ...
}

// flink Utils.java
public static void setTokensFor(
        ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
        throws IOException {
    Credentials credentials = new Credentials();
    // for HDFS
    TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
    // for HBase
    obtainTokenForHBase(credentials, conf);
    // for user
    UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();

    // Put the obtained tokens into the launch context
    Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
    for (Token<? extends TokenIdentifier> token : usrTok) {
        final Text id = new Text(token.getIdentifier());
        LOG.info("Adding user token " + id + " with " + token);
        credentials.addToken(id, token);
    }
    try (DataOutputBuffer dob = new DataOutputBuffer()) {
        credentials.writeTokenStorageToStream(dob);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());
        }
        ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        amContainer.setTokens(securityTokens);
    }
}
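To make the last step of setTokensFor more concrete, here is a minimal sketch of packing a set of tokens into one byte buffer and parsing it back on the receiving side. It uses only the JDK. The class TokenBufferSketch, its methods, and the wire layout (count, then alias and length-prefixed bytes) are hypothetical stand-ins chosen for illustration; they are not Hadoop's Credentials format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for Hadoop's Credentials: alias -> opaque token bytes.
final class TokenBufferSketch {

    // Serialize all tokens into one buffer, roughly what the client does
    // before handing the buffer to the container launch context.
    static ByteBuffer write(Map<String, byte[]> tokens) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(tokens.size());
            for (Map.Entry<String, byte[]> e : tokens.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeInt(e.getValue().length);
                out.write(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return ByteBuffer.wrap(bytes.toByteArray());
    }

    // Parse the buffer back into tokens, as a receiver of the launch context would.
    static Map<String, byte[]> read(ByteBuffer buffer) {
        byte[] raw = new byte[buffer.remaining()];
        buffer.duplicate().get(raw); // duplicate() leaves the caller's position untouched
        Map<String, byte[]> tokens = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String alias = in.readUTF();
                byte[] token = new byte[in.readInt()];
                in.readFully(token);
                tokens.put(alias, token);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return tokens;
    }
}
```

The point of the sketch is that the launch context carries the tokens as plain bytes, so whoever receives the context can rebuild them without talking to the KDC.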
// TokenCache.java
// Calls the Hadoop API to request tokens from the NN
public static void obtainTokensForNamenodes(
        Credentials credentials,
        Path[] ps, Configuration conf)
        throws IOException {
    if (!UserGroupInformation.isSecurityEnabled()) {
        return;
    }
    obtainTokensForNamenodesInternal(credentials, ps, conf);
}

static void obtainTokensForNamenodesInternal(
        Credentials credentials,
        Path[] ps,
        Configuration conf)
        throws IOException {
    // Deduplicate: one token request per file system, not per path
    Set<FileSystem> fsSet = new HashSet<FileSystem>();
    for (Path p : ps) {
        fsSet.add(p.getFileSystem(conf));
    }
    String masterPrincipal = Master.getMasterPrincipal(conf);
    for (FileSystem fs : fsSet) {
        obtainTokensForNamenodesInternal(fs, credentials, conf, masterPrincipal);
    }
}

static void obtainTokensForNamenodesInternal(
        FileSystem fs,
        Credentials credentials,
        Configuration conf,
        String renewer)
        throws IOException {
    ...
    final Token<?>[] tokens = fs.addDelegationTokens(delegTokenRenewer, credentials);
    ...
}
// FileSystem.java
public Token<?>[] addDelegationTokens(
        final String renewer, Credentials credentials)
        throws IOException {
    if (credentials == null) {
        credentials = new Credentials();
    }
    final List<Token<?>> tokens = new ArrayList<>();
    collectDelegationTokens(renewer, credentials, tokens);
    return tokens.toArray(new Token<?>[tokens.size()]);
}

private void collectDelegationTokens(
        final String renewer,
        final Credentials credentials,
        final List<Token<?>> tokens)
        throws IOException {
    final String serviceName = getCanonicalServiceName();
    // Collect token of this filesystem and then of its embedded children
    if (serviceName != null) { // fs has token, grab it
        final Text service = new Text(serviceName);
        Token<?> token = credentials.getToken(service);
        if (token == null) {
            // Request a delegation token from the NN
            token = getDelegationToken(renewer);
            if (token != null) {
                tokens.add(token);
                credentials.addToken(service, token);
            }
        }
    }
    ...
}
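The check in collectDelegationTokens means the NN is only contacted when the credentials do not already hold a token for that service. The following is a minimal JDK-only sketch of that "fetch on a cache miss" behavior. TokenCollectorSketch, requestToken, and the string-based tokens are hypothetical names introduced for illustration, not Hadoop APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TokenCollectorSketch {
    // service name -> token, playing the role of the Credentials object
    private final Map<String, String> credentials = new HashMap<>();
    private int nameNodeRequests = 0;

    // Hypothetical stand-in for the RPC that asks a NameNode for a token.
    private String requestToken(String service, String renewer) {
        nameNodeRequests++;
        return "token-for-" + service + "-renewer-" + renewer;
    }

    // Same idea as collectDelegationTokens: request a token only when the
    // credentials have none for this service, then cache it.
    List<String> addDelegationTokens(List<String> services, String renewer) {
        List<String> fetched = new ArrayList<>();
        for (String service : services) {
            if (credentials.get(service) == null) {
                String token = requestToken(service, renewer);
                credentials.put(service, token);
                fetched.add(token);
            }
        }
        return fetched;
    }

    int nameNodeRequests() {
        return nameNodeRequests;
    }
}
```

Calling addDelegationTokens twice with the same service issues only one request in this sketch, which is the property that keeps repeated lookups from hitting the NN again.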
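3) The RM adds the token to the delegation token renewal service
When the RM handles the client's job submission request, it checks whether Kerberos authentication is enabled. If it is, the RM parses the delegation token out of the job's launch context and adds it to the delegation token renewal service, which starts a thread that renews the token on a timer. The RM then goes on to send the container start request to the NM, and the delegation token travels to the NM inside the launch context.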
// RMAppManager.java
protected void submitApplication(
        ApplicationSubmissionContext submissionContext,
        long submitTime,
        String user)
        throws YarnException {
    ...
    if (UserGroupInformation.isSecurityEnabled()) {
        this.rmContext.getDelegationTokenRenewer().addApplicationAsync(
                applicationId,
                BuilderUtils.parseCredentials(submissionContext),
                submissionContext.getCancelTokensWhenComplete(),
                application.getUser(),
                BuilderUtils.parseTokensConf(submissionContext));
    }
    ...
}
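4) The NM uses the delegation token
When the NM receives the container start request, it parses the delegation token out of the request (the job's launch context), constructs an instance object for the container, and stores the delegation token in that object. It then localizes resources for the container, that is, it downloads the required resource files from HDFS, and this is where the delegation token that was passed along gets used. Likewise, when the job finishes, if log aggregation is required, the same delegation token is used to upload the job's logs to the configured HDFS path.
In addition, the delegation token is written to a persistent file, both for NM recovery after a failure and to pass the token on to the job's container process.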
3. Renewal and lifecycle of delegation tokens
1) The token's maximum lifetime is already fixed when the token is requested
// FSNamesystem.java
Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException {
    ...
    DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, renewer, realUser);
    token = new Token<DelegationTokenIdentifier>(dtId, dtSecretManager);
    ...
    return token;
}

// Token.java
public Token(T id, SecretManager<T> mgr) {
    password = mgr.createPassword(id);
    identifier = id.getBytes();
    kind = id.getKind();
    service = new Text();
}

// AbstractDelegationTokenSecretManager.java
protected synchronized byte[] createPassword(TokenIdent identifier) {
    long now = Time.now();
    identifier.setMaxDate(now + tokenMaxLifetime);
    ...
}
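The arithmetic in createPassword can be worked through with concrete numbers. The sketch below assumes a maximum lifetime of 7 days, which I believe is the default of dfs.namenode.delegation.token.max-lifetime; treat that value as an assumption and check your cluster's configuration. TokenMaxDateSketch and its methods are hypothetical names used only for this illustration.

```java
import java.time.Duration;
import java.time.Instant;

final class TokenMaxDateSketch {
    // Assumption: 7 days, believed to be the default of
    // dfs.namenode.delegation.token.max-lifetime.
    static final Duration ASSUMED_MAX_LIFETIME = Duration.ofDays(7);

    // Same arithmetic as createPassword: maxDate = now + tokenMaxLifetime.
    static Instant maxDate(Instant issuedAt, Duration maxLifetime) {
        return issuedAt.plus(maxLifetime);
    }

    // True once the current time has moved beyond the token's max date.
    static boolean isPastMaxDate(Instant maxDate, Instant now) {
        return now.isAfter(maxDate);
    }
}
```

Under that assumption, a token issued at 2024-01-01T00:00:00Z gets a max date of 2024-01-08T00:00:00Z, and that date is fixed at issue time. This lines up with the symptom described earlier, where the job ran into trouble only after more than seven days.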
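2) After the RM receives the job submission request, it first renews the token once to obtain the token's next expiration time, then sets a timer based on that expiration time to trigger the next renewal.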
// DelegationTokenRenewer.java
public void addApplicationSync(
        ApplicationId applicationId,
        Credentials ts,
        boolean shouldCancelAtEnd,
        String user)
        throws IOException, InterruptedException {
    handleAppSubmitEvent(
            new DelegationTokenRenewerAppSubmitEvent(
                    applicationId, ts, shouldCancelAtEnd, user, new Configuration()));
}

private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
        throws IOException, InterruptedException {
    ...
    Credentials ts = evt.getCredentials();
    Collection<Token<?>> tokens = ts.getAllTokens();
    for (Token<?> token : tokens) {
        DelegationTokenToRenew dttr = allTokens.get(token);
        if (dttr == null) {
            dttr = new DelegationTokenToRenew(
                    Arrays.asList(applicationId),
                    token, tokenConf, now, shouldCancelAtEnd,
                    evt.getUser());
            try {
                // Renew once first
                renewToken(dttr);
            } catch (IOException ioe) {
                ...
            }
        }
        tokenList.add(dttr);
    }

    if (!tokenList.isEmpty()) {
        for (DelegationTokenToRenew dtr : tokenList) {
            DelegationTokenToRenew currentDtr = allTokens.putIfAbsent(dtr.token, dtr);
            if (currentDtr != null) {
                // another job beat us
                currentDtr.referringAppIds.add(applicationId);
                appTokens.get(applicationId).add(currentDtr);
            } else {
                appTokens.get(applicationId).add(dtr);
                setTimerForTokenRenewal(dtr);
            }
        }
    }
}
protected void renewToken(final DelegationTokenToRenew dttr)
        throws IOException {
    // need to use doAs so that http can find the kerberos tgt
    // NOTE: token renewers should be responsible for the correct UGI!
    try {
        // Renew the delegation token and get its next expiration time
        dttr.expirationDate =
                UserGroupInformation.getLoginUser().doAs(
                        new PrivilegedExceptionAction<Long>() {
                            @Override
                            public Long run() throws Exception {
                                return dttr.token.renew(dttr.conf);
                            }
                        });
    } catch (InterruptedException e) {
        throw new IOException(e);
    }
    LOG.info("Renewed delegation-token= [" + dttr + "]");
}

protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
        throws IOException {
    // calculate timer time
    long expiresIn = token.expirationDate - System.currentTimeMillis();
    if (expiresIn <= 0) {
        LOG.info("Will not renew token " + token);
        return;
    }
    long renewIn = token.expirationDate - expiresIn / 10; // little bit before the expiration
    // need to create new task every time
    RenewalTimerTask tTask = new RenewalTimerTask(token);
    token.setTimerTask(tTask); // keep reference to the timer

    renewalTimer.schedule(token.timerTask, new Date(renewIn));
    LOG.info(
            "Renew " + token + " in " + expiresIn + " ms, appId = " +
            token.referringAppIds);
}
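The timer arithmetic in setTimerForTokenRenewal is easier to follow with numbers. The sketch below isolates just that calculation; RenewalTimerSketch and renewAt are hypothetical names, and returning -1 for an already expired token is a convention of this sketch (the original method logs and returns instead).

```java
final class RenewalTimerSketch {
    // Returns the absolute time (epoch millis) at which the renewal should
    // fire, or -1 when the token has already expired and will not be renewed.
    static long renewAt(long expirationDate, long now) {
        long expiresIn = expirationDate - now;
        if (expiresIn <= 0) {
            return -1L;
        }
        // Fire when 90% of the remaining validity has elapsed,
        // i.e. a little before the expiration.
        return expirationDate - expiresIn / 10;
    }
}
```

For example, with now = 0 and an expiration 24 hours away (86,400,000 ms), expiresIn / 10 is 8,640,000 ms, so the renewal is scheduled at 77,760,000 ms, which is 21.6 hours from now. Note that despite its name, renewIn in the listing above is an absolute timestamp passed to new Date(...), not a delay.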
Now let's look at the details of the token renewal request and how it is handled:
// The client sends the renewal request
public long renew(Token<?> token, Configuration conf) throws IOException {
    Token<DelegationTokenIdentifier> delToken = (Token<DelegationTokenIdentifier>) token;
    ClientProtocol nn = getNNProxy(delToken, conf);
    try {
        return nn.renewDelegationToken(delToken);
    } catch (RemoteException re) {
        throw re.unwrapRemoteException(InvalidToken.class,
                AccessControlException.class);
    }
}

// Server-side handling of the request
long renewDelegationToken(Token<DelegationTokenIdentifier> token)
        throws InvalidToken, IOException {
    try {
        ...
        expiryTime = dtSecretManager.renewToken(token, renewer);
    } catch (AccessControlException ace) {
        ...
    }
    return expiryTime;
}
public synchronized long renewToken(
        Token<TokenIdent> token,
        String renewer)
        throws InvalidToken, IOException {
    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
    DataInputStream in = new DataInputStream(buf);
    TokenIdent id = createIdentifier();
    id.readFields(in);
    // LOG.info("Token re ... (the listing is cut off at this point)
    ...
}
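The listing stops before the new expiration time is computed, so the following is only a sketch of how I understand the server-side rule: the new expiry is the smaller of the token's max date and "now plus the renew interval", and a token past its max date is rejected. That rule, the class TokenRenewSketch, and the use of IllegalStateException in place of Hadoop's InvalidToken are all assumptions made for this illustration; verify them against the Hadoop version you run.

```java
final class TokenRenewSketch {
    // Assumed rule: a renewal extends the token by one renew interval,
    // but never beyond the max date fixed when the token was issued.
    static long renew(long now, long maxDate, long renewInterval) {
        if (maxDate < now) {
            // Stand-in for the InvalidToken error a real NN would return.
            throw new IllegalStateException("token is past its max lifetime");
        }
        return Math.min(maxDate, now + renewInterval);
    }
}
```

With a 1-day renew interval and a 7-day max date (both assumed values), a renewal at time 0 yields an expiry of day 1, a renewal at day 6.5 is capped at day 7, and a renewal attempted at day 8 fails. If this assumption holds, it would explain why no amount of renewing keeps a token alive past its maximum lifetime.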