Using Kafka (2.2.1) with Kerberos + LDAP + Sentry

Posted by harsenzhao

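I. Configuration for accessing Kafka (Kerberos already integrated)

Because Kafka is integrated with Kerberos, clients must pass Kerberos authentication.

There are two ways to authenticate:

  • 1. Through a configuration file (using the ticket cache populated by kinit)
  • 2. Through a keytab file

We use the first approach here.

First, create two files in the directory /usr/local/kafka_client: client.properties and jaas.conf.

Put the following in client.properties: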

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
group.id=testgroup
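
A missing line in client.properties tends to surface only later as a connection or authentication failure, so it can help to check the file up front. The following is a minimal sketch, assuming the two SASL keys shown above are the ones worth checking; the class name ClientPropsCheck and the method missingKeys are hypothetical helpers, not part of Kafka.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class ClientPropsCheck {
    // Keys this article's Kerberos setup relies on (an assumption, not an exhaustive list).
    static final String[] REQUIRED = {"security.protocol", "sasl.kerberos.service.name"};

    // Returns the required keys that are absent or blank in the given properties source.
    public static List<String> missingKeys(Reader source) throws IOException {
        Properties props = new Properties();
        props.load(source);
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Default path taken from the article; pass another path as the first argument.
        String path = args.length > 0 ? args[0] : "/usr/local/kafka_client/client.properties";
        try (Reader reader = Files.newBufferedReader(Paths.get(path))) {
            System.out.println("Missing keys: " + missingKeys(reader));
        }
    }
}
```

An empty list means both keys are present; the check says nothing about whether their values match the broker configuration.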

Put the following in jaas.conf:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=true
  serviceName="kafka";
};
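
A JAAS file with a missing brace or semicolon fails only when the client starts, so it can be worth parsing it on its own first. The sketch below uses the JDK's standard JavaLoginConfig configuration type to read a JAAS file and return the options of the KafkaClient section. The class name JaasCheck and its method are hypothetical; the sketch only checks that the file parses, not that Kerberos login succeeds.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.URIParameter;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public class JaasCheck {
    // Parses the JAAS file and returns the options of the first login module
    // in the "KafkaClient" section; throws if the section is missing.
    public static Map<String, ?> kafkaClientOptions(Path jaasFile) throws Exception {
        Configuration conf =
                Configuration.getInstance("JavaLoginConfig", new URIParameter(jaasFile.toUri()));
        AppConfigurationEntry[] entries = conf.getAppConfigurationEntry("KafkaClient");
        if (entries == null || entries.length == 0) {
            throw new IllegalStateException("No KafkaClient section in " + jaasFile);
        }
        return entries[0].getOptions();
    }

    public static void main(String[] args) throws Exception {
        // Default path taken from the article; pass another path as the first argument.
        Path file = Paths.get(args.length > 0 ? args[0] : "/usr/local/kafka_client/jaas.conf");
        System.out.println(kafkaClientOptions(file));
    }
}
```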

Then run the following commands in the shell to set the environment variable (it only takes effect for the current shell session):

[root@cdh-datanode03 kafka_client]# export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_client/jaas.conf"
[root@cdh-datanode03 kafka_client]# echo $KAFKA_OPTS

Then run the kinit command to log in as a Kerberos user.

II. Using Kafka from the shell (Sentry already integrated)

1. Create a Kafka topic

[root@cdh-datanode03 kafka_client]# kafka-topics --create --zookeeper cdh-master01:2181 --replication-factor 1 --partitions 1 --topic testTopic

2. List topics

[root@cdh-datanode03 kafka_client]# kafka-topics --zookeeper cdh-master01:2181 --list

3. Delete a topic

[root@cdh-datanode03 kafka_client]# kafka-topics --delete --zookeeper cdh-master01:2181 --topic testTopic

4. Produce data to a topic (requires permission)

[root@cdh-datanode03 kafka_client]# kafka-console-producer --broker-list cdh-datanode03:9092,cdh-datanode04:9092 --topic testTopic --producer.config /usr/local/kafka_client/client.properties

5. Consume data from a topic (requires permission)

[root@cdh-datanode03 kafka_client]# kafka-console-consumer --topic testTopic --from-beginning --bootstrap-server cdh-datanode03:9092,cdh-datanode04:9092,cdh-datanode05:9092 --consumer.config /usr/local/kafka_client/client.properties

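At this point the producer reports the following error, which means the current user has no permission to write data to testTopic. We need to grant permissions to the user we logged in as with kinit.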

ERROR internals.ErrorLoggingCallback: Error when sending message to topic testTopic with key: null, value: 3 bytes with error:
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [testTopic]

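We use the user fayson as an example; it belongs to the group user (run id followed by the user name to see its groups).

  • 1. First, create a Kerberos principal for kafka
  • 2. Grant the user group permission to write data to testTopic. Note that you must be logged in to Kerberos as the administrator user kafka to do this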
[root@cdh-datanode03 kafka_client]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: kafka@GREE.IO

Valid starting       Expires              Service principal
09/11/2019 20:47:25  09/12/2019 20:47:25  krbtgt/GREE.IO@GREE.IO
    renew until 09/18/2019 20:47:25
  • 3. Create a role
[root@cdh-datanode03 kafka_client]#  kafka-sentry -cr -r kafka_role
  • 4. Grant kafka_role permission to write to testTopic
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=write"
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=describe"
  • 5. Add the role to the user group
[root@cdh-datanode03 kafka_client]#  kafka-sentry -arg -r kafka_role -g user
  • 6. Log in as the fayson user (enter the password when prompted)
[root@cdh-datanode03 kafka_client]# kinit fayson

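After that, writing to testTopic as this user no longer reports a permission error.

The fayson user also needs permission to read testTopic, so kafka_role must be granted read access to testTopic and to the consumer group as well.

  • 1. Building on the steps above, grant the kafka_role role permission to read testTopic
  • 2. Run the following commands as the kafka user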
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=read"
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=describe"
[root@cdh-datanode03 kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=read"
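
The grants in this section follow one pattern: a resource type, a resource name and an action, joined as Resource=name->action=verb. The sketch below only assembles the command strings for one role, one topic and one consumer group so the full set can be reviewed in one place; it does not call Sentry. The class SentryGrantSketch and its methods are hypothetical, and the flags are limited to those already used above.

```java
import java.util.ArrayList;
import java.util.List;

public class SentryGrantSketch {
    // Formats one privilege in the "Resource=name->action=verb" form used above.
    static String privilege(String resourceType, String name, String action) {
        return resourceType + "=" + name + "->action=" + action;
    }

    // Lists the kafka-sentry grant commands from this section for one role.
    public static List<String> grantCommands(String role, String topic, String group) {
        List<String> privileges = new ArrayList<>();
        privileges.add(privilege("Topic", topic, "write"));            // producer
        privileges.add(privilege("Topic", topic, "describe"));         // producer
        privileges.add(privilege("CONSUMERGROUP", group, "read"));     // consumer
        privileges.add(privilege("CONSUMERGROUP", group, "describe")); // consumer
        privileges.add(privilege("Topic", topic, "read"));             // consumer

        List<String> commands = new ArrayList<>();
        for (String p : privileges) {
            commands.add("kafka-sentry -gpr -r " + role + " -p \"" + p + "\"");
        }
        return commands;
    }

    public static void main(String[] args) {
        for (String command : grantCommands("kafka_role", "testTopic", "testgroup")) {
            System.out.println(command);
        }
    }
}
```

Note that the consumer grants name a specific group, so a client using a different group.id needs its own CONSUMERGROUP grants.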

III. Access from code (Java)

Create the consumer.properties, producer.properties and jaas.conf files, and also bring in the krb5.conf file.

Contents of producer.properties:

bootstrap.servers=cdh-datanode03:9092,cdh-datanode04:9092,cdh-datanode05:9092
# Class implementing the Serializer interface; tells Kafka how to serialize the key
key.serializer=org.apache.kafka.common.serialization.StringSerializer
# Tells Kafka how to serialize the value
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=1
# Kafka client settings for a Kerberos-enabled cluster
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

Contents of consumer.properties:

bootstrap.servers=cdh-datanode04:9092
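# Note: the Sentry grants shown earlier name the group testgroup; testgroup1 needs equivalent CONSUMERGROUP grants
group.id=testgroup1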
enable.auto.commit=true
session.timeout.ms=30000
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

Contents of jaas.conf:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  doNotPrompt=true
  useKeyTab=true
  storeKey=true
  renewTicket=true
  keyTab="D:/cdh/kafka/src/main/kerberos/gree1.keytab"
  principal="gree1@GREE.IO";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="D:/cdh/kafka/src/main/kerberos/gree1.keytab"
  principal="gree1@GREE.IO";
};
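
As an alternative to a separate jaas.conf, Kafka clients also accept the KafkaClient login settings inline through the sasl.jaas.config client property. The sketch below builds such a value from a keytab path and a principal. The class InlineJaasSketch and its method are hypothetical, the sketch is not what this article's code uses, and it covers only the KafkaClient section, not the Client section.

```java
public class InlineJaasSketch {
    // Builds a value for the Kafka client property sasl.jaas.config
    // from a keytab path and a principal, both supplied by the caller.
    public static String keytabJaasConfig(String keytabPath, String principal) {
        // Use forward slashes, as the jaas.conf above does, so that Windows
        // backslashes are not treated as escape characters by the JAAS parser.
        String normalizedPath = keytabPath.replace('\\', '/');
        return "com.sun.security.auth.module.Krb5LoginModule required"
                + " useKeyTab=true"
                + " storeKey=true"
                + " keyTab=\"" + normalizedPath + "\""
                + " principal=\"" + principal + "\";";
    }

    public static void main(String[] args) {
        // Values taken from the jaas.conf in this article.
        System.out.println("sasl.jaas.config="
                + keytabJaasConfig("D:\\cdh\\kafka\\src\\main\\kerberos\\gree1.keytab", "gree1@GREE.IO"));
    }
}
```

With this property set in producer.properties or consumer.properties, the java.security.auth.login.config system property would no longer be needed for the Kafka client itself; krb5.conf is still required.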
  • 1.pom.xml
    <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>2.2.1-cdh6.3.0</version>
      </dependency>
  • 2.producer
package producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.io.IOException;
import java.util.Properties;

class MyProducer {
    private static final MyProducer Instance = new MyProducer();

    private MyProducer() {
    }

    public static MyProducer getInstance() {
        return Instance;
    }

    public int messageNo = 1;

    /**
     * Creates a Kafka producer instance.
     *
     * @return a producer configured from producer.properties
     */
    public KafkaProducer<String, String> Produce() {
        // Point the JVM at the JAAS and Kerberos configuration before the client is created.
        System.setProperty("java.security.auth.login.config", "D:\\cdh\\kafka\\src\\main\\kerberos\\jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\cdh\\kafka\\src\\main\\kerberos\\krb5.conf");

        Properties props = new Properties();
        try {
            props.load(this.getClass().getResourceAsStream("/producer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new KafkaProducer<String, String>(props);
    }
}

public class ProducerStarter implements Runnable {
    private int threadIndex;

    public ProducerStarter(int threadIndex) {
        this.threadIndex = threadIndex;
    }

    /**
     * Produces data.
     */
    @Override
    public void run() {
        MyProducer pro = MyProducer.getInstance();
        KafkaProducer<String, String> prod = pro.Produce();
        String topic = "testTopic";
        int i = 0;
        while (true) {
            final int index = i++;
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            prod.send(new ProducerRecord<String, String>(topic, String.valueOf(index), String.valueOf(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        // recordMetadata may be null when the send failed, so do not use it here.
                        e.printStackTrace();
                        return;
                    }
                    System.out.println("message send to partition " + recordMetadata.partition() + /*value*/ ": hello world " + index);
                }
            });
            prod.flush();
            // sleep 2 seconds
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Starts the producer threads (the loop below starts a single thread).
     *
     * @param args unused
     */
    public static void main(String args[]) {
        for (int i = 0; i < 1; i++) {
            System.out.println("Starting thread: " + i);
            Thread thread = new Thread(new ProducerStarter(i));
            thread.start();
        }
    }
}
  • 3.Consumer
// ConsumerStarter.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;

public class ConsumerStarter {
    public static void main(String[] args) throws InterruptedException {

        KafkaConsumer<String, String> consumer = Consumer.getInstance().Consume();
        consumer.subscribe(Arrays.asList("testTopic"));
        // Consume and print the results
        while (true) {
            // poll(Duration) replaces the deprecated poll(long) in kafka-clients 2.x
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value= %s%n", record.offset(), record.key(), record.value());
            }
            Thread.sleep(1000);
        }
    }
}

// Consumer.java
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Properties;

/**
 * Created by 260212 on 2018/4/12.
 * Author: JackLee - 赵化臣
 * Description:
 */
class Consumer {
    private static final Consumer Instance = new Consumer();

    private Consumer() {
    }

    public static Consumer getInstance() {
        return Instance;
    }

    /**
     * Creates a Kafka consumer.
     * The kafka-clients version must be newer than 0.9.0.1; otherwise the fetched records are null.
     *
     * @return a consumer configured from consumer.properties
     */
    public KafkaConsumer<String, String> Consume() {
        // Point the JVM at the JAAS and Kerberos configuration before the client is created.
        System.setProperty("java.security.auth.login.config", "D:\\cdh\\kafka\\src\\main\\kerberos\\jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\cdh\\kafka\\src\\main\\kerberos\\krb5.conf");
        Properties props = new Properties();
        try {
            props.load(this.getClass().getResourceAsStream("/consumer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new KafkaConsumer<String, String>(props);
    }
}
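
Both the producer and the consumer above load their settings with getResourceAsStream, which returns null when the file is not on the classpath; passing that null to Properties.load then fails with a NullPointerException that does not name the missing file. The sketch below shows one way to make that failure explicit. The class ClasspathProps and its load method are hypothetical helpers, not part of Kafka.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class ClasspathProps {
    // Loads a properties file from the classpath and fails with a clear
    // message if the resource cannot be found.
    public static Properties load(String resourceName) throws IOException {
        try (InputStream in = ClasspathProps.class.getResourceAsStream(resourceName)) {
            if (in == null) {
                throw new IOException("Resource not found on classpath: " + resourceName);
            }
            Properties props = new Properties();
            props.load(in);
            return props;
        }
    }

    public static void main(String[] args) {
        try {
            Properties props = load("/consumer.properties");
            System.out.println("bootstrap.servers=" + props.getProperty("bootstrap.servers"));
        } catch (IOException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

The returned Properties object can be passed to the KafkaProducer or KafkaConsumer constructor in the same way as in the code above.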
