[Flink] Flink Source Code: Security Authentication

Posted by 九师兄

1. Overview

This article is a repost of "Flink 源码之安全认证". By coincidence, I spent two to four days last week getting Flink security authentication working, without ever having looked through any of the security-related source code, and only came across this write-up today.

This post analyzes how Flink handles security authentication, focusing on the Kerberos-related parts. We start with the configuration options.

2. SecurityConfiguration

This class holds the configuration options related to Flink security authentication. Their meanings are as follows (a short usage sketch follows the list):

  • zookeeper.sasl.disable: whether to disable Zookeeper SASL.

  • security.kerberos.login.keytab: path to the keytab file used for Kerberos authentication.

  • security.kerberos.login.principal: the Kerberos principal.

  • security.kerberos.login.use-ticket-cache: whether Kerberos authentication may use the ticket cache.

  • security.kerberos.login.contexts: the Kerberos login context names, equivalent to the entry names of a JAAS file.

  • zookeeper.sasl.service-name: the Zookeeper SASL service name. Defaults to zookeeper.

  • zookeeper.sasl.login-context-name: the Zookeeper SASL login context name. Defaults to Client.

  • security.context.factory.classes: which SecurityContextFactory implementations to use. Defaults to HadoopSecurityContextFactory and NoOpSecurityContextFactory (the first compatible one is installed).

  • security.module.factory.classes: which SecurityModuleFactory implementations to use. Defaults to HadoopModuleFactory, JaasModuleFactory and ZookeeperModuleFactory.
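
To make the options concrete, here is a minimal sketch that sets them programmatically and feeds them into the install entry point discussed in the next section. It assumes the flink-runtime classes analyzed in this post (org.apache.flink.runtime.security.SecurityConfiguration and SecurityUtils); the keytab path, principal and context names are made-up values, and in a real deployment these options normally live in flink-conf.yaml rather than in code:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;

public class SecuritySetupSketch {
    public static void main(String[] args) throws Exception {
        Configuration flinkConf = new Configuration();
        // hypothetical keytab path and principal
        flinkConf.setString("security.kerberos.login.keytab", "/etc/security/keytabs/flink.keytab");
        flinkConf.setString("security.kerberos.login.principal", "flink-user@EXAMPLE.COM");
        flinkConf.setString("security.kerberos.login.use-ticket-cache", "false");
        // JAAS entry names these credentials will be registered under
        flinkConf.setString("security.kerberos.login.contexts", "Client,KafkaClient");

        // wrap the Flink configuration and install the security modules and context
        SecurityConfiguration securityConfig = new SecurityConfiguration(flinkConf);
        SecurityUtils.install(securityConfig);
    }
}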

3. SecurityUtils

SecurityUtils.install is the entry point for security authentication when a Flink job is submitted; it installs the security configuration. Its code is shown below:

public static void install(SecurityConfiguration config) throws Exception {
    // Install the security modules first before installing the security context
    installModules(config);
    // then install the security context
    installContext(config);
}

The installModules method installs the security modules; the modules themselves are analyzed in a later section.

static void installModules(SecurityConfiguration config) throws Exception {

    // install the security module factories
    List<SecurityModule> modules = new ArrayList<>();
    // iterate over all configured SecurityModuleFactory class names
    for (String moduleFactoryClass : config.getSecurityModuleFactories()) {
        SecurityModuleFactory moduleFactory = null;
        try {
            // load the module factory via the ServiceLoader
            moduleFactory = SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
        } catch (NoMatchSecurityFactoryException ne) {
            LOG.error("Unable to instantiate security module factory {}", moduleFactoryClass);
            throw new IllegalArgumentException("Unable to find module factory class", ne);
        }
        // create the SecurityModule from the factory
        SecurityModule module = moduleFactory.createModule(config);
        // can be null if a SecurityModule is not supported in the current environment
        if (module != null) {
            // install the module and remember it
            module.install();
            modules.add(module);
        }
    }
    installedModules = modules;
}

The installContext method installs the security context; its purpose is likewise covered in a later section.

static void installContext(SecurityConfiguration config) throws Exception {
    // install the security context factory
    // iterate over the configured SecurityContextFactories
    // (configuration key: security.context.factory.classes)
    for (String contextFactoryClass : config.getSecurityContextFactories()) {
        try {
            // load the SecurityContextFactory via the ServiceLoader
            SecurityContextFactory contextFactory =
                    SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass);
            // check whether the SecurityContextFactory is compatible with the configuration (1)
            if (contextFactory.isCompatibleWith(config)) {
                try {
                    // create the first compatible SecurityContext
                    installedContext = contextFactory.createContext(config);
                    // install the first context that's compatible and ignore the remaining.
                    break;
                } catch (SecurityContextInitializeException e) {
                    LOG.error(
                            "Cannot instantiate security context with: " + contextFactoryClass,
                            e);
                } catch (LinkageError le) {
                    LOG.error(
                            "Error occur when instantiate security context with: "
                                    + contextFactoryClass,
                            le);
                }
            } else {
                LOG.debug("Unable to install security context factory {}", contextFactoryClass);
            }
        } catch (NoMatchSecurityFactoryException ne) {
            LOG.warn("Unable to instantiate security context factory {}", contextFactoryClass);
        }
    }
    if (installedContext == null) {
        LOG.error("Unable to install a valid security context factory!");
        throw new Exception("Unable to install a valid security context factory!");
    }
}

Explanation of the numbered annotation (1):

Let's look at the isCompatibleWith logic. SecurityContextFactory has two implementations: HadoopSecurityContextFactory and NoOpSecurityContextFactory. HadoopSecurityContextFactory requires that the security.module.factory.classes option contains org.apache.flink.runtime.security.modules.HadoopModuleFactory and that org.apache.hadoop.security.UserGroupInformation is on the classpath. NoOpSecurityContextFactory has no requirements.
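
The compatibility check in HadoopSecurityContextFactory boils down to exactly these two conditions. A rough sketch of that logic (a paraphrase, not the verbatim Flink source) looks like this:

@Override
public boolean isCompatibleWith(SecurityConfiguration securityConfig) {
    // 1. the Hadoop module factory must be among the configured module factories
    if (!securityConfig
            .getSecurityModuleFactories()
            .contains("org.apache.flink.runtime.security.modules.HadoopModuleFactory")) {
        return false;
    }
    // 2. Hadoop's UserGroupInformation must be loadable from the classpath
    try {
        Class.forName(
                "org.apache.hadoop.security.UserGroupInformation",
                false,
                HadoopSecurityContextFactory.class.getClassLoader());
        return true;
    } catch (LinkageError | ClassNotFoundException e) {
        return false;
    }
}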

4. SecurityModule

SecurityModule provides authentication support for different kinds of services. It has three implementations (the interface behind them is sketched after this list):

  • HadoopModule: authenticates via UserGroupInformation.
  • JaasModule: installs a JAAS configuration that takes effect process-wide.
  • ZookeeperModule: provides the Zookeeper security configuration.
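
All three classes implement the SecurityModule interface, and each is created by a matching SecurityModuleFactory that installModules (shown above) looks up through SecurityFactoryServiceLoader. As a rough sketch of that contract, here is a purely hypothetical module that only logs; package names follow the Flink runtime classes referenced in this post, and the class name LoggingSecurityModuleFactory is made up for illustration:

import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.modules.SecurityModule;
import org.apache.flink.runtime.security.modules.SecurityModuleFactory;

// hypothetical module: real modules change process-wide security state on install()
public class LoggingSecurityModuleFactory implements SecurityModuleFactory {

    @Override
    public SecurityModule createModule(SecurityConfiguration securityConfig) {
        // returning null would mean "not supported in the current environment"
        return new SecurityModule() {
            @Override
            public void install() throws SecurityInstallException {
                System.out.println("custom security module installed");
            }

            @Override
            public void uninstall() throws SecurityInstallException {
                System.out.println("custom security module uninstalled");
            }
        };
    }
}

For such a factory to be found via security.module.factory.classes, it would also need to be registered as a Java service, since SecurityFactoryServiceLoader.findModuleFactory discovers factories through the ServiceLoader mechanism.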

4.1 HadoopModule

HadoopModule holds Flink's SecurityConfiguration and the Hadoop configuration (read from the Hadoop configuration files; the loading logic lives in HadoopUtils.getHadoopConfiguration and is analyzed below).

4.1.1 The install method

The install method performs the authentication through Hadoop's UserGroupInformation (UGI).
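
Before walking through the Flink code, a standalone sketch of the underlying UGI keytab login may help. The keytab path and principal below are made-up values; in Flink they come from security.kerberos.login.keytab and security.kerberos.login.principal:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class UgiLoginSketch {
    public static void main(String[] args) throws Exception {
        Configuration hadoopConf = new Configuration();
        // "kerberos" switches Hadoop security on; otherwise isSecurityEnabled() is false
        hadoopConf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(hadoopConf);

        // hypothetical principal and keytab path
        UserGroupInformation.loginUserFromKeytab(
                "flink-user@EXAMPLE.COM", "/etc/security/keytabs/flink.keytab");

        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
        System.out.println(loginUser.getUserName() + ", from keytab: " + loginUser.isFromKeytab());
    }
}

HadoopModule.install performs essentially this login, plus delegation-token handling, inside Flink: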

@Override
    public void install() throws SecurityInstallException {

        // point UGI at the Hadoop configuration
        UserGroupInformation.setConfiguration(hadoopConfiguration);

        UserGroupInformation loginUser;

        try {
            // if Hadoop security is enabled and both keytab and principal are configured
            if (UserGroupInformation.isSecurityEnabled()
                    && !StringUtils.isBlank(securityConfig.getKeytab())
                    && !StringUtils.isBlank(securityConfig.getPrincipal())) {
                // absolute path of the keytab file
                String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();

                // log in via UGI with the keytab and principal from the Flink configuration
                UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);

                // the authenticated login user
                loginUser = UserGroupInformation.getLoginUser();

                // supplement with any available tokens
                // read the token cache file referenced by HADOOP_TOKEN_FILE_LOCATION
                String fileLocation =
                        System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
                // if a local token cache exists
                if (fileLocation != null) {
                    Credentials credentialsFromTokenStorageFile =
                            Credentials.readTokenStorageFile(
                                    new File(fileLocation), hadoopConfiguration);

                    // if UGI uses Kerberos keytabs for login, do not load HDFS delegation tokens,
                    // since the UGI would prefer the delegation token instead, which eventually
                    // expires and does not fall back to using Kerberos tickets
                    Credentials credentialsToBeAdded = new Credentials();
                    final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
                    Collection<Token<? extends TokenIdentifier>> usrTok =
                            credentialsFromTokenStorageFile.getAllTokens();
                    // iterate over the tokens in the token storage file and
                    // add every non-delegation token to the credentials
                    for (Token<? extends TokenIdentifier> token : usrTok) {
                        if (!token.getKind().equals(hdfsDelegationTokenKind)) {
                            final Text id = new Text(token.getIdentifier());
                            credentialsToBeAdded.addToken(id, token);
                        }
                    }

                    // attach the credentials to the login user
                    loginUser.addCredentials(credentialsToBeAdded);
                }
            } else {
                // security is not enabled (or keytab/principal not configured):
                // login with current user credentials (e.g. ticket cache, OS login)
                // note that the stored tokens are read automatically
                try {
                    // reflectively call UserGroupInformation.loginUserFromSubject(null)
                    Method loginUserFromSubjectMethod =
                            UserGroupInformation.class.getMethod(
                                    "loginUserFromSubject", Subject.class);
                    loginUserFromSubjectMethod.invoke(null, (Subject) null);
                } catch (NoSuchMethodException e) {
                    LOG.warn("Could not find method implementations in the shaded jar.", e);
                } catch (InvocationTargetException e) {
                    throw e.getTargetException();
                }

                // the currently logged-in user
                loginUser = UserGroupInformation.getLoginUser();
            }

            LOG.info("Hadoop user set to {}", loginUser);

            if (HadoopUtils.isKerberosSecurityEnabled(loginUser)) {
                boolean isCredentialsConfigured =
                        HadoopUtils.areKerberosCredentialsValid(
                                loginUser, securityConfig.useTicketCache());

                LOG.info(
                        "Kerberos security is enabled and credentials are {}.",
                        isCredentialsConfigured ? "valid" : "invalid");
            }
        } catch (Throwable ex) {
            throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
        }
    }

4.1.2 getHadoopConfiguration

This method implements the logic for locating and loading the Hadoop configuration files. It is fairly involved, so let's go through it in detail.

public static Configuration getHadoopConfiguration(
    org.apache.flink.configuration.Configuration flinkConfiguration) {

    // Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
    // from the classpath

    // start with an empty Hadoop conf
    Configuration result = new HdfsConfiguration();
    // tracks whether any Hadoop configuration was found
    boolean foundHadoopConfiguration = false;

    // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
    // the hdfs configuration.
    // The properties of a newly added resource will override the ones in previous resources, so
    // a configuration file with higher priority should be added later.

    // Approach 1: HADOOP_HOME environment variable
    // two candidate Hadoop conf directories
    String[] possibleHadoopConfPaths = new String[2];

    // read the HADOOP_HOME environment variable
    final String hadoopHome = System.getenv("HADOOP_HOME");
    if (hadoopHome != null) {
        LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
        // if HADOOP_HOME is set, try the following locations:
        // $HADOOP_HOME/conf
        // $HADOOP_HOME/etc/hadoop
        possibleHadoopConfPaths[0] = hadoopHome + "/conf";
        possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
    }

    for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
        if (possibleHadoopConfPath != null) {
            // try to read core-site.xml and hdfs-site.xml from each candidate directory
            foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
        }
    }

    // Approach 2: Flink configuration (deprecated)
    // add the file referenced by the Flink option fs.hdfs.hdfsdefault to the Hadoop conf
    final String hdfsDefaultPath =
        flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
    if (hdfsDefaultPath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
        LOG.debug(
            "Using hdfs-default configuration-file path from Flink config: {}",
            hdfsDefaultPath);
        foundHadoopConfiguration = true;
    }

    // add the file referenced by the Flink option fs.hdfs.hdfssite to the Hadoop conf
    final String hdfsSitePath =
        flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
    if (hdfsSitePath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
        LOG.debug(
            "Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
        foundHadoopConfiguration = true;
    }

    // add the files under the directory referenced by the Flink option fs.hdfs.hadoopconf
    final String hadoopConfigPath =
        flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
    if (hadoopConfigPath != null) {
        LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
        foundHadoopConfiguration =
            addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
    }

    // Approach 3: HADOOP_CONF_DIR environment variable
    // read the Hadoop configuration files from the HADOOP_CONF_DIR directory
    String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
    if (hadoopConfDir != null) {
        LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
        foundHadoopConfiguration =
            addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
    }

    // Approach 4: Flink configuration
    // add all configuration keys with prefix 'flink.hadoop.' in the Flink conf to the Hadoop conf:
    // strip the prefix from each key and store the remaining key with the original value
    for (String key : flinkConfiguration.keySet()) {
        for (String prefix : FLINK_CONFIG_PREFIXES) {
            if (key.startsWith(prefix)) {
                String newKey = key.substring(prefix.length());
                String value = flinkConfiguration.getString(key, null);
                result.set(newKey, value);
                LOG.debug(
                    "Adding Flink config entry for {} as {}={} to Hadoop config",
                    key,
                    newKey,
                    value);
                foundHadoopConfiguration = true;
            }
        }
    }

    // warn if none of the approaches above found a Hadoop configuration
    if (!foundHadoopConfiguration) {
        LOG.warn(
            "Could not find Hadoop configuration via any of the supported methods "
            + "(Flink configuration, environment variables).");
    }

    return result;
}

To summarize, Flink reads the Hadoop configuration from the following sources, in this order (resources added later override values from earlier ones):

  • Read the HADOOP_HOME environment variable; if it is set, look for core-site.xml and hdfs-site.xml under $HADOOP_HOME/conf and $HADOOP_HOME/etc/hadoop.

  • Add the file referenced by the Flink option fs.hdfs.hdfsdefault (deprecated).

  • Add the file referenced by the Flink option fs.hdfs.hdfssite (deprecated).

  • Search the directory referenced by the Flink option fs.hdfs.hadoopconf.

  • Search the directory referenced by the HADOOP_CONF_DIR environment variable.

  • Take every key in the Flink configuration that starts with the prefix flink.hadoop., strip the prefix, and add the resulting key together with the original value to the Hadoop configuration (see the sketch below).
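
As an illustration of the last rule, here is a minimal sketch; dfs.replication is just an arbitrary Hadoop key chosen for the example, and HadoopUtils is assumed to be the flink-runtime class (org.apache.flink.runtime.util.HadoopUtils) analyzed above:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.HadoopUtils;

public class PrefixSketch {
    public static void main(String[] args) {
        Configuration flinkConf = new Configuration();
        // equivalent to putting "flink.hadoop.dfs.replication: 2" into flink-conf.yaml
        flinkConf.setString("flink.hadoop.dfs.replication", "2");

        org.apache.hadoop.conf.Configuration hadoopConf =
                HadoopUtils.getHadoopConfiguration(flinkConf);

        // the flink.hadoop. prefix has been stripped
        System.out.println(hadoopConf.get("dfs.replication")); // prints 2
    }
}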

4.2 JaasModule

The install method reads the JAAS configuration referenced by the java.security.auth.login.config system property, converts the relevant settings from the Flink configuration into JAAS entries, merges them into that configuration, and installs the result for the JVM. The code is shown below:

@Override
public void install() {

    // ensure that a config file is always defined, for compatibility with
    // ZK and Kafka which check for the system property and existence of the file
    // remember the current value of java.security.auth.login.config so it can be
    // restored when the module is uninstalled
    priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
    // if the property is not set
    if (priorConfigFile == null) {
        // workingDir is the first directory of Flink's io.tmp.dirs option;
        // write the default flink-jaas.conf content to a temporary file named jaas-xxx.conf,
        // which is deleted when the JVM shuts down
        File configFile = generateDefaultConfigFile(workingDir);
        // set the java.security.auth.login.config system property so that it always has
        // a value; Zookeeper and Kafka check that the referenced JAAS file exists
        System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
        LOG.info("Jaas file will be created as {}.", configFile);
    }

    // read the currently installed JAAS configuration
    priorConfig = javax.security.auth.login.Configuration.getConfiguration();

    // construct a dynamic (modifiable) JAAS configuration wrapping the prior one
    currentConfig = new DynamicConfiguration(priorConfig);

    // wire up the configured JAAS login contexts to use the krb5 entries
    // read the Kerberos settings from the Flink configuration;
    // an AppConfigurationEntry is Java's representation of one entry (one block in
    // curly braces) of a JAAS configuration file
    AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
    if (krb5Entries != null) {
        // use each name from security.kerberos.login.contexts as an entry name
        for (String app : securityConfig.getLoginContextNames()) {
            // register the krb5 AppConfigurationEntries under that entry name
            currentConfig.addAppConfigurationEntry(app, krb5Entries);
        }
    }

    // install the new configuration
    javax.security.auth.login.Configuration.setConfiguration(currentConfig);
}

The getAppConfigurationEntries method used above is somewhat involved, so here is a closer look.

getAppConfigurationEntries reads the settings from Flink's securityConfig and converts them into JAAS entry form, stored as AppConfigurationEntry objects. If security.kerberos.login.use-ticket-cache is enabled, it produces an AppConfigurationEntry called userKerberosAce, equivalent to a JAAS file entry like the following:

EntryName {
    com.sun.security.auth.module.Krb5LoginModule optional
    doNotPrompt=true
    useTicketCache=true
    renewTGT=true;
};

If security.kerberos.login.keytab is configured, it additionally produces an AppConfigurationEntry called keytabKerberosAce, equivalent to an entry like:

EntryName {
    com.sun.security.auth.module.Krb5LoginModule required
    keyTab=<keytab path>
    doNotPrompt=true
    useKeyTab=true
    storeKey=true
    principal=<principal>;
};
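
In code, such an entry corresponds roughly to the following construction. This is only a sketch built directly with the JDK's AppConfigurationEntry API; the keytab path and principal are made-up values:

import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;

public class KeytabEntrySketch {
    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("keyTab", "/etc/security/keytabs/flink.keytab"); // hypothetical path
        options.put("principal", "flink-user@EXAMPLE.COM");          // hypothetical principal
        options.put("doNotPrompt", "true");
        options.put("useKeyTab", "true");
        options.put("storeKey", "true");

        // equivalent to a "Krb5LoginModule required ..." block in a JAAS file
        AppConfigurationEntry keytabEntry =
                new AppConfigurationEntry(
                        "com.sun.security.auth.module.Krb5LoginModule",
                        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                        options);

        System.out.println(keytabEntry.getLoginModuleName());
    }
}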
