Flink 1.15 Source Code Analysis: Security Modules and Security Context
Posted by 宝哥大数据
1.6. Loading the security configuration modules through SPI
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
1.6.1. SecurityConfiguration initialization
/**
 * Create a security configuration from the global configuration.
 *
 * @param flinkConf the Flink global configuration.
 */
public SecurityConfiguration(Configuration flinkConf) {
    this(
            flinkConf,
            flinkConf.get(SECURITY_CONTEXT_FACTORY_CLASSES),
            flinkConf.get(SECURITY_MODULE_FACTORY_CLASSES));
}
// ------------------------------------------------------------------------
//  Custom Security Service Loader
// ------------------------------------------------------------------------
public static final ConfigOption<List<String>> SECURITY_CONTEXT_FACTORY_CLASSES =
        key("security.context.factory.classes")
                .stringType()
                .asList()
                .defaultValues(
                        "org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory",
                        "org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory")
                .withDescription(
                        "List of factories that should be used to instantiate a security context. "
                                + "If multiple are configured, Flink will use the first compatible "
                                + "factory. You should have a NoOpSecurityContextFactory in this list "
                                + "as a fallback.");
public static final ConfigOption<List<String>> SECURITY_MODULE_FACTORY_CLASSES =
        key("security.module.factory.classes")
                .stringType()
                .asList()
                .defaultValues(
                        "org.apache.flink.runtime.security.modules.HadoopModuleFactory",
                        "org.apache.flink.runtime.security.modules.JaasModuleFactory",
                        "org.apache.flink.runtime.security.modules.ZookeeperModuleFactory")
                .withDescription(
                        "List of factories that should be used to instantiate security "
                                + "modules. All listed modules will be installed. Keep in mind that the "
                                + "configured security context might rely on some modules being present.");
Stepping into the three-argument constructor:
/**
 * Create a security configuration from the global configuration.
 *
 * @param flinkConf the Flink global configuration.
 * @param securityModuleFactories the security modules to apply.
 */
public SecurityConfiguration(
        Configuration flinkConf,
        List<String> securityContextFactory,
        List<String> securityModuleFactories) {
    // 1. Read the global security settings
    this.isZkSaslDisable = flinkConf.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
    this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
    this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
    this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
    this.loginContextNames =
            parseList(flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_CONTEXTS));
    this.zkServiceName = flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_SERVICE_NAME);
    this.zkLoginContextName =
            flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);
    this.securityModuleFactories = Collections.unmodifiableList(securityModuleFactories);
    this.securityContextFactory = securityContextFactory;
    this.flinkConfig = checkNotNull(flinkConf);
    // 2. Validate the settings
    validate();
}
A closer look at the validate() logic:
private void validate() {
    if (!StringUtils.isBlank(keytab)) {
        // principal is required
        if (StringUtils.isBlank(principal)) {
            throw new IllegalConfigurationException(
                    "Kerberos login configuration is invalid: keytab requires a principal.");
        }
        // check the keytab is readable
        File keytabFile = new File(keytab);
        if (!keytabFile.exists() || !keytabFile.isFile()) {
            throw new IllegalConfigurationException(
                    "Kerberos login configuration is invalid: keytab ["
                            + keytab
                            + "] doesn't exist!");
        } else if (!keytabFile.canRead()) {
            throw new IllegalConfigurationException(
                    "Kerberos login configuration is invalid: keytab ["
                            + keytab
                            + "] is unreadable!");
        }
    }
}
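If security.kerberos.login.keytab is set in the global configuration (flink-conf.yaml), validate() requires that security.kerberos.login.principal is set as well and that the keytab path points to an existing, readable regular file (not a directory). Some background on Kerberos authentication is helpful for this part.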
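
To make the validation rules concrete, here is a minimal stand-alone sketch that mirrors the three checks using only the JDK. KeytabCheck and firstProblem are hypothetical names chosen for illustration, not Flink classes, and the sketch returns a message instead of throwing IllegalConfigurationException.

```java
import java.io.File;

/** Hypothetical stand-alone mirror of the checks in SecurityConfiguration#validate(). */
final class KeytabCheck {

    /** Returns null when the settings pass, otherwise a description of the first problem. */
    static String firstProblem(String keytab, String principal) {
        if (keytab == null || keytab.trim().isEmpty()) {
            return null; // no keytab configured, so nothing is validated
        }
        if (principal == null || principal.trim().isEmpty()) {
            return "keytab requires a principal";
        }
        File keytabFile = new File(keytab);
        // A directory fails isFile(), so it is reported like a missing keytab.
        if (!keytabFile.exists() || !keytabFile.isFile()) {
            return "keytab [" + keytab + "] doesn't exist";
        }
        if (!keytabFile.canRead()) {
            return "keytab [" + keytab + "] is unreadable";
        }
        return null;
    }
}
```

Note that a keytab path pointing at a directory is reported as "doesn't exist", because the check combines exists() and isFile().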
1.6.2. The install logic of SecurityUtils
/**
 * Installs a process-wide security configuration.
 *
 * <p>Applies the configuration using the available security modules (i.e. Hadoop, JAAS).
 */
public static void install(SecurityConfiguration config) throws Exception {
    // Install the security modules first before installing the security context
    installModules(config);
    installContext(config);
}
1.6.2.1. installModules
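The security modules installed here mainly cover the Java Authentication and Authorization Service (JAAS), Hadoop UserGroupInformation (UGI), and ZooKeeper's process-wide security settings.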
static void installModules(SecurityConfiguration config) throws Exception {
    // install the security module factories
    List<SecurityModule> modules = new ArrayList<>();
    // Iterate over the class names configured in security.module.factory.classes
    for (String moduleFactoryClass : config.getSecurityModuleFactories()) {
        SecurityModuleFactory moduleFactory = null;
        try {
            // Look up the SecurityModuleFactory through SecurityFactoryServiceLoader
            moduleFactory = SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
        } catch (NoMatchSecurityFactoryException ne) {
            LOG.error("Unable to instantiate security module factory {}", moduleFactoryClass);
            throw new IllegalArgumentException("Unable to find module factory class", ne);
        }
        SecurityModule module = moduleFactory.createModule(config);
        // can be null if a SecurityModule is not supported in the current environment
        // Install the security module
        if (module != null) {
            module.install();
            modules.add(module);
        }
    }
    installedModules = modules;
}
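
The rule in this loop is "install everything that is supported": every configured factory is asked for a module, and a null result simply means the module does not apply in the current environment. A minimal sketch of that rule follows; DemoModule and the Supplier-based factories are hypothetical stand-ins for SecurityModule and SecurityModuleFactory, not Flink types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class ModuleInstallSketch {

    /** Hypothetical stand-in for Flink's SecurityModule. */
    interface DemoModule {
        void install();
    }

    /** Installs every module the factories produce, in order, skipping null results. */
    static List<DemoModule> installAll(List<Supplier<DemoModule>> factories) {
        List<DemoModule> installed = new ArrayList<>();
        for (Supplier<DemoModule> factory : factories) {
            DemoModule module = factory.get();
            if (module != null) { // null means "not supported in this environment"
                module.install();
                installed.add(module);
            }
        }
        return installed;
    }
}
```

Keeping the list of installed modules matters because it is the only record of what has to be undone later.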
The findModuleFactory implementation:
/** Find a suitable {@link SecurityModuleFactory} based on canonical name. */
public static SecurityModuleFactory findModuleFactory(String securityModuleFactoryClass)
        throws NoMatchSecurityFactoryException {
    return findFactoryInternal(
            securityModuleFactoryClass,
            SecurityModuleFactory.class,
            SecurityModuleFactory.class.getClassLoader());
}
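Next, the factory is discovered through ServiceLoader.
ServiceLoader was introduced in JDK 6. It implements SPI (Service Provider Interface), a service discovery mechanism that many frameworks use to find extensions at runtime.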
private static <T> T findFactoryInternal(
        String factoryClassCanonicalName, Class<T> factoryClass, ClassLoader classLoader)
        throws NoMatchSecurityFactoryException {
    Preconditions.checkNotNull(factoryClassCanonicalName);
    ServiceLoader<T> serviceLoader;
    // Build the service loader for the factory interface
    if (classLoader != null) {
        serviceLoader = ServiceLoader.load(factoryClass, classLoader);
    } else {
        serviceLoader = ServiceLoader.load(factoryClass);
    }
    List<T> matchingFactories = new ArrayList<>();
    Iterator<T> classFactoryIterator = serviceLoader.iterator();
    // Collect the factories whose canonical class name matches the configured one
    classFactoryIterator.forEachRemaining(
            classFactory -> {
                if (factoryClassCanonicalName.matches(
                        classFactory.getClass().getCanonicalName())) {
                    matchingFactories.add(classFactory);
                }
            });
    if (matchingFactories.size() != 1) {
        throw new NoMatchSecurityFactoryException(
                "zero or more than one security factory found",
                factoryClassCanonicalName,
                matchingFactories);
    }
    return matchingFactories.get(0);
}
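
The lookup boils down to "exactly one discovered provider must carry the configured class name". The sketch below isolates that rule so it can be tried without a META-INF/services registration: it accepts any Iterable, which is what ServiceLoader implements. FactoryLookupSketch and findExactlyOne are hypothetical names. One deliberate difference from the code above: String.matches treats its argument as a regular expression, so the sketch swaps in equals for a strict name comparison.

```java
import java.util.ArrayList;
import java.util.List;

final class FactoryLookupSketch {

    /** Returns the single candidate whose canonical class name equals the given name. */
    static <T> T findExactlyOne(Iterable<T> candidates, String canonicalName) {
        List<T> matching = new ArrayList<>();
        for (T candidate : candidates) {
            // Strict comparison; the Flink code shown above uses String.matches here.
            if (canonicalName.equals(candidate.getClass().getCanonicalName())) {
                matching.add(candidate);
            }
        }
        if (matching.size() != 1) {
            throw new IllegalStateException(
                    "zero or more than one factory found for " + canonicalName);
        }
        return matching.get(0);
    }
}
```

In a real SPI setup the first argument would be the result of ServiceLoader.load for the factory interface, with the provider classes listed in a META-INF/services file named after that interface.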
1.6.2.2. installContext
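The flow is similar to installModules: SecurityFactoryServiceLoader looks up each configured SecurityContextFactory. The difference is that only the first compatible factory is used to create the context, and the remaining ones are ignored.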
static void installContext(SecurityConfiguration config) throws Exception {
    // install the security context factory
    for (String contextFactoryClass : config.getSecurityContextFactories()) {
        try {
            SecurityContextFactory contextFactory =
                    SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass);
            if (contextFactory.isCompatibleWith(config)) {
                try {
                    installedContext = contextFactory.createContext(config);
                    // install the first context that's compatible and ignore the remaining.
                    break;
                } catch (SecurityContextInitializeException e) {
                    LOG.error(
                            "Cannot instantiate security context with: " + contextFactoryClass,
                            e);
                } catch (LinkageError le) {
                    LOG.error(
                            "Error occur when instantiate security context with: "
                                    + contextFactoryClass,
                            le);
                }
            } else {
                LOG.debug("Unable to install security context factory {}", contextFactoryClass);
            }
        } catch (NoMatchSecurityFactoryException ne) {
            LOG.warn("Unable to instantiate security context factory {}", contextFactoryClass);
        }
    }
    if (installedContext == null) {
        LOG.error("Unable to install a valid security context factory!");
        throw new Exception("Unable to install a valid security context factory!");
    }
}
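
Unlike the modules, the context follows a "first compatible factory wins" rule, which is why the option description recommends keeping NoOpSecurityContextFactory in the list as a fallback. A minimal sketch of that selection, assuming a simplified hypothetical ContextFactory interface and plain strings in place of real SecurityContext objects:

```java
import java.util.List;

final class ContextSelectionSketch {

    /** Hypothetical stand-in for Flink's SecurityContextFactory. */
    interface ContextFactory {
        boolean isCompatible();

        String createContext();
    }

    /** Builds a factory with a fixed compatibility answer (illustration only). */
    static ContextFactory fixed(boolean compatible, String contextName) {
        return new ContextFactory() {
            @Override
            public boolean isCompatible() {
                return compatible;
            }

            @Override
            public String createContext() {
                return contextName;
            }
        };
    }

    /** Returns the context of the first compatible factory, in list order. */
    static String installFirstCompatible(List<ContextFactory> factories) {
        for (ContextFactory factory : factories) {
            if (!factory.isCompatible()) {
                continue; // try the next configured factory
            }
            try {
                return factory.createContext();
            } catch (RuntimeException e) {
                // Creation failed: move on to the next factory. The Flink loop does the
                // same for SecurityContextInitializeException and LinkageError.
            }
        }
        throw new IllegalStateException("Unable to install a valid security context factory!");
    }
}
```

With a list such as fixed(false, "hadoop") followed by fixed(true, "no-op"), the second entry is selected; if both are compatible, the first one wins because order in the configuration decides.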