[Flink] Flink Source Code: Security Authentication
Posted by 九师兄
1. Overview
This article is reposted from: Flink 源码之 安全认证. By coincidence, I spent two to four days last week getting Flink security authentication to work, without ever having read any of the security-related source code, and I only came across this article today.

This post analyzes how Flink handles security authentication, focusing on the Kerberos-related parts. The analysis starts from the configuration options.
2. SecurityConfiguration
This class holds Flink's security-related configuration options. Their meanings are as follows (a short Java sketch of setting them follows the list):

- zookeeper.sasl.disable: whether to disable ZooKeeper SASL.
- security.kerberos.login.keytab: path to the keytab file used for Kerberos authentication.
- security.kerberos.login.principal: the Kerberos principal.
- security.kerberos.login.use-ticket-cache: whether Kerberos authentication may use the ticket cache.
- security.kerberos.login.contexts: Kerberos login context names, equivalent to the entry names of a JAAS file.
- zookeeper.sasl.service-name: ZooKeeper SASL service name. Defaults to zookeeper.
- zookeeper.sasl.login-context-name: ZooKeeper SASL login context name. Defaults to Client.
- security.context.factory.classes: which SecurityContextFactory implementations to consider. Defaults to HadoopSecurityContextFactory and NoOpSecurityContextFactory.
- security.module.factory.classes: which SecurityModuleFactory implementations to install. Defaults to HadoopModuleFactory, JaasModuleFactory and ZookeeperModuleFactory.
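A minimal sketch of setting these options programmatically before building a SecurityConfiguration, assuming you configure them in code rather than in flink-conf.yaml; the keytab path, principal, and context names are hypothetical placeholders:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.security.SecurityConfiguration;

public class KerberosConfigSketch {
    public static void main(String[] args) {
        Configuration flinkConf = new Configuration();
        // Hypothetical keytab path and principal, for illustration only
        flinkConf.setString("security.kerberos.login.keytab", "/opt/keytabs/flink.keytab");
        flinkConf.setString("security.kerberos.login.principal", "flink/host@EXAMPLE.COM");
        flinkConf.setString("security.kerberos.login.use-ticket-cache", "false");
        // Entry names under which JaasModule will register the Kerberos entries
        flinkConf.setString("security.kerberos.login.contexts", "Client,KafkaClient");

        // SecurityConfiguration extracts the security-related keys from the Flink Configuration
        SecurityConfiguration securityConf = new SecurityConfiguration(flinkConf);
        System.out.println(securityConf.getPrincipal());
    }
}
```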
3. SecurityUtils
SecurityUtils.install is the entry point of security authentication when a Flink job is submitted; it installs the security configuration. Its code is shown below:
```java
public static void install(SecurityConfiguration config) throws Exception {
    // Install the security modules first before installing the security context
    installModules(config);
    // Then install the security context
    installContext(config);
}
```
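As a hedged sketch (not a verbatim excerpt from Flink), this is roughly how an entrypoint calls install and then runs its main logic inside the installed security context; the Callable body is a placeholder:

```java
import java.util.concurrent.Callable;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;

public class SecuredEntrypointSketch {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Install security modules and the security context based on the Flink configuration
        SecurityUtils.install(new SecurityConfiguration(configuration));

        // Run the actual work as the authenticated user
        Integer exitCode = SecurityUtils.getInstalledContext()
                .runSecured((Callable<Integer>) () -> {
                    // Placeholder for cluster startup / job submission logic
                    return 0;
                });
        System.out.println("exit code: " + exitCode);
    }
}
```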
The installModules method installs the security modules; the modules themselves are analyzed in a later section.
```java
static void installModules(SecurityConfiguration config) throws Exception {

    // install the security module factories
    List<SecurityModule> modules = new ArrayList<>();
    // Iterate over all configured SecurityModuleFactory class names
    for (String moduleFactoryClass : config.getSecurityModuleFactories()) {
        SecurityModuleFactory moduleFactory = null;
        try {
            // Load the module factory via the ServiceLoader
            moduleFactory = SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
        } catch (NoMatchSecurityFactoryException ne) {
            LOG.error("Unable to instantiate security module factory {}", moduleFactoryClass);
            throw new IllegalArgumentException("Unable to find module factory class", ne);
        }
        // Create the SecurityModule from the factory
        SecurityModule module = moduleFactory.createModule(config);
        // can be null if a SecurityModule is not supported in the current environment
        if (module != null) {
            // Install the module and remember it in the modules list
            module.install();
            modules.add(module);
        }
    }
    installedModules = modules;
}
```
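Because the factories are resolved through SecurityFactoryServiceLoader (a ServiceLoader wrapper), a custom module can be plugged in. The sketch below is hypothetical, MyAuditModuleFactory and MyAuditModule are not part of Flink, and it assumes the factory class is registered via the usual ServiceLoader mechanism (a META-INF/services file named after the SecurityModuleFactory interface) and listed in security.module.factory.classes; package names follow recent Flink versions:

```java
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.modules.SecurityModule;
import org.apache.flink.runtime.security.modules.SecurityModuleFactory;

// Hypothetical example: a module that only logs when authentication material is installed/removed.
public class MyAuditModuleFactory implements SecurityModuleFactory {

    @Override
    public SecurityModule createModule(SecurityConfiguration securityConfig) {
        // Returning null would mean "not supported in this environment"
        return new MyAuditModule(securityConfig);
    }

    static class MyAuditModule implements SecurityModule {
        private final SecurityConfiguration securityConfig;

        MyAuditModule(SecurityConfiguration securityConfig) {
            this.securityConfig = securityConfig;
        }

        @Override
        public void install() throws SecurityModule.SecurityInstallException {
            System.out.println("installing audit hooks for principal " + securityConfig.getPrincipal());
        }

        @Override
        public void uninstall() throws SecurityModule.SecurityInstallException {
            System.out.println("removing audit hooks");
        }
    }
}
```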
The installContext method installs the security context; its role is likewise covered in a later section.
```java
static void installContext(SecurityConfiguration config) throws Exception {
    // install the security context factory
    // Iterate over the configured SecurityContextFactory class names
    // (configuration key: security.context.factory.classes)
    for (String contextFactoryClass : config.getSecurityContextFactories()) {
        try {
            // Load the SecurityContextFactory via the ServiceLoader
            SecurityContextFactory contextFactory =
                    SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass);
            // Check whether the SecurityContextFactory is compatible with the configuration (1)
            if (contextFactory.isCompatibleWith(config)) {
                try {
                    // Create the first compatible SecurityContext
                    installedContext = contextFactory.createContext(config);
                    // install the first context that's compatible and ignore the remaining.
                    break;
                } catch (SecurityContextInitializeException e) {
                    LOG.error(
                            "Cannot instantiate security context with: " + contextFactoryClass,
                            e);
                } catch (LinkageError le) {
                    LOG.error(
                            "Error occur when instantiate security context with: "
                                    + contextFactoryClass,
                            le);
                }
            } else {
                LOG.debug("Unable to install security context factory {}", contextFactoryClass);
            }
        } catch (NoMatchSecurityFactoryException ne) {
            LOG.warn("Unable to instantiate security context factory {}", contextFactoryClass);
        }
    }
    if (installedContext == null) {
        LOG.error("Unable to install a valid security context factory!");
        throw new Exception("Unable to install a valid security context factory!");
    }
}
```
Explanation of marker (1): let us look at the isCompatibleWith logic. SecurityContextFactory has two implementations, HadoopSecurityContextFactory and NoOpSecurityContextFactory. HadoopSecurityContextFactory requires that the security.module.factory.classes option contains org.apache.flink.runtime.security.modules.HadoopModuleFactory and that org.apache.hadoop.security.UserGroupInformation is on the classpath. NoOpSecurityContextFactory has no requirements.
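A paraphrase of that compatibility check, written as a standalone sketch rather than the actual Flink source:

```java
import org.apache.flink.runtime.security.SecurityConfiguration;

// Paraphrased sketch of the check described above; not a verbatim Flink excerpt.
public final class HadoopCompatibilityCheckSketch {

    public static boolean isCompatibleWith(SecurityConfiguration securityConfig) {
        // The Hadoop module factory must be listed in security.module.factory.classes
        if (!securityConfig.getSecurityModuleFactories()
                .contains("org.apache.flink.runtime.security.modules.HadoopModuleFactory")) {
            return false;
        }
        try {
            // UserGroupInformation must be resolvable on the classpath
            Class.forName(
                    "org.apache.hadoop.security.UserGroupInformation",
                    false,
                    HadoopCompatibilityCheckSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }
}
```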
4. SecurityModule
SecurityModule provides authentication support for different kinds of services and has three implementations (the interface contract is sketched right after this list):

- HadoopModule: authenticates via Hadoop's UserGroupInformation (UGI).
- JaasModule: installs a JAAS configuration, which takes effect process-wide.
- ZookeeperModule: provides the ZooKeeper security configuration.
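For orientation, the SecurityModule contract boils down to two lifecycle methods. This is a simplified sketch of the interface, not the exact Flink source:

```java
// Simplified sketch of the SecurityModule contract described above.
public interface SecurityModule {

    // Set up authentication for the service this module covers (UGI login, JAAS config, ...)
    void install() throws SecurityInstallException;

    // Undo whatever install() changed, e.g. restore the previous JAAS settings
    void uninstall() throws SecurityInstallException;

    class SecurityInstallException extends Exception {
        public SecurityInstallException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}
```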
4.1 HadoopModule
HadoopModule holds Flink's SecurityConfiguration and the Hadoop configuration (read from the Hadoop configuration files; the reading logic lives in HadoopUtils.getHadoopConfiguration and is analyzed later).
4.1.1 The install method
The install method performs the authentication via Hadoop's UserGroupInformation.
```java
@Override
public void install() throws SecurityInstallException {

    // Hand the Hadoop configuration to UGI
    UserGroupInformation.setConfiguration(hadoopConfiguration);

    UserGroupInformation loginUser;

    try {
        // If Hadoop security is enabled and both keytab and principal are configured
        if (UserGroupInformation.isSecurityEnabled()
                && !StringUtils.isBlank(securityConfig.getKeytab())
                && !StringUtils.isBlank(securityConfig.getPrincipal())) {
            // Resolve the keytab path
            String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();

            // Log in via UGI using the keytab and principal from the Flink configuration
            UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);

            // Obtain the authenticated user
            loginUser = UserGroupInformation.getLoginUser();

            // supplement with any available tokens
            // Read the token cache file referenced by HADOOP_TOKEN_FILE_LOCATION
            String fileLocation =
                    System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
            // If a local token cache exists
            if (fileLocation != null) {
                Credentials credentialsFromTokenStorageFile =
                        Credentials.readTokenStorageFile(
                                new File(fileLocation), hadoopConfiguration);

                // if UGI uses Kerberos keytabs for login, do not load HDFS delegation tokens,
                // since the UGI would prefer the delegation token instead, which eventually
                // expires and does not fall back to using Kerberos tickets
                Credentials credentialsToBeAdded = new Credentials();
                final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
                Collection<Token<? extends TokenIdentifier>> usrTok =
                        credentialsFromTokenStorageFile.getAllTokens();
                // Iterate over the tokens in the token storage file and add every
                // non-delegation token to the credentials
                for (Token<? extends TokenIdentifier> token : usrTok) {
                    if (!token.getKind().equals(hdfsDelegationTokenKind)) {
                        final Text id = new Text(token.getIdentifier());
                        credentialsToBeAdded.addToken(id, token);
                    }
                }
                // Attach the credentials to the login user
                loginUser.addCredentials(credentialsToBeAdded);
            }
        } else {
            // Security is not enabled (or keytab/principal are missing):
            // login with current user credentials (e.g. ticket cache, OS login)
            // note that the stored tokens are read automatically
            try {
                // Equivalent to UserGroupInformation.loginUserFromSubject(null),
                // invoked via the reflection API
                Method loginUserFromSubjectMethod =
                        UserGroupInformation.class.getMethod(
                                "loginUserFromSubject", Subject.class);
                loginUserFromSubjectMethod.invoke(null, (Subject) null);
            } catch (NoSuchMethodException e) {
                LOG.warn("Could not find method implementations in the shaded jar.", e);
            } catch (InvocationTargetException e) {
                throw e.getTargetException();
            }

            // Obtain the currently logged-in user
            loginUser = UserGroupInformation.getLoginUser();
        }

        LOG.info("Hadoop user set to {}", loginUser);

        if (HadoopUtils.isKerberosSecurityEnabled(loginUser)) {
            boolean isCredentialsConfigured =
                    HadoopUtils.areKerberosCredentialsValid(
                            loginUser, securityConfig.useTicketCache());

            LOG.info(
                    "Kerberos security is enabled and credentials are {}.",
                    isCredentialsConfigured ? "valid" : "invalid");
        }
    } catch (Throwable ex) {
        throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
    }
}
```
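Stripped of the Flink plumbing, the keytab branch of this module boils down to a plain UGI keytab login. A minimal standalone sketch, with a hypothetical principal and keytab path and assuming a valid krb5 setup on the machine:

```java
import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class UgiKeytabLoginSketch {
    public static void main(String[] args) throws Exception {
        Configuration hadoopConf = new Configuration();
        // Kerberos must be the active authentication method for the keytab login to matter
        hadoopConf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(hadoopConf);

        // Hypothetical principal and keytab path, for illustration only
        String principal = "flink/host@EXAMPLE.COM";
        String keytabPath = new File("/opt/keytabs/flink.keytab").getAbsolutePath();

        UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
        System.out.println("Logged in as: " + loginUser
                + ", from keytab: " + loginUser.isFromKeytab());
    }
}
```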
4.1.2 getHadoopConfiguration
This method contains the logic for locating and reading the Hadoop configuration files. It is fairly involved, so let us walk through it in detail.
```java
public static Configuration getHadoopConfiguration(
        org.apache.flink.configuration.Configuration flinkConfiguration) {

    // Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
    // from the classpath
    // Start from an empty Hadoop configuration
    Configuration result = new HdfsConfiguration();
    // Tracks whether any Hadoop configuration was found
    boolean foundHadoopConfiguration = false;

    // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path
    // and the hdfs configuration.
    // The properties of a newly added resource will override the ones in previous resources,
    // so a configuration file with higher priority should be added later.

    // Approach 1: HADOOP_HOME environment variable
    // Two candidate Hadoop conf paths
    String[] possibleHadoopConfPaths = new String[2];

    // Read the HADOOP_HOME environment variable
    final String hadoopHome = System.getenv("HADOOP_HOME");
    if (hadoopHome != null) {
        LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
        // If HADOOP_HOME is set, try the following locations:
        // $HADOOP_HOME/conf
        // $HADOOP_HOME/etc/hadoop
        possibleHadoopConfPaths[0] = hadoopHome + "/conf";
        possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
    }

    for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
        if (possibleHadoopConfPath != null) {
            // Try to read core-site.xml and hdfs-site.xml under possibleHadoopConfPath
            // into the Hadoop conf
            foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
        }
    }

    // Approach 2: Flink configuration (deprecated)
    // Add the file referenced by the Flink option fs.hdfs.hdfsdefault to the Hadoop conf
    final String hdfsDefaultPath =
            flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
    if (hdfsDefaultPath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
        LOG.debug(
                "Using hdfs-default configuration-file path from Flink config: {}",
                hdfsDefaultPath);
        foundHadoopConfiguration = true;
    }

    // Add the file referenced by the Flink option fs.hdfs.hdfssite to the Hadoop conf
    final String hdfsSitePath =
            flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
    if (hdfsSitePath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
        LOG.debug(
                "Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
        foundHadoopConfiguration = true;
    }

    // Add the configuration files under the directory given by the Flink option
    // fs.hdfs.hadoopconf to the Hadoop conf
    final String hadoopConfigPath =
            flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
    if (hadoopConfigPath != null) {
        LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
        foundHadoopConfiguration =
                addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
    }

    // Approach 3: HADOOP_CONF_DIR environment variable
    // Read the Hadoop configuration files from the directory given by HADOOP_CONF_DIR
    String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
    if (hadoopConfDir != null) {
        LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
        foundHadoopConfiguration =
                addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
    }

    // Approach 4: Flink configuration
    // add all configuration keys with prefix 'flink.hadoop.' in flink conf to hadoop conf
    // For every such key, strip the prefix and store the remaining key together with the
    // original value as an entry of the Hadoop conf
    for (String key : flinkConfiguration.keySet()) {
        for (String prefix : FLINK_CONFIG_PREFIXES) {
            if (key.startsWith(prefix)) {
                String newKey = key.substring(prefix.length());
                String value = flinkConfiguration.getString(key, null);
                result.set(newKey, value);
                LOG.debug(
                        "Adding Flink config entry for {} as {}={} to Hadoop config",
                        key,
                        newKey,
                        value);
                foundHadoopConfiguration = true;
            }
        }
    }

    // If none of the approaches above found a Hadoop configuration, log a warning
    if (!foundHadoopConfiguration) {
        LOG.warn(
                "Could not find Hadoop configuration via any of the supported methods "
                        + "(Flink configuration, environment variables).");
    }

    return result;
}
```
To summarize, Flink reads the Hadoop configuration from the following sources, in this order (a sketch of the prefix pass-through follows the list):

- The HADOOP_HOME environment variable: if set, core-site.xml and hdfs-site.xml are read from its conf and etc/hadoop subdirectories.
- The file referenced by the Flink option fs.hdfs.hdfsdefault (deprecated).
- The file referenced by the Flink option fs.hdfs.hdfssite (deprecated).
- The directory referenced by the Flink option fs.hdfs.hadoopconf.
- The directory referenced by the HADOOP_CONF_DIR environment variable.
- All keys in the Flink configuration with the prefix flink.hadoop.: the prefix is stripped and the remaining key, together with the original value, is added as an entry of the Hadoop conf.
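A small sketch of the flink.hadoop.* pass-through, assuming the HadoopUtils class discussed above (in recent Flink versions under org.apache.flink.runtime.util) is on the classpath; dfs.replication is just an arbitrary example key:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.HadoopUtils;

public class HadoopConfPassThroughSketch {
    public static void main(String[] args) {
        Configuration flinkConf = new Configuration();
        // flink.hadoop.<hadoop key> is forwarded into the Hadoop conf as <hadoop key>
        flinkConf.setString("flink.hadoop.dfs.replication", "2");

        org.apache.hadoop.conf.Configuration hadoopConf =
                HadoopUtils.getHadoopConfiguration(flinkConf);
        // Prints "2": the prefix has been stripped and the value forwarded
        System.out.println(hadoopConf.get("dfs.replication"));
    }
}
```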
4.2 JaasModule
The install method reads the JAAS configuration referenced by the java.security.auth.login.config system property, converts the relevant Flink options into JAAS entries, merges them into that configuration, and installs the result for the JVM. The code is shown below:
```java
@Override
public void install() {

    // ensure that a config file is always defined, for compatibility with
    // ZK and Kafka which check for the system property and existence of the file
    // Remember the current value of java.security.auth.login.config so it can be
    // restored when the module is uninstalled
    priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
    // If the property is not set
    if (priorConfigFile == null) {
        // workingDir is the first directory of Flink's io.tmp.dirs option.
        // Write the default flink-jaas.conf content to a temporary file named jaas-*.conf
        // in that directory; the file is deleted when the JVM shuts down.
        File configFile = generateDefaultConfigFile(workingDir);
        // Set the java.security.auth.login.config system property so that it always has a
        // value. This keeps ZooKeeper and Kafka happy, since they check that the referenced
        // JAAS file exists.
        System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
        LOG.info("Jaas file will be created as {}.", configFile);
    }

    // read the JAAS configuration file
    // Load the currently installed JAAS configuration
    priorConfig = javax.security.auth.login.Configuration.getConfiguration();

    // construct a dynamic JAAS configuration
    // Wrap it in a DynamicConfiguration, which can be modified
    currentConfig = new DynamicConfiguration(priorConfig);

    // wire up the configured JAAS login contexts to use the krb5 entries
    // Read the Kerberos settings from the Flink configuration.
    // An AppConfigurationEntry is Java's representation of one entry of a JAAS
    // configuration file, i.e. the part enclosed in curly braces.
    AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
    if (krb5Entries != null) {
        // Iterate over the Flink option security.kerberos.login.contexts and use each
        // value as an entry name
        for (String app : securityConfig.getLoginContextNames()) {
            // Register the AppConfigurationEntries from krb5Entries under that entry name
            currentConfig.addAppConfigurationEntry(app, krb5Entries);
        }
    }

    // Install the new currentConfig
    javax.security.auth.login.Configuration.setConfiguration(currentConfig);
}
```
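Because the dynamic configuration is installed process-wide, any JAAS consumer in the same JVM can look up the registered entries afterwards. A small sketch using the standard JAAS API; "Client" is simply the default ZooKeeper login context name mentioned earlier:

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public class JaasLookupSketch {
    public static void main(String[] args) {
        // After JaasModule.install(), this returns the DynamicConfiguration set above
        Configuration jaasConfig = Configuration.getConfiguration();

        // "Client" is the default ZooKeeper SASL login context name
        AppConfigurationEntry[] entries = jaasConfig.getAppConfigurationEntry("Client");
        if (entries == null) {
            System.out.println("No JAAS entry named 'Client' is registered");
        } else {
            for (AppConfigurationEntry entry : entries) {
                System.out.println(entry.getLoginModuleName() + " " + entry.getControlFlag());
            }
        }
    }
}
```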
The getAppConfigurationEntries method used above is fairly involved, so here is an explanation.

getAppConfigurationEntries reads the Kerberos settings from Flink's securityConfig and converts them into JAAS-entry form, stored as AppConfigurationEntry instances. If security.kerberos.login.use-ticket-cache is enabled, it produces an AppConfigurationEntry called userKerberosAce, equivalent to a JAAS entry like the following:
```
EntryName {
    com.sun.security.auth.module.Krb5LoginModule optional
    doNotPrompt=true
    useTicketCache=true
    renewTGT=true;
};
```
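For reference, building an equivalent entry programmatically with the standard JAAS API looks roughly like this; it is a sketch of what getAppConfigurationEntries effectively produces, not the Flink code itself:

```java
import java.util.HashMap;
import java.util.Map;

import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;

public class TicketCacheEntrySketch {
    public static AppConfigurationEntry userKerberosEntry() {
        Map<String, String> options = new HashMap<>();
        options.put("doNotPrompt", "true");
        options.put("useTicketCache", "true");
        options.put("renewTGT", "true");

        // The OPTIONAL control flag matches the "optional" keyword in the JAAS snippet above
        return new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                LoginModuleControlFlag.OPTIONAL,
                options);
    }
}
```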
If security.kerberos.login.keytab is configured, an AppConfigurationEntry called keytabKerberosAce is generated, typically carrying the keytab, doNotPrompt=true, useKeyTab=true, storeKey=true and the configured principal, along the lines of:
```
EntryName {
    com.sun.security.auth.module.Krb5LoginModule required
    keyTab=<keytab path>
    doNotPrompt=true
    useKeyTab=true
    storeKey=true
    principal=<principal>;
};
```