Kafka身份认证与权限控制配置

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka身份认证与权限控制配置相关的知识,希望对你有一定的参考价值。

参考技术A 编辑原有配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/server.properties

listeners=SASL_PLAINTEXT://192.168.43.209:9092

security.inter.broker.protocol=SASL_PLAINTEXT

sasl.enabled.mechanisms=PLAIN

sasl.mechanism.inter.broker.protocol=PLAIN

allow.everyone.if.no.acl.found=true

super.users=User:root

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

创建新的配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf

KafkaServer

       org.apache.kafka.common.security.plain.PlainLoginModule required

        username="kafka"

        password="kafkapswd"

        user_ kafkaa(用户名)="kafkaapswd"(密码)

        user_ kafkab(用户名)=" kafkabpswd"(密码)

user_ kafkac(用户名)=" kafkacpswd"(密码)

user_ kafkad(用户名)=" kafkadpswd"(密码);

;

修改执行文件vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-server-start.sh

if ["x$KAFKA_OPTS" ]; then

    export KAFKA_OPTS="-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf"

fi

修改执行文件vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-run-class.sh

KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf'

if ["x$DAEMON_MODE" = "xtrue" ]; then

  nohup $JAVA $KAFKA_HEAP_OPTS$KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS  $KAFKA_JMX_OPTS$KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" >"$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &

else

  exec $JAVA $KAFKA_HEAP_OPTS$KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH$KAFKA_OPTS "$@"

fi

创建新的配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_client_jaas.conf

KafkaClient

       org.apache.kafka.common.security.plain.PlainLoginModule required

        username=" kafkaa"

        password=" kafkaapswd";

;

修改执行文件

vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh

vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-console-producer.sh

if ["x$KAFKA_OPTS" ]; then

    export KAFKA_OPTS="-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_client_jaas.conf"

fi

运行jar包的服务器的指定路径下必须有kafka_ client_ jaas.conf文件

在程序中添加如下配置

System.setProperty("java.security.auth.login.config","xx/kafka_client_jaas.conf");

props.put("security.protocol","SASL_PLAINTEXT");

props.put("sasl.mechanism","PLAIN");

问题描述:发布消息、订阅消息时,出现如下错误,WARN [Consumer clientId=consumer-1, groupId=console-consumer-20752]Error while fetching metadata with correlation id 2 :test2=LEADER_NOT_AVAILABLE (org.apache.kafka.clients.NetworkClient)

解决方法:各客户端的用户名设置为相同,多个客户端同时管理会产生冲突。

kakfa 安全机制

在0.9版本之后,kafka 增加了身份认证和权限控制 两种安全机制。

身份认证:指客户端与服务端连接进行身份认证;包括 客户端与kafka代理之间、代理与代理之间、代理与zookeeper之间的连接认证;目前支持 SSL 、 SASL/Kerberos 、SASL/PLAIN 这3种认证机制;

权限控制:指对客户端的读写操作进行权限控制。

1、利用 SASL/PLAIN 进行身份认证:

  1)修改 server.properties 文件,开启 SASL认证配置:

    listeners=SASL_PLAINTEXT://0.0.0.0:9092  #配置一个SASL端口

    security.inter.broker.protocol=SASL_PLAINTEXT  #设置代理之间通信协议

    sasl.enable.mechanisms=PLAIN  #启用SASL机制

    sasl.mechanism.inter.broker.protocol=PLAIN  #配置SASL机制

  2)创建服务端JAAS文件,配置PLAIN:

    在config目录下创建一个名为 kafka_server_jaas.conf 的文件,然后文件内容为: 

      KafkaServer 
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka"
        password="kafkapswd"
        user_kafka="kafkapswd"
        user_morton="mortonpswd"
      

    username 和 password 指定该代理与集群中其他代理初始化连接的用户名和密码;

    通过以 "user_"为前缀后接用户名的方式创建连接代理的用户名和密码。

  3)创建和配置客户端JAAS文件:

    在config目录下创建一个名为 kafka_client_jaas.conf 的文件,然后文件内容为:

      KafkaClient 
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="morton"
        password="mortonpswd"
      

  4)将JAAS配置文件加入相应的配置文件:

    修改 kafka-server-start.sh 脚本,在该脚本中引入服务端 JAAS 文件: 

      if [ "x$KAFKA_OPTS" = "x" ]; then
        export KAFKA_OPTS="-DJava.security.auth.login.config=../config/kafka_server_jaas.config"
      fi

    修改 kafak-console-producer.sh 和 kafka-console-consumer.sh 甲苯,在该脚本中引入客户端 JAAS 文件

      if [ "x$KAFKA_OPTS" = "x" ]; then
        export KAFKA_OPTS="-DJava.security.auth.login.config=../config/kafka_client_jaas.config"
      fi

  5)启动服务端、生产者 和 消费者:

    kafka-server-start.sh --deamon ../config/server.properties

    kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-action --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN

    kafka-console-consumer.sh --bootstrap-server localhost:9092 topic kafka-action --consumer.property security.protocol=SASL_PLAINTEXT --consumer.property sasl.mechanism=PLAIN

 

2、权限控制

  kafka 提供了 kafka-acls.sh 脚本支持查询(list)、添加(add)、移除(remove)这3类权限控制的操作;  

  要启用 kafka ACL 权限控制,首先需要在 server.properties 文件中增加权限控制实现类的设置:

    authorizer.class.name=kafka.security.auth.SimpleAuthorizer

  当启用了 Kafka ACL 权限控制后,默认条件下除了超级用户之外,所有用户均没有任何权限;

  在 server.properties 文件设置超级用户的格式为:super.users=User:user1;User:user2

  在 server.properties 文件给所有用户开启权限的配置格式为:allow.everyone.if.no.acl.found=true

  由于客户端启动都需要连接到 kafka,因此需要假如 Java.security.auth.login.config 环境变量设置,否则即使进行了授权,客户端依然连接不上kafka。由于客户端执行的脚本都会调用 kafka-run-class.sh 脚本,因此在此脚本中加入 Java.security.suth.login.config 环境变量:

# Launch mode
KAFKA_SASL_OPTS=-DJava.security.suth.login.config=../config/kafka_server_jaas.conf
if [ "x$DAEMON_MODE" = "xtrue" ]; then
  nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_SASL_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "[email protected]" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
  exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_SASL_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "[email protected]"
fi

  查询权限列表:

    通过 kafka-acls.sh 脚本可以查看某个主题(--topic)、某个消费组(--group)、集群(--cluster)当前的权限列表;

    kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --cluster

  为生产者授权:

    kafka-acls.sh --add --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:morton --producer --topic=*

    参数 --allow-principal 指定给某个用户授权(支持正则);

    参数 --deny-principal 指定某些用户不具有权限(支持正则);

    参数 --producer 指定为该用户生产者角色授权,相当于通过 --operation 参数赋予 Write 和 Describe 权限;

    参数 --topic 参数指定哪些主题具有赋予的权限(支持正则);

    参数 --allow-host 和 参数 --deny-host 指定允许和禁止生产者访问IP;

  为消费者授权:

    kafka-acls.sh -add --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:morton --consumer --topic=*  --group acls-group

    参数 --group 指定该消费者所属的消费组;

  删除权限:

    通过 remove 参数删除相应的权限信息,可以删除某用户对主题(--topic)、集群(cluster)、消费组(--group)的操作权限;

    参数 --operation 可以指定删除具体的权限;

    参数 --force 可以强制删除;

    kafka-acls.sh --authorizer-properties zookeeper-connect=localhost:2181 --remove --topic acls-foo --force

以上是关于Kafka身份认证与权限控制配置的主要内容,如果未能解决你的问题,请参考以下文章

kakfa 安全机制

Kafka安全机制解析及重构(四) ACL权限控制

kafka kerberos 认证访问与非认证访问共存下的ACL问题

认证鉴权与API权限控制在微服务架构中的设计与实现

认证鉴权与API权限控制在微服务架构中的设计与实现

认证鉴权与API权限控制在微服务架构中的设计与实现