Accessing Kafka (2.2.1) with Kerberos + LDAP + Sentry
Posted by harsenzhao
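I. Client configuration for accessing Kafka (Kerberos enabled)
Because Kafka is integrated with Kerberos, every client has to authenticate through Kerberos.
There are two ways to authenticate:
- 1. With a ticket cache (log in with kinit and point the client at a JAAS configuration file)
- 2. With a keytab file
Here we use the first method.
First create two files in the directory /usr/local/kafka_client: client.properties and jaas.conf.
Write the following into client.properties: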
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
group.id=testgroup
Write the following into jaas.conf:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=true
  serviceName="kafka";
};
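As an alternative to a separate jaas.conf, Kafka clients from 0.10.2 onward also accept the login module line inline through the sasl.jaas.config client property. The sketch below only builds a java.util.Properties object with the same values as the two files; the class name InlineJaasConfig and the method ticketCacheProps are hypothetical, and whether inline configuration suits this cluster is an assumption to verify.

```java
import java.util.Properties;

// Hypothetical helper: builds client properties that carry the JAAS entry inline.
class InlineJaasConfig {

    // Same options as the jaas.conf entry, minus the "KafkaClient { ... };" wrapper.
    static final String TICKET_CACHE_LOGIN =
            "com.sun.security.auth.module.Krb5LoginModule required "
            + "useTicketCache=true renewTicket=true serviceName=\"kafka\";";

    static Properties ticketCacheProps() {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        props.setProperty("group.id", "testgroup");
        // Inline JAAS configuration instead of -Djava.security.auth.login.config
        props.setProperty("sasl.jaas.config", TICKET_CACHE_LOGIN);
        return props;
    }

    public static void main(String[] args) {
        // Print the properties in key=value form, e.g. to paste into client.properties
        ticketCacheProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With this approach the client still needs a valid ticket from kinit; only the location of the JAAS entry changes.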
Then set the environment variable in the shell (it only applies to the current shell session and the processes started from it):
[root@cdh-datanode03 kafka_client]# export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_client/jaas.conf"
[root@cdh-datanode03 kafka_client]# echo $KAFKA_OPTS
Then run kinit to log in as a Kerberos user.
II. Using Kafka from the shell (Sentry enabled)
1. Create a Kafka topic
[root@cdh-datanode03 kafka_client]# kafka-topics --create --zookeeper cdh-master01:2181 --replication-factor 1 --partitions 1 --topic testTopic
2. List topics
[root@cdh-datanode03 kafka_client]# kafka-topics --zookeeper cdh-master01:2181 --list
3. Delete a topic
[root@cdh-datanode03 kafka_client]# kafka-topics --delete --zookeeper cdh-master01:2181 --topic testTopic
4. Produce data to a topic (requires privileges)
[root@cdh-datanode03 kafka_client]# kafka-console-producer --broker-list cdh-datanode03:9092,cdh-datanode04:9092 --topic testTopic --producer.config /usr/local/kafka_client/client.properties
5. Consume data from a topic (requires privileges)
[root@cdh-datanode03 kafka_client]# kafka-console-consumer --topic testTopic --from-beginning --bootstrap-server cdh-datanode03:9092,cdh-datanode04:9092,cdh-datanode05:9092 --consumer.config /usr/local/kafka_client/client.properties
At this point producing fails with the following error, which means the current user is not authorized to write to testTopic. We need to grant privileges to the user we logged in as with kinit.
ERROR internals.ErrorLoggingCallback: Error when sending message to topic testTopic with key: null, value: 3 bytes with error:
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [testTopic]
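Take the user fayson as an example. It belongs to the group user (run id followed by the user name to see its groups).
- 1. First create a kafka principal.
- 2. Grant the user group permission to write to testTopic. Note that you must be logged in to Kerberos as the kafka administrator user to do this: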
[root@cdh-datanode03 kafka_client]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka@GREE.IO
Valid starting       Expires              Service principal
09/11/2019 20:47:25  09/12/2019 20:47:25  krbtgt/GREE.IO@GREE.IO
        renew until 09/18/2019 20:47:25
- 3. Create a role
[root@cdh-datanode03 kafka_client]# kafka-sentry -cr -r kafka_role
- 4. Grant kafka_role the privilege to write to testTopic
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=write"
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=describe"
- 5. Add the role to the user group
[root@cdh-datanode03 kafka_client]# kafka-sentry -arg -r kafka_role -g user
- 6. Log in as fayson (enter the password when prompted)
[root@cdh-datanode03 kafka_client]# kinit fayson
After that, writing to testTopic as this user no longer reports a permission error.
The fayson user also needs to read from testTopic, so kafka_role must additionally be granted read privileges on testTopic and on the consumer group.
- 1. Building on the steps above, grant kafka_role the privileges needed to read testTopic.
- 2. Run the following commands as the kafka user:
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=read"
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=describe"
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=read"
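The grants in this section follow a fixed pattern: create a role, grant topic and consumer-group privileges to it, then attach the role to a group. As a rough sketch, the command lines can be generated for any role, group, topic and consumer group. The class SentryGrantCommands and its methods are hypothetical; only the kafka-sentry flags (-cr, -gpr, -arg, -r, -p, -g) and the privilege strings are taken from the commands in this section. The code only prints the commands and does not run them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: prints the kafka-sentry commands used in this section.
class SentryGrantCommands {

    // One "grant privilege to role" command, e.g. Topic=testTopic->action=write
    static String grant(String role, String resource, String action) {
        return "kafka-sentry -gpr -r " + role
                + " -p \"" + resource + "->action=" + action + "\"";
    }

    // All commands needed so that members of `group` can produce to and consume from `topic`.
    static List<String> forTopicAndGroup(String role, String group,
                                         String topic, String consumerGroup) {
        List<String> cmds = new ArrayList<>();
        cmds.add("kafka-sentry -cr -r " + role);                    // create the role
        cmds.add(grant(role, "Topic=" + topic, "write"));           // producer
        cmds.add(grant(role, "Topic=" + topic, "describe"));
        cmds.add(grant(role, "Topic=" + topic, "read"));            // consumer
        cmds.add(grant(role, "CONSUMERGROUP=" + consumerGroup, "read"));
        cmds.add(grant(role, "CONSUMERGROUP=" + consumerGroup, "describe"));
        cmds.add("kafka-sentry -arg -r " + role + " -g " + group);  // attach role to group
        return cmds;
    }

    public static void main(String[] args) {
        forTopicAndGroup("kafka_role", "user", "testTopic", "testgroup")
                .forEach(System.out::println);
    }
}
```

The printed commands still have to be run while logged in to Kerberos as the kafka administrator user.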
III. Access from code (Java)
You need to create the files consumer.properties, producer.properties and jaas.conf, and also bring in the krb5.conf file.
Contents of producer.properties:
bootstrap.servers=cdh-datanode03:9092,cdh-datanode04:9092,cdh-datanode05:9092
# Class implementing the Serializer interface; tells Kafka how to serialize the key
key.serializer=org.apache.kafka.common.serialization.StringSerializer
# Tells Kafka how to serialize the value
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=1
# Kafka client settings for a Kerberos-enabled cluster
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
Contents of consumer.properties:
bootstrap.servers=cdh-datanode04:9092
group.id=testgroup1
enable.auto.commit=true
session.timeout.ms=30000
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
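A missing key in these files tends to surface only as a connection or authentication failure at runtime, so a quick pre-flight check can save time. The sketch below is an assumption of how such a check might look: the class ConsumerConfigCheck, its REQUIRED list and its method names are hypothetical, and the list simply mirrors the keys of the consumer.properties above (note that its group.id is testgroup1, while the Sentry grants earlier were made for testgroup).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for a Kerberos-enabled consumer configuration.
class ConsumerConfigCheck {

    // Keys taken from the consumer.properties shown in this section.
    static final String[] REQUIRED = {
        "bootstrap.servers", "group.id",
        "key.deserializer", "value.deserializer",
        "security.protocol", "sasl.kerberos.service.name"
    };

    // Parses properties text (same syntax as a .properties file).
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the required keys that are absent or blank.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = parse("bootstrap.servers=cdh-datanode04:9092\n"
                + "group.id=testgroup1\n");
        System.out.println("missing keys: " + missingKeys(props));
    }
}
```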
Contents of jaas.conf:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  doNotPrompt=true
  useKeyTab=true
  storeKey=true
  renewTicket=true
  keyTab="D:/cdh/kafka/src/main/kerberos/gree1.keytab"
  principal="gree1@GREE.IO";
};
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="D:/cdh/kafka/src/main/kerberos/gree1.keytab"
  principal="gree1@GREE.IO";
};
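Because the keytab path and principal are hard-coded in jaas.conf, one option is to generate the file at startup and register it through the java.security.auth.login.config system property. The following is a minimal sketch under that assumption; the class JaasFileWriter and its methods are hypothetical, and for simplicity both entries are written with the same option set. Forward slashes are used in the keytab path, as in the file above, to sidestep backslash escaping in quoted JAAS values.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: writes a keytab-based jaas.conf and registers it with the JVM.
class JaasFileWriter {

    // Builds one JAAS entry such as "KafkaClient { ... };"
    static String keytabEntry(String entryName, String keytab, String principal) {
        String path = keytab.replace('\\', '/'); // normalize Windows separators
        return entryName + " {\n"
                + "  com.sun.security.auth.module.Krb5LoginModule required\n"
                + "  doNotPrompt=true\n"
                + "  useKeyTab=true\n"
                + "  storeKey=true\n"
                + "  keyTab=\"" + path + "\"\n"
                + "  principal=\"" + principal + "\";\n"
                + "};\n";
    }

    // Writes the KafkaClient and Client entries to a temp file and points the JVM at it.
    static Path writeAndRegister(String keytab, String principal) throws IOException {
        String content = keytabEntry("KafkaClient", keytab, principal)
                + keytabEntry("Client", keytab, principal);
        Path file = Files.createTempFile("jaas", ".conf");
        Files.write(file, content.getBytes(StandardCharsets.UTF_8));
        System.setProperty("java.security.auth.login.config", file.toString());
        return file;
    }
}
```

A call such as JaasFileWriter.writeAndRegister("D:/cdh/kafka/src/main/kerberos/gree1.keytab", "gree1@GREE.IO") would have to run before the first Kafka client is created, since the JAAS configuration is read when the client logs in.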
- 1.pom.xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.1-cdh6.3.0</version>
</dependency>
- 2.producer
package producer;

import org.apache.kafka.clients.producer.*;

import java.io.IOException;
import java.util.Properties;

class MyProducer {
    private static final MyProducer Instance = new MyProducer();

    private MyProducer() {
    }

    public static MyProducer getInstance() {
        return Instance;
    }

    public int messageNo = 1;

    /**
     * Returns a Kafka producer instance.
     *
     * @return a producer configured from producer.properties on the classpath
     */
    public KafkaProducer<String, String> Produce() {
        // Point the JVM at the JAAS and Kerberos configuration files
        System.setProperty("java.security.auth.login.config", "D:\\cdh\\kafka\\src\\main\\kerberos\\jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\cdh\\kafka\\src\\main\\kerberos\\krb5.conf");
        Properties props = new Properties();
        try {
            props.load(this.getClass().getResourceAsStream("/producer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        return producer;
    }
}
public class ProducerStarter implements Runnable {
    private int threadIndex;

    public ProducerStarter(int threadIndex) {
        this.threadIndex = threadIndex;
    }

    /**
     * Produces data in an endless loop.
     */
    public void run() {
        MyProducer pro = MyProducer.getInstance();
        KafkaProducer<String, String> prod = pro.Produce();
        String topic = "testTopic";
        int i = 0;
        while (true) {
            final int index = i++;
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            prod.send(new ProducerRecord<String, String>(topic, String.valueOf(index), String.valueOf(i)), new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        // The send failed, so there is no partition to report
                        e.printStackTrace();
                        return;
                    }
                    System.out.println("message sent to partition " + recordMetadata.partition() + ": hello world " + index);
                }
            });
            prod.flush();
            // Sleep for 2 seconds
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Starts the producer threads (one thread as written; raise the loop bound for more).
     *
     * @param args unused
     */
    public static void main(String args[]) {
        for (int i = 0; i < 1; i++) {
            System.out.println("Starting thread: " + i);
            Thread thread = new Thread(new ProducerStarter(i));
            thread.start();
        }
    }
}
- 3.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;

public class ConsumerStarter {
    public static void main(String[] args) throws InterruptedException {
        KafkaConsumer<String, String> consumer = Consumer.getInstance().Consume();
        consumer.subscribe(Arrays.asList("testTopic"));
        // Consume and print the results
        while (true) {
            // poll(long) is deprecated in kafka-clients 2.x; use the Duration overload
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value= %s%n", record.offset(), record.key(), record.value());
            }
            Thread.sleep(1000);
        }
    }
}
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Properties;

/**
 * Created by 260212 on 2018/4/12.
 * Author: JackLee (赵化臣)
 * Description:
 */
class Consumer {
    private static final Consumer Instance = new Consumer();

    private Consumer() {
    }

    public static Consumer getInstance() {
        return Instance;
    }

    /**
     * Returns a Kafka consumer.
     * The kafka-clients version must be newer than 0.9.0.1, otherwise the records come back as null.
     *
     * @return a consumer configured from consumer.properties on the classpath
     */
    public KafkaConsumer<String, String> Consume() {
        // Point the JVM at the JAAS and Kerberos configuration files
        System.setProperty("java.security.auth.login.config", "D:\\cdh\\kafka\\src\\main\\kerberos\\jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\cdh\\kafka\\src\\main\\kerberos\\krb5.conf");
        Properties props = new Properties();
        try {
            props.load(this.getClass().getResourceAsStream("/consumer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        KafkaConsumer<String, String> consumerSelf = new KafkaConsumer<String, String>(props);
        return consumerSelf;
    }
}