storm 访问 kerberos kafka

Posted 数客联盟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了storm 访问 kerberos kafka相关的知识,希望对你有一定的参考价值。

本文整理下hdp中开启了kerberos后在storm中访问的kafka所遇到的问题。
测试使用的环境为: hdp2.6.0.3 storm 1.1.0, kafka 0.10.1 ,前置条件是开启了kerberos

1 环境准备

集群开启kerberos后,创建kafka topic,注意需要kinit 所需用户的keytab 比如:

1klist -k /etc/security/keytabs/storm.headless.keytab ocdp-clusteraa@ASIAINFO.COM
2./kafka-topics.sh --create --topic inputTopicStorm  --zookeeper host-10-1-236-128:2181 --partitions 3 --replication-factor 1
3./kafka-topics.sh --create --topic outputTopicStorm  --zookeeper host-10-1-236-128:2181 --partitions 3 --replication-factor 1

创建完topic后,进行一些简单的kafka数据读写测试,向对应的topic中写入数据,命令行是否可以读取成功:

1./kafka-console-producer.sh  --topic inputTopicStorm --broker-list host-10-1-236-128:6667 --security-protocol PLAINTEXTSASL
2./kafka-console-consumer.sh --new-consumer --topic inputTopicStorm --bootstrap-server host-10-1-236-128:6667 --security-protocol PLAINTEXTSASL --from-beginning
3上述命令如果可以正常执行,说明kerberos环境正常。如果遇到权限问题,见文末的kafka权限命令。

2 storm kafka 测试代码

注意支持kerberos的api 最低从kafka0.9开始,需要用新的API,需要传递的参数如下:

 1 Properties props = new Properties();
2        props.put("bootstrap.servers", "host-10-1-236-128:6667,host-10-1-236-129:6667,host-10-1-236-130:6667");
3        props.put("acks", "1");
4        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
5        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
6        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
7        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
8        props.put("security.protocol", "SASL_PLAINTEXT");
9        KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig
10                .builder(props.getProperty("bootstrap.servers"), "foo")
11                .setGroupId("KafkaStormGroupID")
12                .setProp(props)
13                .setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()), new Fields("topic", "key", "message"))
14                .build();

注意,根据kafka 0.10.2 的feature Dynamic JAAS configuration for Kafka clients  jaas 配置文件的内容已经可以直接写在代码中了,参考原版本代码的实现 https://github.com/pvillard31/storm-kafka-kerberos
为了做到基础组件的透明,避免所有应用实现时都需要指定一遍kerberos的详细访问信息,这里还是采用了旧的方式,即ambari已经写入到了storm每个supervisor节点的/usr/hdp/current/storm-supervisor/conf/storm_jaas.conf文件中。正确的版本为:

1KafkaClient {
2   com.sun.security.auth.module.Krb5LoginModule required
3   useKeyTab=true
4   keyTab="/etc/security/keytabs/storm.headless.keytab"
5   storeKey=true
6   useTicketCache=false
7   serviceName="ocdp"
8   principal="ocdp-clusteraa@ASIAINFO.COM";
9};

4 开源版本storm的配置

在开源版本中,缺少了ambari的角色,需要我们自己修改storm.yaml,加入以下配置:

1java.security.auth.login.config : '/data/storm/apache-storm-1.1.1/conf/storm_jaas.conf'
2worker.childopts : '-Djava.security.auth.login.config=/data/storm/apache-storm-1.1.1/conf/storm_jaas.conf'

这样worker中就会有java.security.auth.login.config

5 安全模式ZK问题:

注意如果需要连接安全模式ZK:
storm.yaml中需要加上

 1supervisor.childopts : '-Djava.security.auth.login.config=/data/storm/apache-storm-1.1.1/conf/storm_jaas.conf '
2并且storm_jaas.conf中也需要加上zk的安全配置:
3Client {
4com.sun.security.auth.module.Krb5LoginModule required
5useKeyTab=true
6keyTab="/opt/apache-storm-1.1.1/conf/storm.headless.keytab"
7storeKey=true
8useTicketCache=false
9serviceName="zookeeper"
10principal="[ocdp-clusteraa@ASIAINFO.COM](https://link.jianshu.com?t=mailto%3Aocdp-clusteraa%40ASIAINFO.COM)";
11};

6 kafka权限问题:

使得ocsp用户对topic的所有权限

1./kafka-acls.sh --add --allow-principal user:ocsp --operation ALL --topic inputTopicStorm --authorizer-properties zookeeper.connect=host-10-1-236-128:2181

使得ocsp用户能够使用任意的groupID来消费所有的topic

1./kafka-acls.sh --authorizer-properties zookeeper.connect=host-10-1-236-128:2181 --allow-principal user:ocsp  --consumer --topic=* --group=*  --add

7 常见错误:

1. javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user

该问题是jaas config 配置有误

2. 较低版本的HDP、kafka

如果在/usr/hdp/current/storm-supervisor/conf/storm_jaas.conf文件中没有KafkaClient字段,需要自行加入
详见: https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.2/bk_storm-user-guide/content/stormkafka-secure-config.html

3 Storm开启kerberos后 UI访问较为麻烦的问题

通过Ambari 创建一个 Storm View, 在新建的Storm View中就可以直接访问Storm UI了。
详见 https://docs.hortonworks.com/HDPDocuments/Ambari-2.4.0.0/bk_ambari-views/content/creating_the_storm_view_instance.html


以上是关于storm 访问 kerberos kafka的主要内容,如果未能解决你的问题,请参考以下文章

如何在Kerberos环境使用Flume采集Kafka数据并写入HDFS

kafka(2.2.1)(kerberos+LDAP+Sentry)访问使用

Kafka Kerberos客户端访问

Kafka Kerberos多端口访问

kafka kerberos 认证访问与非认证访问共存下的ACL问题

FlinkFlink跨集群访问开启Kerberos认证的Kafka