[Hadoop] HDFS delegation tokens explained in one article

Posted by 九师兄


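1. Overview

Reposted with additions from "HDFS delegation tokens explained in one article". I have been looking into this topic recently as well, so this is my chance to learn it properly.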

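1.1 Background

I have recently been working on Kerberos authentication for Flink. I configured the authentication settings correctly in the Flink configuration file. My sink is a RichSinkFunction, and inside it I build my own client with new KafkaProducer. Do I still need to pass authentication settings to the client I create myself, or is that unnecessary?

Then I found an article that said the following:

So here I want to understand what a token is. How does it make one-time authentication possible, and why does that not work for me? The Kafka producer I create in the open method still has to authenticate.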

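1.2 Overview

Some time ago I wrote up token authentication in Hadoop and the tokens used while a YARN job runs, and both write-ups mentioned delegation tokens. Recently I ran into a related problem: a Flink job that had been running for more than seven days failed because of a fault on its host. The failure triggered a retry, but several consecutive retries all failed, and the job's logs were not aggregated either, so the cause of the failure could not be analyzed. The root cause turned out to be related to delegation tokens, and this article summarizes the principles behind it.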

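2. Principles

2.1. What is a delegation token

First, a brief explanation of why delegation tokens are needed. Once Kerberos is enabled, services must authenticate with the KDC and obtain the corresponding ticket before they interact. A single YARN job may spawn many task containers while it runs, and each of them may access HDFS. If every container had to obtain a ticket before each access, the KDC would easily become a performance bottleneck. Delegation tokens exist to cut out this unnecessary authentication work.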

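2.2. How delegation tokens are used during job submission and execution

The delegation-token flow during job submission and execution consists of the following steps (the original diagram is not reproduced here):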


1) First, after the RM starts, it creates an internal service thread dedicated to renewing tokens

// ResourceManager.java
protected void serviceInit(Configuration configuration) throws Exception {
    ...
    if (UserGroupInformation.isSecurityEnabled()) {
        delegationTokenRenewer = createDelegationTokenRenewer();
        rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
    }
    ...
}

protected DelegationTokenRenewer createDelegationTokenRenewer() {
    return new DelegationTokenRenewer();
}

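2) The client requests a delegation token

Before submitting a job, the client usually has to upload resource files (including the jars needed at runtime) to HDFS. During this step it requests a delegation token from the NN and places it in the job's launch context. It then sends the job submission request, which contains that launch context, to the RM.

Here is the relevant code from a Flink on YARN job submission: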

// flink YarnClusterDescriptor.java
private ApplicationReport startAppMaster(...) {
    ...
    // When Kerberos is enabled, obtain the tokens
    if (UserGroupInformation.isSecurityEnabled()) {
        // set HDFS delegation tokens when security is enabled
        LOG.info("Adding delegation token to the AM container.");
        List<Path> yarnAccessList =
            ConfigUtils.decodeListFromConfig(
                configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
        Utils.setTokensFor(
            amContainer,
            ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()),
            yarnConfiguration);
    }
    ...
}
 
// flink Utils.java
public static void setTokensFor(
    ContainerLaunchContext amContainer, List<Path> paths, Configuration conf)
    throws IOException {
    Credentials credentials = new Credentials();
    // for HDFS
    TokenCache.obtainTokensForNamenodes(credentials, paths.toArray(new Path[0]), conf);
    // for HBase
    obtainTokenForHBase(credentials, conf);
    // for user
    UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
    // Copy the tokens the current user already holds into the credentials
    Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
    for (Token<? extends TokenIdentifier> token : usrTok) {
        final Text id = new Text(token.getIdentifier());
        LOG.info("Adding user token " + id + " with " + token);
        credentials.addToken(id, token);
    }
    // Serialize all collected tokens and put them into the AM launch context
    try (DataOutputBuffer dob = new DataOutputBuffer()) {
        credentials.writeTokenStorageToStream(dob);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Wrote tokens. Credentials buffer length: " + dob.getLength());
        }

        ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        amContainer.setTokens(securityTokens);
    }
}
 
// TokenCache.java
// Calls the Hadoop API to request tokens from the NN
public static void obtainTokensForNamenodes(
    Credentials credentials,
    Path[] ps, Configuration conf)
    throws IOException {
    if (!UserGroupInformation.isSecurityEnabled()) {
        return;
    }
    obtainTokensForNamenodesInternal(credentials, ps, conf);
}

static void obtainTokensForNamenodesInternal(
    Credentials credentials,
    Path[] ps,
    Configuration conf)
    throws IOException {
    // De-duplicate the file systems so each one is asked for a token only once
    Set<FileSystem> fsSet = new HashSet<FileSystem>();
    for (Path p : ps) {
        fsSet.add(p.getFileSystem(conf));
    }
    String masterPrincipal = Master.getMasterPrincipal(conf);
    for (FileSystem fs : fsSet) {
        obtainTokensForNamenodesInternal(fs, credentials, conf, masterPrincipal);
    }
}

static void obtainTokensForNamenodesInternal(
    FileSystem fs,
    Credentials credentials,
    Configuration conf,
    String renewer)
    throws IOException {
    ...
    // delegTokenRenewer is assigned in the elided code above
    final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer, credentials);
    ...
}
 
// FileSystem.java
public Token<?>[] addDelegationTokens(
    final String renewer, Credentials credentials)
    throws IOException {
    if (credentials == null) {
        credentials = new Credentials();
    }
    final List<Token<?>> tokens = new ArrayList<>();
    collectDelegationTokens(renewer, credentials, tokens);
    return tokens.toArray(new Token<?>[tokens.size()]);
}

private void collectDelegationTokens(
    final String renewer,
    final Credentials credentials,
    final List<Token<?>> tokens)
    throws IOException {
    final String serviceName = getCanonicalServiceName();
    // Collect token of this filesystem and then of its embedded children
    if (serviceName != null) { // fs has token, grab it
        final Text service = new Text(serviceName);
        Token<?> token = credentials.getToken(service);
        if (token == null) {
            // Request a delegation token from the NN
            token = getDelegationToken(renewer);
            if (token != null) {
                tokens.add(token);
                credentials.addToken(service, token);
            }
        }
    }
    ...
}

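3) The RM adds the token to the delegation token renewal service

When handling the client's job submission request, the RM checks whether Kerberos authentication is enabled. If it is, the RM parses the delegation tokens out of the job's launch context and adds them to the delegation token renewal service, which starts a timer thread to renew them periodically. The RM then goes on to send the container launch request to the NM, and the delegation tokens travel to the NM along with the launch context.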

// RMAppManager.java
protected void submitApplication(
    ApplicationSubmissionContext submissionContext,
    long submitTime,
    String user)
    throws YarnException {
    ...
    if (UserGroupInformation.isSecurityEnabled()) {
        this.rmContext.getDelegationTokenRenewer().addApplicationAsync(
            applicationId,
            BuilderUtils.parseCredentials(submissionContext),
            submissionContext.getCancelTokensWhenComplete(),
            application.getUser(),
            BuilderUtils.parseTokensConf(submissionContext));
    }
    ...
}

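4) The NM uses the delegation token

After receiving the container launch request, the NM parses the delegation tokens out of the request (the job's launch context), constructs an instance object for the container, and stores the tokens in that object. It then localizes resources for the container, that is, it downloads the required resource files from HDFS, and this is where the delegation token that was passed along gets used. When the job finishes, if log aggregation is required, the same delegation token is used again to upload the job's logs to the designated HDFS path.

In addition, the delegation tokens are written to a persistent file. This serves two purposes: NM recovery after a failure, and handing the tokens to the task container process for its own use.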
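As a quick way to check the second point from inside a container, here is a minimal JDK-only sketch. It assumes the token file path is exposed to the container process through the environment variable HADOOP_TOKEN_FILE_LOCATION; that name comes from my understanding of Hadoop's UserGroupInformation and is not stated in the text above, so verify it on your own cluster. The class name TokenFileLocator is made up for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;

class TokenFileLocator {
    // Assumption: name of the environment variable that points at the token file.
    static final String TOKEN_FILE_ENV = "HADOOP_TOKEN_FILE_LOCATION";

    // Returns the token file path found in the given environment, if any.
    static Optional<Path> locate(Map<String, String> env) {
        String value = env.get(TOKEN_FILE_ENV);
        if (value == null || value.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(Paths.get(value));
    }

    public static void main(String[] args) throws IOException {
        Optional<Path> tokenFile = locate(System.getenv());
        if (!tokenFile.isPresent()) {
            System.out.println(TOKEN_FILE_ENV + " is not set in this process");
            return;
        }
        Path path = tokenFile.get();
        if (Files.isReadable(path)) {
            // Only report the size; the file content is a binary credentials dump.
            System.out.println("Token file: " + path + " (" + Files.size(path) + " bytes)");
        } else {
            System.out.println("Token file is set but not readable: " + path);
        }
    }
}
```

Running this from the open method of a sink (or from any code inside the container) shows whether the process was handed a token file at all, which is a useful first step before debugging the client's own authentication.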

3. Delegation token renewal and lifecycle

1) The token's maximum lifetime is already fixed when the token is requested

// FSNamesystem.java
Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException {
    ...
    DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, renewer, realUser);
    token = new Token<DelegationTokenIdentifier>(dtId, dtSecretManager);
    ...
    return token;
}

// Token.java
public Token(T id, SecretManager<T> mgr) {
    password = mgr.createPassword(id);
    identifier = id.getBytes();
    kind = id.getKind();
    service = new Text();
}

// AbstractDelegationTokenSecretManager
protected synchronized byte[] createPassword(TokenIdent identifier) {
    long now = Time.now();
    // The hard upper bound on the token's lifetime is fixed here, at creation time
    identifier.setMaxDate(now + tokenMaxLifetime);
    ...
}
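
To make the effect of that max date concrete, here is a small JDK-only model of the lifecycle. It is a sketch under assumptions, not the Hadoop implementation: I assume each renewal extends the expiry by one renew interval but never beyond the max date set in createPassword, and I assume a 7-day max lifetime and a 1-day renew interval. The class and method names are made up for illustration.

```java
class TokenLifecycleSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // Mirrors createPassword above: the upper bound is fixed when the token is issued.
    static long maxDate(long issuedAt, long maxLifetime) {
        return issuedAt + maxLifetime;
    }

    // Assumed renewal rule: extend by renewInterval, but never past maxDate.
    static long renew(long now, long maxDate, long renewInterval) {
        if (now > maxDate) {
            throw new IllegalStateException("token is past its max lifetime");
        }
        return Math.min(maxDate, now + renewInterval);
    }

    public static void main(String[] args) {
        long issuedAt = 0L;                 // use 0 as the issue time for readability
        long maxLifetime = 7 * DAY_MS;      // assumed value
        long renewInterval = DAY_MS;        // assumed value
        long max = maxDate(issuedAt, maxLifetime);

        // Renew once per day and print how far the expiry can be pushed each time.
        for (int day = 0; day <= 7; day++) {
            long now = issuedAt + day * DAY_MS;
            long expiry = renew(now, max, renewInterval);
            System.out.println("renew on day " + day + " -> valid until day " + expiry / DAY_MS);
        }
    }
}
```

Under these assumptions renewal stops helping once the max date is reached, which would fit the symptom described at the start: a job that has run for more than seven days can no longer use its original token for retries or log aggregation.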

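2) After receiving the job submission request, the RM first renews the token once to obtain its next expiration time, then sets a timer based on that expiration time to trigger the next renewal.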

// DelegationTokenRenewer.java
public void addApplicationSync(
    ApplicationId applicationId,
    Credentials ts,
    boolean shouldCancelAtEnd,
    String user)
    throws IOException, InterruptedException {
    handleAppSubmitEvent(
        new DelegationTokenRenewerAppSubmitEvent(
            applicationId, ts, shouldCancelAtEnd, user, new Configuration()));
}

private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
    throws IOException, InterruptedException {
    ...
    Credentials ts = evt.getCredentials();
    Collection<Token<?>> tokens = ts.getAllTokens();
    for (Token<?> token : tokens) {
        DelegationTokenToRenew dttr = allTokens.get(token);
        if (dttr == null) {
            dttr = new DelegationTokenToRenew(
                Arrays.asList(applicationId),
                token, tokenConf, now, shouldCancelAtEnd,
                evt.getUser());
            try {
                // Renew once up front to learn the next expiration time
                renewToken(dttr);
            } catch (IOException ioe) {
                ...
            }
        }
        tokenList.add(dttr);
    }

    if (!tokenList.isEmpty()) {
        for (DelegationTokenToRenew dtr : tokenList) {
            DelegationTokenToRenew currentDtr = allTokens.putIfAbsent(dtr.token, dtr);
            if (currentDtr != null) {
                // another job beat us
                currentDtr.referringAppIds.add(applicationId);
                appTokens.get(applicationId).add(currentDtr);
            } else {
                appTokens.get(applicationId).add(dtr);
                // Schedule the next renewal for a token seen for the first time
                setTimerForTokenRenewal(dtr);
            }
        }
    }
}
 
protected void renewToken(final DelegationTokenToRenew dttr)
    throws IOException {
    // need to use doAs so that http can find the kerberos tgt
    // NOTE: token renewers should be responsible for the correct UGI!
    try {
        // Renew the delegation token and record its next expiration time
        dttr.expirationDate =
            UserGroupInformation.getLoginUser().doAs(
                new PrivilegedExceptionAction<Long>() {
                    @Override
                    public Long run() throws Exception {
                        return dttr.token.renew(dttr.conf);
                    }
                });
    } catch (InterruptedException e) {
        throw new IOException(e);
    }
    LOG.info("Renewed delegation-token= [" + dttr + "]");
}
 
protected void setTimerForTokenRenewal(DelegationTokenToRenew token)
    throws IOException {
    // calculate timer time
    long expiresIn = token.expirationDate - System.currentTimeMillis();
    if (expiresIn <= 0) {
        LOG.info("Will not renew token " + token);
        return;
    }
    long renewIn = token.expirationDate - expiresIn / 10; // little bit before the expiration
    // need to create new task every time
    RenewalTimerTask tTask = new RenewalTimerTask(token);
    token.setTimerTask(tTask); // keep reference to the timer

    renewalTimer.schedule(token.timerTask, new Date(renewIn));
    LOG.info(
        "Renew " + token + " in " + expiresIn + " ms, appId = " +
        token.referringAppIds);
}
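
The timer arithmetic in setTimerForTokenRenewal is easy to misread, so here is a small JDK-only worked example of just that calculation. The class name and the 24-hour figure are my own illustration; only the formula comes from the code above.

```java
class RenewalTimerSketch {
    // Returns the absolute time (ms) at which the next renewal would fire,
    // or -1 if the token has already expired and no timer would be set.
    static long renewAt(long expirationDate, long now) {
        long expiresIn = expirationDate - now;
        if (expiresIn <= 0) {
            return -1L;
        }
        // Same formula as above: fire when 90% of the remaining validity has passed.
        return expirationDate - expiresIn / 10;
    }

    public static void main(String[] args) {
        long now = 0L;
        long hourMs = 60L * 60 * 1000;
        long expirationDate = now + 24 * hourMs; // illustrative: token valid for 24 more hours
        long renewAt = renewAt(expirationDate, now);
        System.out.println("renewal fires after " + (renewAt - now) / 60000.0 + " minutes");
        System.out.println("margin before expiry: " + (expirationDate - renewAt) / 60000.0 + " minutes");
    }
}
```

In other words, the renewal is scheduled for the last tenth of the remaining validity. Note that the log line in the method prints expiresIn, not the delay until the timer actually fires.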

Now let's look at the details of the renewal request and how it is handled:

// The client sends the renewal request
public long renew(Token<?> token, Configuration conf) throws IOException {
    Token<DelegationTokenIdentifier> delToken = (Token<DelegationTokenIdentifier>) token;
    ClientProtocol nn = getNNProxy(delToken, conf);
    try {
        return nn.renewDelegationToken(delToken);
    } catch (RemoteException re) {
        throw re.unwrapRemoteException(InvalidToken.class,
                AccessControlException.class);
    }
}
 
// The server handles the request
long renewDelegationToken(Token<DelegationTokenIdentifier> token)
    throws InvalidToken, IOException {
    try {
        ...
        expiryTime = dtSecretManager.renewToken(token, renewer);
    } catch (AccessControlException ace) {
        ...
    }
    return expiryTime;
}
 
public synchronized long renewToken(
    Token<TokenIdent> token,
    String renewer)
    throws InvalidToken, IOException {
    ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
    DataInputStream in = new DataInputStream(buf);
    TokenIdent id = createIdentifier();
    id.readFields(in);
    LOG.info(
        "Token re
