[Flink] Flink Cross-Cluster Access to Kerberos-Enabled Kafka

Posted by 九师兄


1. Overview

Repost: Flink cross-cluster access to Kerberos-enabled Kafka

Flink provides three modules for cluster security authentication: HadoopModule, JaasModule, and ZooKeeperModule. The security-related configuration parameters live in the SecurityOptions class.

HadoopModule configures authentication for frameworks that authenticate through UserGroupInformation (Kudu, the synchronous HBase framework, HDFS, etc.).
JaasModule configures authentication for frameworks that authenticate through a JAAS config (Kafka, ZooKeeper, the asynchronous HBase framework, etc.).
ZooKeeperModule installs the process-wide ZooKeeper security configuration.

When a Flink component starts, it first loads the authentication modules and then starts the cluster components inside the security context it built. Note, however, that the whole Flink cluster can only authenticate with a single credential. That is, if a Flink job reads from Kerberos-enabled Kafka and writes to Kudu, the configured principal and keytab must have permission to access HDFS, Kafka, and Kudu at the same time. To use different credentials, the job itself must perform its own Kerberos configuration separately.

2. Startup Flow

Taking JobManager startup as an example, let's trace how the security components are loaded and record each module's call chain.

The security context is loaded first, and the cluster components are started from inside it.

ClusterEntrypoint#startCluster
SecurityContext securityContext = installSecurityContext(configuration);
securityContext.runSecured((Callable<Void>) () -> {
    runCluster(configuration);
    return null;
});

SecurityContext installSecurityContext(Configuration configuration) {
    SecurityUtils.install(new SecurityConfiguration(configuration));
    return SecurityUtils.getInstalledContext();
}

By default, SecurityConfiguration provides two security.context.factory.classes for building the SecurityContext:
org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory: starts the cluster inside the doAs method of the constructed UserGroupInformation.
org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory: the default when no security context is required.
It also provides three security.module.factory.classes that prepare the configuration used for authentication (a configuration sketch follows the list):

org.apache.flink.runtime.security.modules.HadoopModuleFactory
org.apache.flink.runtime.security.modules.JaasModuleFactory
org.apache.flink.runtime.security.modules.ZookeeperModuleFactory
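
Both lists come from ordinary Flink options and can be pinned in flink-conf.yaml. A sketch, assuming the stock defaults (option keys from SecurityOptions; values are, to my understanding, the shipped defaults):

# semicolon-separated lists of factory classes
security.context.factory.classes: org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory;org.apache.flink.runtime.security.contexts.NoOpSecurityContextFactory
security.module.factory.classes: org.apache.flink.runtime.security.modules.HadoopModuleFactory;org.apache.flink.runtime.security.modules.JaasModuleFactory;org.apache.flink.runtime.security.modules.ZookeeperModuleFactory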

The main work happens in installModules, which prepares the configuration used for authentication.

SecurityUtils#install
public static void install(SecurityConfiguration config) throws Exception {
    // Install the security modules first before installing the security context
    installModules(config);
    installContext(config);
}


installModules creates each module factory dynamically via SPI and calls its install method:

static void installModules(SecurityConfiguration config) throws Exception {
    List<SecurityModule> modules = new ArrayList<>();
    for (String moduleFactoryClass : config.getSecurityModuleFactories()) {
        SecurityModuleFactory moduleFactory;
        try {
            // module factories are discovered via SPI
            moduleFactory = SecurityFactoryServiceLoader.findModuleFactory(moduleFactoryClass);
        } catch (NoMatchSecurityFactoryException ne) {
            throw new IllegalArgumentException(
                    "Unable to instantiate security module factory " + moduleFactoryClass, ne);
        }
        // createModule may return null if the module is not applicable in this environment
        SecurityModule module = moduleFactory.createModule(config);
        if (module != null) {
            module.install();
            modules.add(module);
        }
    }
    installedModules = modules;
}

HadoopModule#install. Kerberos authentication against HDFS requires a UserGroupInformation as the loginUser; this module builds that global loginUser. If other components can authenticate with this loginUser, no separate credentials are needed for them.

public void install() throws SecurityInstallException {
    // pass the Hadoop configuration to UGI
    UserGroupInformation.setConfiguration(hadoopConfiguration);

    UserGroupInformation loginUser;

    try {
        // Kerberos is enabled and a keytab/principal pair has been configured
        if (UserGroupInformation.isSecurityEnabled()
                && !StringUtils.isBlank(securityConfig.getKeytab())
                && !StringUtils.isBlank(securityConfig.getPrincipal())) {
            String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();

            UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);

            // the user that is now logged in
            loginUser = UserGroupInformation.getLoginUser();

            // token cache: supplement with any available tokens
            String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
            if (fileLocation != null) {
                // Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are
                // used in the context of reading the stored tokens from UGI.
                // Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
                // loginUser.addCredentials(cred);
                try {
                    Method readTokenStorageFileMethod = Credentials.class.getMethod(
                            "readTokenStorageFile", File.class, org.apache.hadoop.conf.Configuration.class);
                    Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(
                            null, new File(fileLocation), hadoopConfiguration);

                    // if UGI uses Kerberos keytabs for login, do not load HDFS delegation token since
                    // the UGI would prefer the delegation token instead, which eventually expires
                    // and does not fall back to using Kerberos tickets
                    Method getAllTokensMethod = Credentials.class.getMethod("getAllTokens");
                    Credentials credentials = new Credentials();
                    final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
                    Collection<Token<? extends TokenIdentifier>> usrTok =
                            (Collection<Token<? extends TokenIdentifier>>) getAllTokensMethod.invoke(cred);
                    // if UGI uses a keytab for login, do not load the HDFS delegation token
                    for (Token<? extends TokenIdentifier> token : usrTok) {
                        if (!token.getKind().equals(hdfsDelegationTokenKind)) {
                            final Text id = new Text(token.getIdentifier());
                            credentials.addToken(id, token);
                        }
                    }

                    Method addCredentialsMethod = UserGroupInformation.class.getMethod(
                            "addCredentials", Credentials.class);
                    addCredentialsMethod.invoke(loginUser, credentials);
                } catch (NoSuchMethodException e) {
                    LOG.warn("Could not find method implementations in the shaded jar.", e);
                } catch (InvocationTargetException e) {
                    throw e.getTargetException();
                }
            }
        } else {
            // login with current user credentials (e.g. ticket cache, OS login)
            // note that the stored tokens are read automatically
            try {
                // use the reflection API to get the login user object
                // UserGroupInformation.loginUserFromSubject(null);
                Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod(
                        "loginUserFromSubject", Subject.class);
                loginUserFromSubjectMethod.invoke(null, (Subject) null);
            } catch (NoSuchMethodException e) {
                LOG.warn("Could not find method implementations in the shaded jar.", e);
            } catch (InvocationTargetException e) {
                throw e.getTargetException();
            }

            loginUser = UserGroupInformation.getLoginUser();
        }

        boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured(
                loginUser, securityConfig.useTicketCache());

        LOG.info("Hadoop user set to {}, credentials check status: {}", loginUser, isCredentialsConfigured);

    } catch (Throwable ex) {
        throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
    }
}
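Since HadoopModule installs a process-wide login user, code that needs HDFS can simply run as that user. A minimal sketch, assuming the global login succeeded (the path checked here is purely illustrative):

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

// reuse the global login user that HadoopModule installed
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
boolean exists = ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> {
    FileSystem fs = FileSystem.get(new Configuration());
    return fs.exists(new Path("/tmp"));   // hypothetical path
});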

JaasModule#install. It prepares the properties used by the jaas file and hands them to a ConfigFile first; when a framework authenticates through the jaas file, it extracts them from javax.security.auth.login.Configuration.

The jaas file used for Kafka Kerberos authentication:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    keyTab="/Users/xx/kafka.keytab"
    principal="kafka/cdh002@TEST.COM"
    useKeyTab=true
    useTicketCache=true;
};

The parameters of the jaas file are written into javax.security.auth.login.Configuration; each framework reads them from that configuration when needed.

public void install() {
    priorConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, null);
    // create an empty jaas file and bind it to the system property java.security.auth.login.config
    if (priorConfigFile == null) {
        File configFile = generateDefaultConfigFile(workingDir);
        System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, configFile.getAbsolutePath());
        LOG.info("Jaas file will be created as {}.", configFile);
    }

    // read the JAAS configuration file, creating the ConfigFile
    priorConfig = javax.security.auth.login.Configuration.getConfiguration();

    // construct a dynamic JAAS configuration
    currentConfig = new DynamicConfiguration(priorConfig);

    // wire up the configured JAAS login contexts to use the krb5 entries
    AppConfigurationEntry[] krb5Entries = getAppConfigurationEntries(securityConfig);
    if (krb5Entries != null) {
        for (String app : securityConfig.getLoginContextNames()) {
            currentConfig.addAppConfigurationEntry(app, krb5Entries);   // e.g. KafkaClient
        }
    }

    // write into javax.security.auth.login.Configuration
    javax.security.auth.login.Configuration.setConfiguration(currentConfig);
}
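
A quick way to see what was installed is to read the entry back. A small check, assuming KafkaClient is among security.kerberos.login.contexts:

AppConfigurationEntry[] entries = javax.security.auth.login.Configuration
        .getConfiguration().getAppConfigurationEntry("KafkaClient");
if (entries != null) {
    for (AppConfigurationEntry entry : entries) {
        // expected: com.sun.security.auth.module.Krb5LoginModule plus the keytab/principal options
        System.out.println(entry.getLoginModuleName() + " " + entry.getOptions());
    }
}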


ZooKeeperModule#install

public void install() throws SecurityInstallException {

    priorSaslEnable = System.getProperty(ZK_ENABLE_CLIENT_SASL, null);
    System.setProperty(ZK_ENABLE_CLIENT_SASL, String.valueOf(!securityConfig.isZkSaslDisable()));

    priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
    if (!"zookeeper".equals(securityConfig.getZooKeeperServiceName())) {
        System.setProperty(ZK_SASL_CLIENT_USERNAME, securityConfig.getZooKeeperServiceName());
    }

    priorLoginContextName = System.getProperty(ZK_LOGIN_CONTEXT_NAME, null);
    if (!"Client".equals(securityConfig.getZooKeeperLoginContextName())) {
        System.setProperty(ZK_LOGIN_CONTEXT_NAME, securityConfig.getZooKeeperLoginContextName());
    }
}
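These system properties are fed from flink-conf.yaml. A sketch of the corresponding options (keys per Flink's SecurityOptions; the values shown are the usual defaults):

zookeeper.sasl.disable: false
zookeeper.sasl.service-name: zookeeper
zookeeper.sasl.login-context-name: Client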

After all modules are loaded, installContext builds the security context. Only one installedContext is ever installed.


static void installContext(SecurityConfiguration config) throws Exception {
    // install the security context factory
    for (String contextFactoryClass : config.getSecurityContextFactories()) {
        try {
            // context factories are discovered via SPI
            SecurityContextFactory contextFactory =
                    SecurityFactoryServiceLoader.findContextFactory(contextFactoryClass);
            // with a Hadoop environment present, the Hadoop security context is chosen
            if (contextFactory.isCompatibleWith(config)) {
                try {
                    installedContext = contextFactory.createContext(config);
                    // install the first context that's compatible and ignore the remaining
                    break;
                } catch (SecurityContextInitializeException e) {
                    LOG.error("Cannot instantiate security context with: " + contextFactoryClass, e);
                }
            } else {
                LOG.warn("Unable to install incompatible security context factory {}", contextFactoryClass);
            }
        } catch (NoMatchSecurityFactoryException ne) {
            LOG.warn("Unable to instantiate security context factory {}", contextFactoryClass);
        }
    }
    if (installedContext == null) {
        LOG.error("Unable to install a valid security context factory!");
        throw new Exception("Unable to install a valid security context factory!");
    }
}

3. Flink Kafka Connector Security Authentication

When Flink reads from Kerberos-enabled Kafka, the following configuration is needed. Note that neither java.security.auth.login.config nor sasl.jaas.config is passed.

Parameters added to the Kafka connector properties:
        1. security.protocol = 'SASL_PLAINTEXT'    // use the SASL authentication protocol
        2. sasl.mechanism = 'GSSAPI'               // use Kerberos authentication
        3. sasl.kerberos.service.name = 'kafka'    // service name
Parameters in flink-conf.yaml:
        1. security.kerberos.login.use-ticket-cache: true
        2. security.kerberos.login.keytab: xxxx
        3. security.kerberos.login.principal: xxxxx
        4. security.kerberos.login.contexts: KafkaClient,Client
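
Putting these together, a source for the Kerberos-enabled Kafka of the job's own cluster needs only the SASL properties; the JAAS entry comes from the globally installed KafkaClient context. A sketch using the pre-1.15 FlinkKafkaConsumer API (topic, servers, and group id are assumptions):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "cdh001:9092");       // illustrative address
props.setProperty("group.id", "flink-kerberos-demo");        // illustrative group
props.setProperty("security.protocol", "SASL_PLAINTEXT");    // SASL protocol
props.setProperty("sasl.mechanism", "GSSAPI");               // Kerberos
props.setProperty("sasl.kerberos.service.name", "kafka");    // service name

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

The keytab and principal from flink-conf.yaml are turned into the KafkaClient JAAS entry by JaasModule at startup, so nothing JAAS-related appears in the job code.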

If sasl.jaas.config is configured instead, its format is:

String config = "com.sun.security.auth.module.Krb5LoginModule required\n" +
    "\tprincipal=\"kafka/xxxxx@EXAMPLE.COM\"\n" +
    "\tkeyTab=\"/Users/xxxx/kafka.keytab\"\n" +
    "\tuseKeyTab=true\n" +
    "\tuseTicketCache=true;";

The flow by which the Flink Kafka connector picks up its JAAS configuration:

FlinkKafkaConsumerBase#open
    |
    this.partitionDiscoverer.open();
    |
    KafkaPartitionDiscoverer#initializeConnections
    |
KafkaConsumer#KafkaConsumer
    |
    ClientUtils.createChannelBuilder(config);
    |
    ChannelBuilders#clientChannelBuilder
    |
    ChannelBuilders#create
    |
    SaslChannelBuilder#configure (Kafka 0.10)
    |
    JaasUtils#jaasConfig
public static Configuration jaasConfig(LoginType loginType, Map<String, ?> configs) {
    Password jaasConfigArgs = (Password) configs.get(SaslConfigs.SASL_JAAS_CONFIG);
    // if sasl.jaas.config is bound, take the JAAS entries from its value
    if (jaasConfigArgs != null) {
        if (loginType == LoginType.SERVER) {
            throw new IllegalArgumentException("JAAS config property not supported for server");
        } else {
            JaasConfig jaasConfig = new JaasConfig(loginType, jaasConfigArgs.value());
            AppConfigurationEntry[] clientModules =
                    jaasConfig.getAppConfigurationEntry(LoginType.CLIENT.contextName());
            int numModules = clientModules == null ? 0 : clientModules.length;
            if (numModules != 1) {
                throw new IllegalArgumentException("JAAS config property contains " + numModules
                        + " login modules, should be one module");
            }
            return jaasConfig;
        }
    } else {
        return defaultJaasConfig(loginType);
    }
}


private static Configuration defaultJaasConfig(LoginType loginType) {
    // java.security.auth.login.config was already bound when Flink's JaasModule was installed
    String jaasConfigFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM);
    if (jaasConfigFile == null) {
        LOG.debug("System property '" + JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' and Kafka SASL property '" +
                SaslConfigs.SASL_JAAS_CONFIG + "' are not set, using default JAAS configuration.");
    }
    // fetch the entries from javax.security.auth.login.Configuration,
    // i.e. the login entities used in the jaas file
    Configuration jaasConfig = Configuration.getConfiguration();
    // e.g. KafkaClient
    String loginContextName = loginType.contextName();
    AppConfigurationEntry[] configEntries = jaasConfig.getAppConfigurationEntry(loginContextName);
    if (configEntries == null) {
        String errorMessage = "Could not find a '" + loginContextName + "' entry in the JAAS configuration. System property '" +
                JaasUtils.JAVA_LOGIN_CONFIG_PARAM + "' is " + (jaasConfigFile == null ? "not set" : jaasConfigFile);
        throw new IllegalArgumentException(errorMessage);
    }
    return jaasConfig;
}

4. Cross-Cluster Access to Kerberos-Enabled Kafka

Goal: a Flink job runs on cluster A (Kerberos-enabled) and reads Kafka data from both cluster A and cluster B.

Steps:

1. Upload the new cluster's krb5.conf file and append its content to the existing krb5 file.
2. Configure the sasl.jaas.config parameter separately for the remote cluster's connector.

Pay special attention:

In the newly uploaded krb5.conf, domain_realm must map the hosts to their realms; otherwise default_realm will be used.

[realms]
 PLS.COM = {
  kdc = plscdh00:88
  admin_server = plscdh00
 }

# map plscdh00-plscdh03 to PLS.COM
[domain_realm]
     .pls.com = PLS.COM
     pls.com = PLS.COM
     plscdh01 = PLS.COM
     plscdh02 = PLS.COM
     plscdh03 = PLS.COM
     plscdh00 = PLS.COM

1. Append the domain_realm and realms sections of the newly uploaded krb5.conf to the existing krb5 file.
2. Pass the JAAS content through sasl.jaas.config. If it were passed via the AppConfigurationEntry class instead, Kafka's default LoginContextName is KafkaClient, so with multiple Kafka clusters the AppConfigurationEntry entries looked up from the shared context would clash; see the sketch below.
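
Concretely, the cluster-A source keeps relying on the globally installed KafkaClient entry, while the cluster-B source carries its own entry through sasl.jaas.config. A sketch under the assumptions of section 3 (addresses, principals, and paths are placeholders):

// cluster A: the JAAS entry comes from the globally installed KafkaClient context
Properties propsA = new Properties();
propsA.setProperty("bootstrap.servers", "cdh001:9092");
propsA.setProperty("security.protocol", "SASL_PLAINTEXT");
propsA.setProperty("sasl.mechanism", "GSSAPI");
propsA.setProperty("sasl.kerberos.service.name", "kafka");

// cluster B: a dedicated JAAS entry, so the shared KafkaClient context is never consulted
Properties propsB = new Properties();
propsB.setProperty("bootstrap.servers", "plscdh01:9092");
propsB.setProperty("security.protocol", "SASL_PLAINTEXT");
propsB.setProperty("sasl.mechanism", "GSSAPI");
propsB.setProperty("sasl.kerberos.service.name", "kafka");
propsB.setProperty("sasl.jaas.config",
        "com.sun.security.auth.module.Krb5LoginModule required\n" +
        "\tprincipal=\"kafka/plscdh01@PLS.COM\"\n" +
        "\tkeyTab=\"/path/to/b-cluster-kafka.keytab\"\n" +
        "\tuseKeyTab=true\n" +
        "\tuseTicketCache=true;");

Each Properties object then feeds its own consumer, as in the section 3 sketch.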
Related code:

Code that builds the merged krb5 file:

public class KrbConfManager {

    private static final Logger LOG = LoggerFactory.getLogger(KrbConfManager.class);

    private static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";

    private KrbConfManager() {
    }

    public void appendKrbConf(String krbConf) {
        this.appendKrbConf(krbConf, System.getProperty("user.dir"));
    }

    /**
     * Parse the newly added krb5.conf with the methods of sun.security.krb5.Config,
     * then append it to the existing krb5.conf.
     * @param krbConf path of the new krb5.conf
     */
    public void appendKrbConf(String krbConf, String directory) {
        String krbConfPath = DtFileUtils.getFileAbsolutePath(krbConf);
        LOG.info("krb conf abs path is {}", krbConfPath);
        Preconditions.checkArgument(DtFileUtils.fileExistCheck(krbConfPath), "krb file does not exist");
        try {
            // Config's constructor is private; instantiate it reflectively
            Constructor<Config> constructor = Config.class.getDeclaredConstructor();
            constructor.setAccessible(true);
            Config configParser = constructor.newInstance();

            // loadConfigFile is private as well
            Method loadConfigFile = configParser.getClass().getDeclaredMethod("loadConfigFile", String.class);
            loadConfigFile.setAccessible(true);
            List<String> configFileList = (List<String>) loadConfigFile.invoke(configParser, krbConfPath);
            // ... the remainder of the snippet is truncated in the source
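
For orientation only, a hedged sketch of what the truncated remainder presumably does, based on the method's javadoc above: append the parsed stanzas to the krb5.conf the JVM currently uses and force a re-read (the merge logic here is an assumption, not the original code; sun.security.krb5.Config.refresh() is a real method that re-parses java.security.krb5.conf):

            // hypothetical continuation: merge the new realms/domain_realm stanzas into the
            // krb5.conf referenced by java.security.krb5.conf, then reload it
            String currentKrb5 = System.getProperty(JAVA_SECURITY_KRB5_CONF);
            java.nio.file.Files.write(
                    java.nio.file.Paths.get(currentKrb5),
                    String.join(System.lineSeparator(), configFileList)
                            .getBytes(java.nio.charset.StandardCharsets.UTF_8),
                    java.nio.file.StandardOpenOption.APPEND);
            Config.refresh();   // make the JVM re-read the merged file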
