Kafka SASL/PLAIN加密 及Kafka-Python整合

Posted Aarend

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka SASL/PLAIN加密 及Kafka-Python整合相关的知识,希望对你有一定的参考价值。

SASL/PLAIN


前言

        SASL/PLAIN是kafka中一种使用用户名/密码的身份验证机制,本文使用Kafka-Python2.02 及kafka3.2.0进行简单的整合操作。

一、配置Kafka 节点信息

1. kafka增加认证信息:

1、在Kafka每个节点的安装目录下的config下新建一个kafka_server_jaas.conf文件

并写入以下内容

JAAS文件定义了链接Kafka Broker时所需要的用户名密码及broker各个节点之间相互通信的用户名密码

KafkaServer 
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_producer="producer"
user_consumer="consumer";
;

2、config下新建一个kafka_producer_jaas.conf文件,用作生产者账号

KafkaClient  
org.apache.kafka.common.security.plain.PlainLoginModule required 
username="producer" 
password="producer"; 
;

3、config下新建一个kafka_consumer_jaas.conf文件用作消费者账号

KafkaClient  
org.apache.kafka.common.security.plain.PlainLoginModule required 
username="consumer" 
password="consumer"; 
;

4、config下修改server.properties,添加如下内容

##当前主机名称
listeners=SASL_PLAINTEXT:localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
##当没有找到ACL配置时,允许所有的访问操作。
allow.everyone.if.no.acl.found=true 

 5、config下修改producer.properties,添加如下内容

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

  6、config下修改consumer.properties,添加如下内容

##username 和 password 对应kafka_server_jaas.conf中的用户名密码
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required 
username="admin" password="admin";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

2.修改kafka脚本-添加相关配置文件内容:

1、修改启动脚本

找到bin目录下kafka-server-start.sh的最后一行并在中间添加如下内容:

-Djava.security.auth.login.config=kafka/config/kafka_server_jaas.conf
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/kafka/config/kafka_server_jaas.conf kafka.Kafka "$@"

2、修改kafka-console-consumer.sh

找到bin目录下kafka-console-consumer.sh的最后一行并在中间添加如下内容:

-Djava.security.auth.login.config=kafka/config/kafka_client_consumer_jaas.conf 
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=kafka/config/kafka_client_consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"

3、修改kafka-console-producer.sh

找到bin目录下kafka-console-producer.sh的最后一行并在中间添加如下内容:

-Djava.security.auth.login.config=kafka/config/kafka_client_producer_jaas.conf 
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=kafka/config/kafka_client_producer_jaas.conf kafka.tools.ConsoleProducer "$@"

4、修改kafka-topics.sh

找到bin目录下kafka-topics.sh,并找到export KAFKA_OPTS 添加如下内容:

-Djava.security.auth.login.config=kafka/config/kafka_server_jaas.conf"
export KAFKA_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=kafka/config/kafka_server_jaas.conf"

3.启动kafka并进行测试:

   启动kafka

bin/kafka-server-start.sh config/server.properties

注意:配置kafka sasl加密认证后启动脚本需要指定配置文件

 启动生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config config/producer.properties

   启动消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer.properties

另外说一下,开启kafka sasl加密认证后 无法查看主题的情况需要添加一个配置文件并在使用​ kafka-topics.sh指向该配置文件即可

 例:

在config目录下新建config.properties文件,并添加如下内容

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

查看主题情况

bin/kafka-topics.sh --list --bootstrap-server localhost:9092 --command-config config.properties

Kafka-Python整合

生产者示例

from kafka import KafkaProducer
import json

producer = KafkaProducer(
                            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                            key_serializer=lambda k: json.dumps(k).encode(),
                            bootstrap_servers=localhost:9092
                            security_protocol = 'SASL_PLAINTEXT',
                            sasl_mechanism = 'PLAIN',
                            sasl_plain_username = 'username',
                            sasl_plain_password = 'password'
)                            # 假设生产的消息为json字符串

data = 'Data':'我是一条json消息'

producer.send("Test",data)
producer.close()

消费者示例

from kafka import KafkaConsumer

consumer = KafkaConsumer(
            'Test', 
            bootstrap_servers=localhost:9092,
            group_id='Test_Group',
            auto_offset_reset='latest',
            security_protocol='SASL_PLAINTEXT',
            sasl_mechanism='PLAIN',
            sasl_plain_username='consumer',
            sasl_plain_password='consumer',
            enable_auto_commit='False'
)
#  
for msg in consumer:
    print(msg)
    consumer.commit()

文章有什么不对的地方希望大神多多指点~

以上是关于Kafka SASL/PLAIN加密 及Kafka-Python整合的主要内容,如果未能解决你的问题,请参考以下文章

go kafka 配置SASL认证及实现SASL PLAIN认证功能

kafka sasl/plain安全认证

kafka使用 SASL/PLAIN 认证服务端/客户端配置

kafka安全认证与授权(SASL/PLAIN)

开启kafka密码认证

开启kafka密码认证