Kafka身份认证与权限控制配置
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka身份认证与权限控制配置相关的知识,希望对你有一定的参考价值。
参考技术A 编辑原有配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/server.propertieslisteners=SASL_PLAINTEXT://192.168.43.209:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:root
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
创建新的配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf
KafkaServer
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafkapswd"
user_ kafkaa(用户名)="kafkaapswd"(密码)
user_ kafkab(用户名)=" kafkabpswd"(密码)
user_ kafkac(用户名)=" kafkacpswd"(密码)
user_ kafkad(用户名)=" kafkadpswd"(密码);
;
修改执行文件vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-server-start.sh
if ["x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf"
fi
修改执行文件vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-run-class.sh
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_server_jaas.conf'
if ["x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $KAFKA_HEAP_OPTS$KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS$KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" >"$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec $JAVA $KAFKA_HEAP_OPTS$KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH$KAFKA_OPTS "$@"
fi
创建新的配置文件vi /home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_client_jaas.conf
KafkaClient
org.apache.kafka.common.security.plain.PlainLoginModule required
username=" kafkaa"
password=" kafkaapswd";
;
修改执行文件
vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh
vi /home/wucan/kafka/kafka_2.11-1.0.0/bin/kafka-console-producer.sh
if ["x$KAFKA_OPTS" ]; then
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/wucan/kafka/kafka_2.11-1.0.0/config/kafka_client_jaas.conf"
fi
运行jar包的服务器的指定路径下必须有kafka_ client_ jaas.conf文件
在程序中添加如下配置
System.setProperty("java.security.auth.login.config","xx/kafka_client_jaas.conf");
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
问题描述:发布消息、订阅消息时,出现如下错误,WARN [Consumer clientId=consumer-1, groupId=console-consumer-20752]Error while fetching metadata with correlation id 2 :test2=LEADER_NOT_AVAILABLE (org.apache.kafka.clients.NetworkClient)
解决方法:各客户端的用户名设置为相同,多个客户端同时管理会产生冲突。
kakfa 安全机制
在0.9版本之后,kafka 增加了身份认证和权限控制 两种安全机制。
身份认证:指客户端与服务端连接进行身份认证;包括 客户端与kafka代理之间、代理与代理之间、代理与zookeeper之间的连接认证;目前支持 SSL 、 SASL/Kerberos 、SASL/PLAIN 这3种认证机制;
权限控制:指对客户端的读写操作进行权限控制。
1、利用 SASL/PLAIN 进行身份认证:
1)修改 server.properties 文件,开启 SASL认证配置:
listeners=SASL_PLAINTEXT://0.0.0.0:9092 #配置一个SASL端口 security.inter.broker.protocol=SASL_PLAINTEXT #设置代理之间通信协议 sasl.enable.mechanisms=PLAIN #启用SASL机制 sasl.mechanism.inter.broker.protocol=PLAIN #配置SASL机制
2)创建服务端JAAS文件,配置PLAIN:
在config目录下创建一个名为 kafka_server_jaas.conf 的文件,然后文件内容为:
KafkaServer org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafkapswd" user_kafka="kafkapswd" user_morton="mortonpswd"
username 和 password 指定该代理与集群中其他代理初始化连接的用户名和密码;
通过以 "user_"为前缀后接用户名的方式创建连接代理的用户名和密码。
3)创建和配置客户端JAAS文件:
在config目录下创建一个名为 kafka_client_jaas.conf 的文件,然后文件内容为:
KafkaClient org.apache.kafka.common.security.plain.PlainLoginModule required username="morton" password="mortonpswd"
4)将JAAS配置文件加入相应的配置文件:
修改 kafka-server-start.sh 脚本,在该脚本中引入服务端 JAAS 文件:
if [ "x$KAFKA_OPTS" = "x" ]; then
export KAFKA_OPTS="-DJava.security.auth.login.config=../config/kafka_server_jaas.config"
fi
修改 kafak-console-producer.sh 和 kafka-console-consumer.sh 甲苯,在该脚本中引入客户端 JAAS 文件
if [ "x$KAFKA_OPTS" = "x" ]; then
export KAFKA_OPTS="-DJava.security.auth.login.config=../config/kafka_client_jaas.config"
fi
5)启动服务端、生产者 和 消费者:
kafka-server-start.sh --deamon ../config/server.properties
kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-action --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
kafka-console-consumer.sh --bootstrap-server localhost:9092 topic kafka-action --consumer.property security.protocol=SASL_PLAINTEXT --consumer.property sasl.mechanism=PLAIN
2、权限控制
kafka 提供了 kafka-acls.sh 脚本支持查询(list)、添加(add)、移除(remove)这3类权限控制的操作;
要启用 kafka ACL 权限控制,首先需要在 server.properties 文件中增加权限控制实现类的设置:
authorizer.class.name=kafka.security.auth.SimpleAuthorizer
当启用了 Kafka ACL 权限控制后,默认条件下除了超级用户之外,所有用户均没有任何权限;
在 server.properties 文件设置超级用户的格式为:super.users=User:user1;User:user2
在 server.properties 文件给所有用户开启权限的配置格式为:allow.everyone.if.no.acl.found=true
由于客户端启动都需要连接到 kafka,因此需要假如 Java.security.auth.login.config 环境变量设置,否则即使进行了授权,客户端依然连接不上kafka。由于客户端执行的脚本都会调用 kafka-run-class.sh 脚本,因此在此脚本中加入 Java.security.suth.login.config 环境变量:
# Launch mode KAFKA_SASL_OPTS=‘-DJava.security.suth.login.config=../config/kafka_server_jaas.conf‘ if [ "x$DAEMON_MODE" = "xtrue" ]; then nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_SASL_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "[email protected]" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null & else exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_SASL_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "[email protected]" fi
查询权限列表:
通过 kafka-acls.sh 脚本可以查看某个主题(--topic)、某个消费组(--group)、集群(--cluster)当前的权限列表;
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --cluster
为生产者授权:
kafka-acls.sh --add --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:morton --producer --topic=*
参数 --allow-principal 指定给某个用户授权(支持正则);
参数 --deny-principal 指定某些用户不具有权限(支持正则);
参数 --producer 指定为该用户生产者角色授权,相当于通过 --operation 参数赋予 Write 和 Describe 权限;
参数 --topic 参数指定哪些主题具有赋予的权限(支持正则);
参数 --allow-host 和 参数 --deny-host 指定允许和禁止生产者访问IP;
为消费者授权:
kafka-acls.sh -add --authorizer-properties zookeeper.connect=localhost:2181 --allow-principal User:morton --consumer --topic=* --group acls-group
参数 --group 指定该消费者所属的消费组;
删除权限:
通过 remove 参数删除相应的权限信息,可以删除某用户对主题(--topic)、集群(cluster)、消费组(--group)的操作权限;
参数 --operation 可以指定删除具体的权限;
参数 --force 可以强制删除;
kafka-acls.sh --authorizer-properties zookeeper-connect=localhost:2181 --remove --topic acls-foo --force
以上是关于Kafka身份认证与权限控制配置的主要内容,如果未能解决你的问题,请参考以下文章