Flink 1.15 Source Code Analysis: Security Modules and Security Context

Posted by 宝哥大数据

1.6. Loading the security modules via SPI

 SecurityUtils.install(new SecurityConfiguration(cli.configuration));

1.6.1. Initializing SecurityConfiguration

    /**
     * Create a security configuration from the global configuration.
     *
     * @param flinkConf the Flink global configuration.
     */
    public SecurityConfiguration(Configuration flinkConf) {
        this(
                flinkConf,
                flinkConf.get(SECURITY_CONTEXT_FACTORY_CLASSES),
                flinkConf.get(SECURITY_MODULE_FACTORY_CLASSES));
    }

The two list options read by this constructor are defined in SecurityOptions:

    // ------------------------------------------------------------------------
    //  Custom Security Service Loader
    // ------------------------------------------------------------------------

    public static final ConfigOption<List<String>> SECURITY_CONTEXT_FACTORY_CLASSES =
            key("security.context.factory.classes")
                    .stringType()
                    .asList()
                    .defaultValues(
                            "org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory",
                            "org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory")
                    .withDescription(
                            "List of factories that should be used to instantiate a security context. "
                                    + "If multiple are configured, Flink will use the first compatible "
                                    + "factory. You should have a NoOpSecurityContextFactory in this list "
                                    + "as a fallback.");

    public static final ConfigOption<List<String>> SECURITY_MODULE_FACTORY_CLASSES =
            key("security.module.factory.classes")
                    .stringType()
                    .asList()
                    .defaultValues(
                            "org.apache.flink.runtime.security.modules.HadoopModuleFactory",
                            "org.apache.flink.runtime.security.modules.JaasModuleFactory",
                            "org.apache.flink.runtime.security.modules.ZookeeperModuleFactory")
                    .withDescription(
                            "List of factories that should be used to instantiate security "
                                    + "modules. All listed modules will be installed. Keep in mind that the "
                                    + "configured security context might rely on some modules being present.");

Following the call into the three-argument constructor:

    /**
     * Create a security configuration from the global configuration.
     *
     * @param flinkConf the Flink global configuration.
     * @param securityModuleFactories the security modules to apply.
     */
    public SecurityConfiguration(
            Configuration flinkConf,
            List<String> securityContextFactory,
            List<String> securityModuleFactories) {
        // 1. Read the global Kerberos and ZooKeeper settings
        this.isZkSaslDisable = flinkConf.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
        this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
        this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
        this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
        this.loginContextNames =
                parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS));
        this.zkServiceName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
        this.zkLoginContextName =
                flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);

        // 2. Keep the configured factory class names
        this.securityModuleFactories = Collections.unmodifiableList(securityModuleFactories);
        this.securityContextFactory = securityContextFactory;
        this.flinkConfig = checkNotNull(flinkConf);
        // 3. Validate the Kerberos settings
        validate();
    }

Next, the logic of validate:

    private void validate() {
        if (!StringUtils.isBlank(keytab)) {
            // principal is required
            if (StringUtils.isBlank(principal)) {
                throw new IllegalConfigurationException(
                        "Kerberos login configuration is invalid: keytab requires a principal.");
            }

            // check the keytab is readable
            File keytabFile = new File(keytab);
            if (!keytabFile.exists() || !keytabFile.isFile()) {
                throw new IllegalConfigurationException(
                        "Kerberos login configuration is invalid: keytab ["
                                + keytab
                                + "] doesn't exist!");
            } else if (!keytabFile.canRead()) {
                throw new IllegalConfigurationException(
                        "Kerberos login configuration is invalid: keytab ["
                                + keytab
                                + "] is unreadable!");
            }
        }
    }

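If security.kerberos.login.keytab is set in the global configuration (flink-conf.yaml), two checks apply: a principal must be configured as well, and the keytab path must point to an existing regular file that is readable. Nothing is validated when no keytab is configured. Some background on Kerberos authentication is helpful for following this part.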
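The validation rules can be tried outside of Flink with a small standalone sketch. It is not Flink's code: the class name KeytabCheckSketch, the helper createTempKeytab, and the principal flink@EXAMPLE.COM are assumptions made for illustration, and the sketch returns a message instead of throwing IllegalConfigurationException.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

// Illustrative sketch only; all names here are hypothetical, not Flink APIs.
class KeytabCheckSketch {

    /** Returns null if the settings are acceptable, otherwise a description of the problem. */
    static String check(String keytab, String principal) {
        if (isBlank(keytab)) {
            return null; // no keytab configured: nothing to validate
        }
        if (isBlank(principal)) {
            return "keytab requires a principal";
        }
        File keytabFile = new File(keytab);
        // A directory "exists" but is not a regular file, so it is rejected here as well.
        if (!keytabFile.exists() || !keytabFile.isFile()) {
            return "keytab [" + keytab + "] doesn't exist";
        }
        if (!keytabFile.canRead()) {
            return "keytab [" + keytab + "] is unreadable";
        }
        return null;
    }

    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    /** Creates an empty temporary file that stands in for a keytab. */
    static String createTempKeytab() {
        try {
            File f = Files.createTempFile("demo", ".keytab").toFile();
            f.deleteOnExit();
            return f.getAbsolutePath();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String keytab = createTempKeytab();
        System.out.println(check(keytab, "flink@EXAMPLE.COM")); // valid pair
        System.out.println(check(keytab, ""));                  // principal missing
        System.out.println(check(keytab + ".missing", "flink@EXAMPLE.COM"));
    }
}
```

Note the order of the checks: the principal is verified before the file system is touched, so a missing principal is reported even when the keytab path is wrong too.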

1.6.2. The install logic of SecurityUtils

    /**
     * Installs a process-wide security configuration.
     *
     * <p>Applies the configuration using the available security modules (i.e. Hadoop, JAAS).
     */
    public static void install(SecurityConfiguration config) throws Exception {
        // Install the security modules first before installing the security context
        installModules(config);
        installContext(config);
    }

1.6.2.1. installModules

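The security modules installed here mainly cover the Java Authentication and Authorization Service (JAAS), Hadoop UserGroupInformation (UGI), and the process-wide ZooKeeper security settings.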
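The installation policy is "install every module that the environment supports". A minimal sketch of that policy, using hypothetical stand-ins (DemoModule and plain Supplier factories) rather than Flink's SecurityModule and SecurityModuleFactory types:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Illustrative sketch only; DemoModule and the factories are hypothetical.
class ModuleInstallSketch {

    /** Hypothetical stand-in for a security module. */
    interface DemoModule {
        void install();
    }

    /** Installs every module the factories produce and returns them in order. */
    static List<DemoModule> installAll(List<Supplier<DemoModule>> factories) {
        List<DemoModule> installed = new ArrayList<>();
        for (Supplier<DemoModule> factory : factories) {
            DemoModule module = factory.get();
            // A factory may return null when its module is not supported
            // in the current environment; such modules are skipped.
            if (module != null) {
                module.install();
                installed.add(module);
            }
        }
        return installed;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Supplier<DemoModule>> factories = Arrays.asList(
                () -> () -> log.add("hadoop"),
                () -> null, // unsupported in this environment
                () -> () -> log.add("zookeeper"));
        System.out.println(installAll(factories).size() + " modules installed: " + log);
    }
}
```

Keeping the list of installed modules is what allows them to be tracked after installation; the sketch returns the list, while Flink stores it in a static field.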

    static void installModules(SecurityConfiguration config) throws Exception {

        // install the security module factories
        List<SecurityModule> modules = new ArrayList<>();
        // Iterate over the class names configured in security.module.factory.classes
        for (String moduleFactoryClass : config.getSecurityModuleFactories()) {
            SecurityModuleFactory moduleFactory = null;
            try {
                // Look up the SecurityModuleFactory through SecurityFactoryServiceLoader
                moduleFactory = SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
            } catch (NoMatchSecurityFactoryException ne) {
                LOG.error("Unable to instantiate security module factory {}", moduleFactoryClass);
                throw new IllegalArgumentException("Unable to find module factory class", ne);
            }
            SecurityModule module = moduleFactory.createModule(config);
            // can be null if a SecurityModule is not supported in the current environment
            // Install the security module
            if (module != null) {
                module.install();
                modules.add(module);
            }
        }
        installedModules = modules;
    }

Implementation of findModuleFactory:


    /** Find a suitable {@link SecurityModuleFactory} based on canonical name. */
    public static SecurityModuleFactory findModuleFactory(String securityModuleFactoryClass)
            throws NoMatchSecurityFactoryException {
        return findFactoryInternal(
                securityModuleFactoryClass,
                SecurityModuleFactory.class,
                SecurityModuleFactory.class.getClassLoader());
    }

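Next, the factory instances are obtained through ServiceLoader.
ServiceLoader is a feature introduced in JDK 6. It implements SPI (Service Provider Interface), a service discovery mechanism that many frameworks use to discover extensions.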
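Before reading Flink's lookup, the ServiceLoader mechanism can be tried in isolation. The sketch below is self-contained: it writes a provider-configuration file under META-INF/services into a temporary directory at runtime, then selects exactly one provider by canonical name. DemoFactory, HadoopLikeFactory, JaasLikeFactory and findExactlyOne are hypothetical names, and the sketch compares names with equals, which is a simplification and not necessarily how Flink compares them.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Illustrative sketch only; every type and method name here is hypothetical.
class SpiLookupSketch {

    /** Hypothetical service interface standing in for a security factory. */
    public interface DemoFactory {
        String describe();
    }

    public static class HadoopLikeFactory implements DemoFactory {
        public String describe() { return "hadoop-like"; }
    }

    public static class JaasLikeFactory implements DemoFactory {
        public String describe() { return "jaas-like"; }
    }

    /** Registers the providers in a temp directory and returns a class loader that sees them. */
    static ClassLoader loaderWithProviders(Class<?>... providers) {
        try {
            Path root = Files.createTempDirectory("spi-demo");
            Path services = Files.createDirectories(root.resolve("META-INF/services"));
            List<String> lines = new ArrayList<>();
            for (Class<?> provider : providers) {
                lines.add(provider.getName()); // binary name, one provider per line
            }
            // The file is named after the service interface's binary name.
            Files.write(
                    services.resolve(DemoFactory.class.getName()), lines, StandardCharsets.UTF_8);
            return new URLClassLoader(
                    new URL[] {root.toUri().toURL()}, SpiLookupSketch.class.getClassLoader());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the single provider whose canonical name equals the requested one. */
    static DemoFactory findExactlyOne(String canonicalName, ClassLoader loader) {
        List<DemoFactory> matches = new ArrayList<>();
        for (DemoFactory candidate : ServiceLoader.load(DemoFactory.class, loader)) {
            if (canonicalName.equals(candidate.getClass().getCanonicalName())) {
                matches.add(candidate);
            }
        }
        if (matches.size() != 1) {
            throw new IllegalStateException(
                    "zero or more than one factory found for " + canonicalName);
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        ClassLoader loader = loaderWithProviders(HadoopLikeFactory.class, JaasLikeFactory.class);
        DemoFactory factory = findExactlyOne(JaasLikeFactory.class.getCanonicalName(), loader);
        System.out.println(factory.describe());
    }
}
```

In a real project the provider-configuration file is shipped inside the jar rather than generated at runtime; generating it here only keeps the example in a single file. ServiceLoader instantiates each provider through its public no-argument constructor, which is why the demo factories are public classes.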

    private static <T> T findFactoryInternal(
            String factoryClassCanonicalName, Class<T> factoryClass, ClassLoader classLoader)
            throws NoMatchSecurityFactoryException {

        Preconditions.checkNotNull(factoryClassCanonicalName);

        ServiceLoader<T> serviceLoader;

        // Create the ServiceLoader, using the given class loader if there is one
        if (classLoader != null) {
            serviceLoader = ServiceLoader.load(factoryClass, classLoader);
        } else {
            serviceLoader = ServiceLoader.load(factoryClass);
        }

        List<T> matchingFactories = new ArrayList<>();
        Iterator<T> classFactoryIterator = serviceLoader.iterator();
        // Collect the discovered factories whose canonical name matches the configured one
        classFactoryIterator.forEachRemaining(
                classFactory -> {
                    if (factoryClassCanonicalName.matches(
                            classFactory.getClass().getCanonicalName())) {
                        matchingFactories.add(classFactory);
                    }
                });

        // Exactly one match is required
        if (matchingFactories.size() != 1) {
            throw new NoMatchSecurityFactoryException(
                    "zero or more than one security factory found",
                    factoryClassCanonicalName,
                    matchingFactories);
        }
        return matchingFactories.get(0);
    }

1.6.2.2. installContext

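The flow is similar to installModules: SecurityFactoryServiceLoader is used to find each SecurityContextFactory. The difference is that only the first compatible factory is used to create the context, and the remaining ones are ignored.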
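The "first compatible factory wins, with a fallback at the end of the list" policy can be sketched on its own. DemoContextFactory and the helper factory(...) are hypothetical stand-ins for illustration, and the context is reduced to a plain String.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only; the types below are hypothetical, not Flink APIs.
class ContextSelectionSketch {

    /** Hypothetical stand-in for a security context factory. */
    interface DemoContextFactory {
        boolean isCompatible();

        String createContext();
    }

    /** Builds a demo factory with a fixed compatibility answer and optional failure. */
    static DemoContextFactory factory(String name, boolean compatible, boolean failsOnCreate) {
        return new DemoContextFactory() {
            @Override
            public boolean isCompatible() {
                return compatible;
            }

            @Override
            public String createContext() {
                if (failsOnCreate) {
                    throw new IllegalStateException(name + " failed to initialize");
                }
                return name;
            }
        };
    }

    /** Returns the context of the first compatible factory that initializes successfully. */
    static String installFirstCompatible(List<DemoContextFactory> factories) {
        for (DemoContextFactory candidate : factories) {
            if (!candidate.isCompatible()) {
                continue; // incompatible factories are skipped
            }
            try {
                return candidate.createContext(); // first success wins, the rest are ignored
            } catch (RuntimeException e) {
                // A failing factory is reported and the next one is tried.
                System.err.println("Skipping factory: " + e.getMessage());
            }
        }
        throw new IllegalStateException("Unable to install a valid security context factory!");
    }

    public static void main(String[] args) {
        List<DemoContextFactory> factories = Arrays.asList(
                factory("hadoop", false, false), // e.g. Hadoop classes not available
                factory("no-op", true, false));  // fallback that is always compatible
        System.out.println(installFirstCompatible(factories));
    }
}
```

This is why the option description recommends keeping NoOpSecurityContextFactory in the list: without a factory that is always compatible, an environment with no usable candidate ends in an exception.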

    static void installContext(SecurityConfiguration config) throws Exception {
        // install the security context factory
        for (String contextFactoryClass : config.getSecurityContextFactories()) {
            try {
                SecurityContextFactory contextFactory =
                        SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass);
                if (contextFactory.isCompatibleWith(config)) {
                    try {
                        installedContext = contextFactory.createContext(config);
                        // install the first context that's compatible and ignore the remaining.
                        break;
                    } catch (SecurityContextInitializeException e) {
                        LOG.error(
                                "Cannot instantiate security context with: " + contextFactoryClass,
                                e);
                    } catch (LinkageError le) {
                        LOG.error(
                                "Error occur when instantiate security context with: "
                                        + contextFactoryClass,
                                le);
                    }
                } else {
                    LOG.debug("Unable to install security context factory {}", contextFactoryClass);
                }
            } catch (NoMatchSecurityFactoryException ne) {
                LOG.warn("Unable to instantiate security context factory {}", contextFactoryClass);
            }
        }
        if (installedContext == null) {
            LOG.error("Unable to install a valid security context factory!");
            throw new Exception("Unable to install a valid security context factory!");
        }
    }
