[Flink] Flink Cross-Cluster Access to Kerberos-Enabled Kafka
Posted by 九师兄
1. Overview
Reposted: Flink Cross-Cluster Access to Kerberos-Enabled Kafka
Flink provides three modules for cluster security authentication: HadoopModule, JaasModule, and ZooKeeperModule. The corresponding configuration options are defined in the SecurityOptions class.
HadoopModule configures authentication for frameworks that authenticate through UserGroupInformation (Kudu, the synchronous HBase framework, HDFS, and so on).
JaasModule configures authentication for frameworks that authenticate through a JAAS configuration (Kafka, ZooKeeper, the asynchronous HBase framework, and so on).
ZooKeeperModule installs the process-wide ZooKeeper security configuration.
When a Flink component starts, it first loads these authentication modules and then starts every cluster component inside the resulting security context. Note that the whole Flink cluster can only use a single credential: if a Flink job reads from Kerberos-enabled Kafka and writes to Kudu, the configured principal and keytab must be authorized for HDFS, Kafka, and Kudu at the same time. If different credentials are required, the Kerberos configuration has to be done separately inside the Flink job.
2. Startup Flow
Taking JobManager startup as an example, let's trace how the security components are loaded and record the call chain through each module.
First the security context is installed; the cluster components are then started from within it.
```java
// ClusterEntrypoint#startCluster
SecurityContext securityContext = installSecurityContext(configuration);
securityContext.runSecured((Callable<Void>) () -> {
    runCluster(configuration);
    return null;
});

// ClusterEntrypoint#installSecurityContext
SecurityContext installSecurityContext(Configuration configuration) {
    SecurityUtils.install(new SecurityConfiguration(configuration));
    return SecurityUtils.getInstalledContext();
}
```
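For reference, here is a minimal sketch of the same install-then-run pattern in a standalone program. The SecuredBootstrap class name, keytab path, and principal are placeholders, not Flink code; it assumes flink-runtime is on the classpath:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;

public final class SecuredBootstrap {
    public static void main(String[] args) throws Exception {
        Configuration flinkConfig = new Configuration();
        flinkConfig.setString("security.kerberos.login.keytab", "/path/to/user.keytab"); // placeholder
        flinkConfig.setString("security.kerberos.login.principal", "user@EXAMPLE.COM"); // placeholder
        flinkConfig.setString("security.kerberos.login.contexts", "KafkaClient,Client");

        // install the security modules, then run the application inside the context
        SecurityUtils.install(new SecurityConfiguration(flinkConfig));
        SecurityUtils.getInstalledContext().runSecured(() -> {
            // application logic runs here as the authenticated user
            return null;
        });
    }
}
```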
SecurityConfiguration provides two default security.context.factory.classes entries used to build the SecurityContext:
org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory: starts the cluster inside the doAs method of the constructed UserGroupInformation.
org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory: the default when no authentication is required; the cluster starts in a plain context.
It also provides three security.module.factory.classes entries that prepare the configuration used for authentication (both options can be overridden in flink-conf.yaml, as shown after this list):
org.apache.flink.runtime.security.modules.HadoopModuleFactory
org.apache.flink.runtime.security.modules.JaasModuleFactory
org.apache.flink.runtime.security.modules.ZookeeperModuleFactory
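As a sketch, these factories map onto two flink-conf.yaml options, shown below with their default values; each takes an ordered, comma-separated list of factory classes (availability depends on the Flink version):

```yaml
security.context.factory.classes: org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory,org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory
security.module.factory.classes: org.apache.flink.runtime.security.modules.HadoopModuleFactory,org.apache.flink.runtime.security.modules.JaasModuleFactory,org.apache.flink.runtime.security.modules.ZookeeperModuleFactory
```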
The real work happens in installModules, which prepares the configuration used for authentication:
```java
// SecurityUtils#install
public static void install(SecurityConfiguration config) throws Exception {
    // Install the security modules first before installing the security context.
    installModules(config);
    installContext(config);
}

// SecurityUtils#installModules: loads each SecurityModuleFactory dynamically
// via SPI and calls install() on the module it creates.
static void installModules(SecurityConfiguration config) throws Exception {
    List<SecurityModule> modules = new ArrayList<>();
    for (String moduleFactoryClass : config.getSecurityModuleFactories()) {
        SecurityModuleFactory moduleFactory = null;
        try {
            moduleFactory = SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
        } catch (NoMatchSecurityFactoryException ne) {
            throw new IllegalArgumentException("Unable to instantiate security module factory", ne);
        }
        SecurityModule module = moduleFactory.createModule(config);
        // can be null if a module is not supported in the current environment
        if (module != null) {
            module.install();
            modules.add(module);
        }
    }
    installedModules = modules;
}
```
HadoopModule#install. Kerberos authentication against HDFS requires a UserGroupInformation login user; this module builds the global loginUser. Any component that can authenticate with this loginUser needs no credentials of its own.
```java
// HadoopModule#install
public void install() throws SecurityInstallException {
    // pass the Hadoop configuration to UGI
    UserGroupInformation.setConfiguration(hadoopConfiguration);

    UserGroupInformation loginUser;

    try {
        // Kerberos is enabled: log in with the configured keytab and principal
        if (UserGroupInformation.isSecurityEnabled()
                && !StringUtils.isBlank(securityConfig.getKeytab())
                && !StringUtils.isBlank(securityConfig.getPrincipal())) {

            String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();

            UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);

            // the currently logged-in user
            loginUser = UserGroupInformation.getLoginUser();

            // token cache: supplement with any available tokens
            String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
            if (fileLocation != null) {
                // Use reflection API since the API semantics are not available in Hadoop1 profile.
                // Below APIs are used in the context of reading the stored tokens from UGI.
                // Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
                // loginUser.addCredentials(cred);
                try {
                    Method readTokenStorageFileMethod = Credentials.class.getMethod(
                            "readTokenStorageFile", File.class, org.apache.hadoop.conf.Configuration.class);
                    Credentials cred =
                            (Credentials) readTokenStorageFileMethod.invoke(
                                    null, new File(fileLocation), hadoopConfiguration);

                    // if UGI uses Kerberos keytabs for login, do not load the HDFS delegation token, since
                    // the UGI would prefer the delegation token instead, which eventually expires
                    // and does not fall back to using Kerberos tickets
                    Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens");
                    Credentials credentials = new Credentials();
                    final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
                    Collection<Token<? extends TokenIdentifier>> usrTok =
                            (Collection<Token<? extends TokenIdentifier>>) getAllTokensMethod.invoke(cred);
                    for (Token<? extends TokenIdentifier> token : usrTok) {
                        if (!token.getKind().equals(hdfsDelegationTokenKind)) {
                            final Text id = new Text(token.getIdentifier());
                            credentials.addToken(id, token);
                        }
                    }

                    Method addCredentialsMethod = UserGroupInformation.class.getMethod(
                            "addCredentials", Credentials.class);
                    addCredentialsMethod.invoke(loginUser, credentials);
                } catch (NoSuchMethodException e) {
                    LOG.warn("Could not find method implementations in the shaded jar.", e);
                } catch (InvocationTargetException e) {
                    throw e.getTargetException();
                }
            }
        } else {
            // login with current user credentials (e.g. ticket cache, OS login);
            // note that the stored tokens are read automatically
            try {
                // use the reflection API to get the login user object
                // UserGroupInformation.loginUserFromSubject(null);
                Method loginUserFromSubjectMethod =
                        UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
                loginUserFromSubjectMethod.invoke(null, (Subject) null);
            } catch (NoSuchMethodException e) {
                LOG.warn("Could not find method implementations in the shaded jar.", e);
            } catch (InvocationTargetException e) {
                throw e.getTargetException();
            }

            loginUser = UserGroupInformation.getLoginUser();
        }

        boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured(
                loginUser, securityConfig.useTicketCache());

        LOG.info("Hadoop user set to {}, credentials check status: {}", loginUser, isCredentialsConfigured);
    } catch (Throwable ex) {
        throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
    }
}
```
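Once the module has run, other code in the same process can reuse the global login user without logging in again. A minimal sketch (the HDFS path is a placeholder):

```java
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

// reuse the login user that HadoopModule installed
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
    // the filesystem call is authenticated as the Kerberos login user
    FileSystem fs = FileSystem.get(new org.apache.hadoop.conf.Configuration());
    System.out.println(fs.exists(new Path("/tmp")));
    return null;
});
```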
JaasModule#install. This prepares the properties used by the JAAS file, first passing them to a ConfigFile; when a framework later authenticates via JAAS, it extracts them from javax.security.auth.login.Configuration.
The JAAS file used for Kafka Kerberos authentication:
```
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    keyTab="/Users/xx/kafka.keytab"
    principal="kafka/cdh002@TEST.COM"
    useKeyTab=true
    useTicketCache=true;
};
```
The parameters from the JAAS file are written into javax.security.auth.login.Configuration; each framework reads them back from that configuration when it authenticates:
```java
// JaasModule#install
public void install() {
    priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
    // create an empty JAAS file and bind it to the system property java.security.auth.login.config
    if (priorConfigFile == null) {
        File configFile = generateDefaultConfigFile(workingDir);
        System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
        LOG.info("Jaas file will be created as {}.", configFile);
    }

    // read the JAAS configuration file, creating the ConfigFile
    priorConfig = javax.security.auth.login.Configuration.getConfiguration();

    // construct a dynamic JAAS configuration
    currentConfig = new DynamicConfiguration(priorConfig);

    // wire up the configured JAAS login contexts to use the krb5 entries
    AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
    if (krb5Entries != null) {
        for (String app : securityConfig.getLoginContextNames()) {
            currentConfig.addAppConfigurationEntry(app, krb5Entries); // e.g. KafkaClient
        }
    }

    // write into javax.security.auth.login.Configuration
    javax.security.auth.login.Configuration.setConfiguration(currentConfig);
}
```
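A short sketch of what a client library sees after the module has run, assuming the configured login contexts include KafkaClient:

```java
import javax.security.auth.login.AppConfigurationEntry;

// read back the entry that JaasModule registered under the name "KafkaClient"
javax.security.auth.login.Configuration conf =
        javax.security.auth.login.Configuration.getConfiguration();
AppConfigurationEntry[] entries = conf.getAppConfigurationEntry("KafkaClient");
if (entries != null) {
    for (AppConfigurationEntry entry : entries) {
        System.out.println(entry.getLoginModuleName() + " -> " + entry.getOptions());
    }
}
```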
ZooKeeperModule#install sets the process-wide ZooKeeper SASL system properties:

```java
// ZooKeeperModule#install
public void install() throws SecurityInstallException {
    priorSaslEnable = System.getProperty(ZK_ENABLE_CLIENT_SASL, null);
    System.setProperty(ZK_ENABLE_CLIENT_SASL, String.valueOf(!securityConfig.isZkSaslDisable()));

    priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
    if (!"zookeeper".equals(securityConfig.getZooKeeperServiceName())) {
        System.setProperty(ZK_SASL_CLIENT_USERNAME, securityConfig.getZooKeeperServiceName());
    }

    priorLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME, null);
    if (!"Client".equals(securityConfig.getZooKeeperLoginContextName())) {
        System.setProperty(ZK_LOGIN_CONTEXT_NAME, securityConfig.getZooKeeperLoginContextName());
    }
}
```
After all modules are loaded, installContext builds the security context. Only one context is ever installed.
```java
// SecurityUtils#installContext
static void installContext(SecurityConfiguration config) throws Exception {
    // install the security context factory
    for (String contextFactoryClass : config.getSecurityContextFactories()) {
        try {
            // loaded via SPI
            SecurityContextFactory contextFactory =
                    SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass);
            // if a Hadoop environment is present, Hadoop authentication is used
            if (contextFactory.isCompatibleWith(config)) {
                try {
                    installedContext = contextFactory.createContext(config);
                    // install the first context that's compatible and ignore the remaining
                    break;
                } catch (SecurityContextInitializeException e) {
                    LOG.error("Cannot instantiate security context with: " + contextFactoryClass, e);
                }
            } else {
                LOG.warn("Unable to install incompatible security context factory {}", contextFactoryClass);
            }
        } catch (NoMatchSecurityFactoryException ne) {
            LOG.warn("Unable to instantiate security context factory {}", contextFactoryClass);
        }
    }

    if (installedContext == null) {
        LOG.error("Unable to install a valid security context factory!");
        throw new Exception("Unable to install a valid security context factory!");
    }
}
```
3. Flink Kafka Connector Security Authentication
When Flink reads from a Kerberos-enabled Kafka, the following configuration is required; neither java.security.auth.login.config nor sasl.jaas.config is passed explicitly.
Parameters added on the Kafka consumer (see the sketch after this list):
1. security.protocol = 'SASL_PLAINTEXT' // use the SASL protocol
2. sasl.mechanism = 'GSSAPI' // authenticate via Kerberos
3. sasl.kerberos.service.name = 'kafka' // Kerberos service name
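A minimal sketch wiring these settings into a Flink Kafka source; the broker address, group id, and topic are placeholders, and the connector class depends on the connector version (FlinkKafkaConsumer shown here):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
props.setProperty("group.id", "flink-consumer");       // placeholder
// Kerberos-related settings from the list above
props.setProperty("security.protocol", "SASL_PLAINTEXT");
props.setProperty("sasl.mechanism", "GSSAPI");
props.setProperty("sasl.kerberos.service.name", "kafka");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);
```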
Parameters in flink-conf.yaml:
1. security.kerberos.login.use-ticket-cache: true
2. security.kerberos.login.keytab: xxxx
3. security.kerberos.login.principal: xxxxx
4. security.kerberos.login.contexts: KafkaClient,Client
If sasl.jaas.config is configured instead, the format is:

```java
String config = "com.sun.security.auth.module.Krb5LoginModule required\n" +
    "\tprincipal=\"kafka/xxxxx@EXAMPLE.COM\"\n" +
    "\tkeyTab=\"/Users/xxxx/kafka.keytab\"\n" +
    "\tuseKeyTab=true\n" +
    "\tuseTicketCache=true;";
```
The JAAS configuration flow inside the Flink Kafka connector:
```
FlinkKafkaConsumerBase#open
  |
this.partitionDiscoverer.open()
  |
KafkaPartitionDiscoverer#initializeConnections
  |
KafkaConsumer#KafkaConsumer
  |
ClientUtils.createChannelBuilder(config)
  |
ChannelBuilders#clientChannelBuilder
  |
ChannelBuilders#create
  |
SaslChannelBuilder#configure (Kafka 0.10)
  |
JaasUtils#jaasConfig
```
```java
// JaasUtils#jaasConfig
public static Configuration jaasConfig(LoginType loginType, Map<String, ?> configs) {
    Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
    // if sasl.jaas.config is bound, build the JAAS config from its value
    if (jaasConfigArgs != null) {
        if (loginType == LoginType.SERVER) {
            throw new IllegalArgumentException("JAAS config property not supported for server");
        } else {
            JaasConfig jaasConfig = new JaasConfig(loginType, jaasConfigArgs.value());
            AppConfigurationEntry[] clientModules =
                    jaasConfig.getAppConfigurationEntry(LoginType.CLIENT.contextName());
            int numModules = clientModules == null ? 0 : clientModules.length;
            if (numModules != 1) {
                throw new IllegalArgumentException(
                        "JAAS config property contains " + numModules + " login modules, should be one module");
            }
            return jaasConfig;
        }
    } else {
        return defaultJaasConfig(loginType);
    }
}
```
```java
private static Configuration defaultJaasConfig(LoginType loginType) {
    // when Flink's JaasModule was installed, java.security.auth.login.config was already bound
    String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
    if (jaasConfigFile == null) {
        LOG.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' and Kafka SASL property '" +
                SaslConfigs.SASL_JAAS_CONFIG + "' are not set, using default JAAS configuration.");
    }

    // fetch the entries from javax.security.auth.login.Configuration,
    // i.e. the login entities used by JAAS
    Configuration jaasConfig = Configuration.getConfiguration();

    // e.g. KafkaClient
    String loginContextName = loginType.contextName();
    AppConfigurationEntry[] configEntries = jaasConfig.getAppConfigurationEntry(loginContextName);
    if (configEntries == null) {
        String errorMessage =
                "Could not find a '" + loginContextName + "' entry in the JAAS configuration. System property '" +
                JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + (jaasConfigFile == null ? "not set" : jaasConfigFile);
        throw new IllegalArgumentException(errorMessage);
    }

    return jaasConfig;
}
```
4. Cross-Cluster Access to Kerberos-Enabled Kafka
Goal: a Flink job runs on Kerberos-enabled cluster A and reads Kafka data from both cluster A and cluster B.
Steps:
Upload the new cluster's krb5.conf file and append its contents to the existing krb5 file.
Configure the sasl.jaas.config parameter separately for the second cluster.
Important:
In the newly uploaded krb5.conf, domain_realm must map hosts to their realms; otherwise default_realm is used.
```
[realms]
PLS.COM = {
    kdc = plscdh00:88
    admin_server = plscdh00
}
```
Map plscdh00 through plscdh03 to PLS.COM:
```
[domain_realm]
.pls.com = PLS.COM
pls.com = PLS.COM
plscdh01 = PLS.COM
plscdh02 = PLS.COM
plscdh03 = PLS.COM
plscdh00 = PLS.COM
```
Append the domain_realm and realms sections of the newly uploaded krb5.conf to the existing krb5 file.
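The merged file then carries both realms. A sketch, assuming cluster A uses the TEST.COM realm from the JAAS example above (the TEST.COM kdc host is a placeholder):

```
[realms]
TEST.COM = {
    kdc = cdh002:88
}
PLS.COM = {
    kdc = plscdh00:88
    admin_server = plscdh00
}

[domain_realm]
.pls.com = PLS.COM
pls.com = PLS.COM
plscdh00 = PLS.COM
```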
Pass the JAAS content through sasl.jaas.config. If it were passed via the AppConfigurationEntry class instead, Kafka would use the default login context name KafkaClient, and with multiple Kafka clusters the AppConfigurationEntry looked up for each cluster would clash.
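A sketch of what this looks like in practice, with one property set per cluster; the broker addresses, principals, and keytab paths are placeholders:

```java
import java.util.Properties;

// cluster A: realm TEST.COM (placeholders)
Properties clusterA = new Properties();
clusterA.setProperty("bootstrap.servers", "cdh002:9092");
clusterA.setProperty("security.protocol", "SASL_PLAINTEXT");
clusterA.setProperty("sasl.mechanism", "GSSAPI");
clusterA.setProperty("sasl.kerberos.service.name", "kafka");
clusterA.setProperty("sasl.jaas.config",
        "com.sun.security.auth.module.Krb5LoginModule required "
                + "principal=\"kafka/cdh002@TEST.COM\" keyTab=\"/path/a.keytab\" "
                + "useKeyTab=true useTicketCache=true;");

// cluster B: realm PLS.COM, with its own principal and keytab (placeholders)
Properties clusterB = new Properties();
clusterB.setProperty("bootstrap.servers", "plscdh00:9092");
clusterB.setProperty("security.protocol", "SASL_PLAINTEXT");
clusterB.setProperty("sasl.mechanism", "GSSAPI");
clusterB.setProperty("sasl.kerberos.service.name", "kafka");
clusterB.setProperty("sasl.jaas.config",
        "com.sun.security.auth.module.Krb5LoginModule required "
                + "principal=\"kafka/plscdh00@PLS.COM\" keyTab=\"/path/b.keytab\" "
                + "useKeyTab=true useTicketCache=true;");
```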
Related code:
Code that builds the merged krb5 file:
```java
public class KrbConfManager {

    private static final Logger LOG = LoggerFactory.getLogger(KrbConfManager.class);

    private static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";

    private KrbConfManager() {
    }

    public void appendKrbConf(String krbConf) {
        this.appendKrbConf(krbConf, System.getProperty("user.dir"));
    }

    /**
     * Parses the newly added krb5.conf with the (package-private) Config parser,
     * then appends it to the existing krb5.conf file.
     *
     * @param krbConf path of the krb5.conf to merge in
     */
    public void appendKrbConf(String krbConf, String directory) {
        String krbConfPath = DtFileUtils.getFileAbsolutePath(krbConf);
        LOG.info("krb conf abs path is {}", krbConfPath);
        Preconditions.checkArgument(DtFileUtils.fileExistCheck(krbConfPath), "krb file does not exist");
        try {
            Constructor<Config> constructor = Config.class.getDeclaredConstructor();
            constructor.setAccessible(true);
            Config configParser = constructor.newInstance();
            Method loadConfigFile = configParser.getClass().getDeclaredMethod("loadConfigFile", String.class);
            loadConfigFile.setAccessible(true);
            List<String> configFileList = (List<String>) loadConfigFile.invoke(configParser, krbConfPath);
            // ...
```