Kafka 集群配置SASL+ACL

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 集群配置SASL+ACL相关的知识,希望对你有一定的参考价值。

                                 **  Kafka 集群配置SASL+ACL

测试环境:**

                    系统: CentOS 6.5 x86_64
                    JDK : java version 1.8.0_121
                    kafka: kafka_2.11-1.0.0.tgz
                    zookeeper: 3.4.5
                    ip: 192.168.49.161 (我们这里在一台机上部署整套环境)

kafka 名词解析:

                    Broker: Kafka 集群包含一个或多个服务器,这种服务器被称为broker
                    Topic: 每条发布到Kafka 集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic 的消息
                    分开存储,逻辑上一个Topic 的消息虽然保存于一个或多个broker 上但用户只需指定消息的Topic 即可生产或
                    消费数据而不必关心数据存于何处)
                    Partition: Parition 是物理上的概念,每个Topic 包含一个或多个Partition
                    Producer: 负责发布消息到Kafka broker 即生产者
                    Consumer: 消息消费者,向Kafka broker 读取消息的客户端即消费者
                    Consumer Group:每个Consumer 属于一个特定的Consumer Group(可为每个Consumer 指定group
                    name,若不指定group name 则属于默认的group)

Kafka 拓扑结构(借用别人的图)
技术分享图片
一个典型的Kafka 集群中包含若干Producer(可以是web 前端产生的Page View,或者是服务器日志,系统 CPU、Memory 等),若干broker(Kafka 支持水平扩展,一般broker 数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper 集群。Kafka 通过Zookeeper 管理集群配置,选举leader,以及在Consumer Group 发生变化时进行rebalance。Producer 使用push 模式将消息发布到broker,Consumer 使用pull 模式从broker 订阅并消费消息

kafka 集群部署
一 Zookeeper 集群部署(这里部署的伪集群)

  1. zookeeper 部署比较简单,解压修改配置文件后即可使用, 默认的配置文件为zoo_sample.cfg 这里我们直接新建一个zoo.cfg 的文件内容如下:

各参数意义这里不作详细解释,想了解可以自行参考官网

                                        tickTime=2000
                                        initLimit=5
                                        syncLimit=2
                                        dataDir=/opt/zook/zookeeper1/data //指定数据路径
                                        dataLogDir=/opt/zook/zookeeper1/logs //指定日志路径
                                        clientPort=2181 //监听端口
                                        server.1=127.0.0.1:7771:7772
                                        server.2=127.0.0.1:7771:7772
                                        server.3=127.0.0.1:7771:7772
  2. 在zookeeper/data 下创建一个名为myid 的文件,并在文件内输入数字1
 3. 集群部署则将上述配置的单节点再复制两份,并将配置文件中
                            ```
                            dataDir , dataLogDir 两个参数根据实际路径修改
                            clientPort 分别配为2182,2183
                            myid 分别配置为2 3
                            ```
  4. 启动集群

二 Kafka 集群部署

 由于集群最终是对外部网络开放的,出于安全考虑本文采用的是SASL+ACL 控制和授权

1. broker 配置
1.1. 将配置server.properties 复制2 份并分别重命名成server-1.properties server-2.properties server-3.properties 修改server.properties 配置:(请根据实际环境填写)

                                                                                                                    broker.id = 1 //另外两个节点分别为2 3
                                                                                                                    host.name=192.168.49.161
                                                                                                                    log.dirs=/tmp/kafka-logs-1 //另外两个节点分别为kafka-logs-2 kafka-logs-3 可以不
                                                                                                                    放在/tmp 下,如果放在其他地方必须注意目录权限
                                                                                                                    zookeeper.connect=192.168.49.161:2181,192.168.49.161:2182,192.168.49.161:2183
                                                                                                                    port=9092 //另外两个节点分别为9093 9094 也可以自定义
                                                                                                                    listeners=SASL_PLAINTEXT://192.168.49.161:9092 //另外两个节点分别为9093 9094
                                    到这里基础配置已完成
                    1.2    要配置SASL 和ACL,需要在broker 端进行两个方面的设置。
               首先是创建包含所有认证用户信息的JAAS 文件,本例中我们有admin qjld 两个用户,其中admin 为管理员用于broker 集群间的认证, qjld 为远程应用的认证用户, 新增认证信息文件
                                                                                                                        ```
                                                                                                                        kafka_server_jaas.conf 内容为:
                                                                                                                        KafkaServer {
                                                                                                                        org.apache.kafka.common.security.plain.PlainLoginModule required
                                                                                                                        username="admin"
                                                                                                                        password="admin-mgr998778123"
                                                                                                                        user_admin="admin-mgr998778123"
                                                                                                                        user_qjld="123456";
                                                                                                                        };
                                                                                                                        ```
                                                                        本例中路径为: /opt/kafka_2.11-1.0.0/config/kafka_server_jaas.conf , 我们需要将配置文件中的内容传递给JVM,因此需要修改/opt/kafka_2.11-                                                         1.0.0/bin/kafka-server-start.sh 脚本文件如下:
                                                                    ```
                                                                    在exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "[email protected]" 之前一行添加export KAFKA_OPTS="-                                  Djava.security.auth.login.config=/opt/kafka_2.11-1.0.0/config/kafka_server_jaas.conf"
                                                                    然后保存退出。
                                                                    ```
                            其次我们要在所有broker 配置文件(server-x.properties)中增加:
                                                                                            ```
                                                                                            #ACL 入口类
                                                                                            authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
                                                                                            #SASL_PLAINTEXT
                                                                                            sasl.enabled.mechanisms=PLAIN
                                                                                            security.inter.broker.protocol=SASL_PLAINTEXT
                                                                                            sasl.mechanism.inter.broker.protocol=PLAIN
                                                                                            allow.everyone.if.no.acl.found=true
                                                                                            #设置admin 为超级用户
                                                                                            super.users=User:admin
                                                                                            ```
                            现在我们可以启动broker 测试了
                                                                                        ```
                                                                                        bin/kafka-server-start.sh config/server-1.properties &
                                                                                        bin/kafka-server-start.sh config/server-2.properties &
                                                                                        bin/kafka-server-start.sh config/server-3.properties &
                                                                                        ```
           启动成功,此时broker 可以接收已认证的客户端连接了,下面我们进行客户端配置
                                                                        ```
                                                                        //创建一个topic 名为test
                                            bin/kafka-topics.sh--create --zookeeper 192.168.49.161:2181,192.168.49.161:2182, 192.168.49.161:2183 --replication-factor 1 --partitions 1 --topic test    
                                                                        //查看topic 是否创建成功
                                                                        bin/kafka-topics.sh --list --zookeeper 192.168.49.161:2181 
                                                                        ```
  **2. Client 配置**
            2.1 新增一个配置文件如:/opt/kafka_2.11-1.0.0/config/kafka_client_jaas.conf,内容为:
                                                                                                                        ```
                                                                                                                        KafkaClient {
                                                                                                                        org.apache.kafka.common.security.plain.PlainLoginModule required
                                                                                                                        username="qjld"
                                                                                                                        password="123456";
                                                                                                                        };
                                                                                                                        ```

同理我们也要将配置文件内容传递给JVM, 因此需要修改

                                                                /opt/kafka_2.11-1.0.0/bin/kafka-console-producer.sh 脚本文件如下:在exec $base_dir/kafka-run-class.sh
                                                                $EXTRA_ARGS kafka.Kafka "[email protected]" 之前一行添加export
                                                                KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.11-1.0.0/config/kafka_client_jaas.conf"
                                                                同样的方法修改kafka-console-consumer.sh
                            2.2 需要在config/consumer.properties 和config/producer.properties 配置文件中追加:
                                                                ```
                                                                security.protocol=SASL_PLAINTEXT
                                                                sasl.mechanism=PLAIN
                                                                ```
                                    这两行配置, 完成后再试运行:
                                                                ```
                                                                bin/kafka-console-producer.sh --broker-list 192.168.49.161:9092 --topic test                                                                 --producer.config config/producer.properties
                                                                bin/kafka-console-consumer.sh --bootstrap-server 192.168.49.161:9092 --topic test
                                                                --from-beginning  --consumer.config config/consumer.properties
                                                                ```
                                    接下来测试消息的写入和读取, 开启两个终端分别输入下列
                                                            ```
                                                            bin/kafka-console-producer.sh --broker-list 192.168.49.161:9092 --topic test //消息写入
                                                            bin/kafka-console-consumer.sh --bootstrap-server 192.168.49.161:9092 --topic test  --from-beginning 消息读取
                                                            ```
               如果报错信息如下:
                           `WARN Bootstrap broker 192.168.49.161:9092 disconnected (org.apache.kafka.clients.NetworkClient)`
               则说明配置的security 已生效, 要想普通用户能读写消息,需要配置ACL
         2.3 配置ACL
                                                            ```
                                                            bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties
                                                            zookeeper.connect=192.168.49.161:2181,192.168.49.161:2182,192.168.49.161:2183 --add
                                                            --allow-principal User:qjld --operation All --topic test
                                                            ```
             这是为qjld 这个用户配置所有权限,如果有更细的要求则可参考网上的,如:Read Write 等只需要将--operation all 改为--operation Read 多种则在其后面加一个--         operation write 即可

三 测试
这里我选用python2.7 编写的脚本进行测试
先安装pip27 install kafka-python
技术分享图片

以上是关于Kafka 集群配置SASL+ACL的主要内容,如果未能解决你的问题,请参考以下文章

基于SASL+ACL 的Kafka集群内外网访问

Kafka SASL + ACL实战

如何通过可视化方式快捷管理kafka的acl配置

kafka快速配置启用ACL示例

Kafka 动态添加 SASL 用户,无需重启集群

Kafka SASL集群部署