Kafka安全机制解析及重构(四) ACL权限控制
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka安全机制解析及重构(四) ACL权限控制相关的知识,希望对你有一定的参考价值。
参考技术A 在前三篇文章中我们介绍了Kafka的安全机制,并自己重构了一个名为ABC的SASL机制
Kafka安全机制解析及重构(一)
Kafka安全机制解析及重构(二)
Kafka安全机制解析及重构(二)
我们将用户的权限细分为以下两类:
根据上一篇文章中的配置方法配置好后,Kafka可以实现Client与Broker之间的连接鉴权,但是也仅仅是连接权限。当按照上一篇的尝试配置好后,如果再配置上Kafka自带的ACL,会发现broker之间无法同步数据,且客户端就算配置上权限,仍然无法访问指定的TOPIC。这与Broker之间通信使用PLAINTEXT机制有关。
我们重读一下Kafka的ACL的说明
Principal指的就是用于校验权限的信息,在我们的机制中,也就是 用户名 。
通过阅读kafka.security.auth.SimpleAclAuthorizer这个类可以发现,在authorize()这个方法中,principal是从session中读取出来的。
可Kafka Broker之间明明是通过PLAINTEXT连接的,不会带上用户名信息的,那总该有个默认的principle吧,这个默认的principal可以在org.apache.kafka.common.security.auth.KafkaPrincipal中找到
为了进一步验证broker之间的连接是否是用ANONYMOUS连接的,我们可以开启Kafka的debug日志,在config/log4j.properties中修改配置
然后就可以在logs/kafka-authorizer.log中看到Broker之间互相用ANONYMOUS来访问,然后被拒绝的信息了。
既然明确了Broker之间使用的ANONYMOUS用户,那就好办了,将ANONYMOUS配置成超级用户就行了。我们只需要在server.properties中进行如下的配置,就可以让broker之间恢复正常通信:
需要注意以下两点:
zookeeperzookeeper的ACL权限控制
1.概述
转载并且补充:zookeeper的ACL权限控制
ACL:Access Control List 访问控制列表
关联文章:【zookeeper】ZooKeeper 权限管理与Curator增加权限验证
关联文章:【kafka】kerberos client is being asked for a password not available to garner authentication informa
1.1 简介
ACL 权限控制,使用:scheme:id:perm
来标识,主要涵盖 3 个方面:
权限模式(Scheme):授权的策略
授权对象(ID):授权的对象
权限(Permission):授予的权限
其特性如下:
- ZooKeeper的权限控制是基于每个znode节点的,需要对每个节点设置权限
- 每个znode支持设置多种权限控制方案和多个权限
- 子节点不会继承父节点的权限,客户端无权访问某节点,但可能可以访问它的子节点
例如:
setAcl /test2 ip:128.0.0.1:crwda
1.2 scheme 采用何种方式授权
-
world
:默认方式,相当于全部都能访问 -
auth
:代表已经认证通过的用户(cli中可以通过addauth digest user:pwd
来添加当前上下文中的授权用户) -
digest
:即用户名:密码
这种方式认证,这也是业务系统中最常用的。用username:password
字符串来产生一个MD5串,然后该串被用来作为ACL ID。认证是通过明文发送username:password
来进行的,当用在ACL时,表达式为username:base64
,base64是password的SHA1摘要的编码。 -
ip
:使用客户端的主机IP作为ACL ID 。这个ACL表达式的格式为addr/bits ,此时addr中的有效位与客户端addr中的有效位进行比对。
1.3 ID 给谁授予权限
授权对象ID是指,权限赋予的用户或者一个实体,例如:IP 地址或者机器。授权模式 schema 与 授权对象 ID 之间
1.4 permission 授予什么权限
CREATE、READ、WRITE、DELETE、ADMIN
也就是 增、删、改、查、管理权限,这5种权限简写为crwda
注意:
这5种权限中,delete是指对子节点的删除权限,其它4种权限指对自身节点的操作权限
更详细的如下:
CREATE c 可以创建子节点
DELETE d 可以删除子节点(仅下一级节点)
READ r 可以读取节点数据及显示子节点列表
WRITE w 可以设置节点数据
ADMIN a 可以设置节点访问控制列表权限
1.5 ACL 相关命令
getAcl getAcl <path> 读取ACL权限
setAcl setAcl <path> <acl> 设置ACL权限
addauth addauth <scheme> <auth> 添加认证用户
2. 案例
2.1 测试zkCli设置权限
2.1.1 word方式
[zk: localhost:2181(CONNECTED) 9] create /test1 test1-value
Created /test1
[zk: localhost:2181(CONNECTED) 10] getAcl /test1 #创建的默认是所有用户都可以进行cdrwa
'world,'anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 11] setAcl /test1 world:anyone:acd #修改为所有人可以acd
cZxid = 0x400000007
ctime = Tue Mar 12 14:46:55 CST 2019
mZxid = 0x400000007
mtime = Tue Mar 12 14:46:55 CST 2019
pZxid = 0x400000007
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 11
numChildren = 0
[zk: localhost:2181(CONNECTED) 12] getAcl /test1
'world,'anyone
: cda
2.1.2 IP的方式
[zk: localhost:2181(CONNECTED) 13] create /test2 test2-value
Created /test2
[zk: localhost:2181(CONNECTED) 14] setAcl /test2 ip:127.0.0.1:crwda #修改此IP具有所有权限
cZxid = 0x400000009
ctime = Tue Mar 12 14:51:58 CST 2019
mZxid = 0x400000009
mtime = Tue Mar 12 14:51:58 CST 2019
pZxid = 0x400000009
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 11
numChildren = 0
[zk: localhost:2181(CONNECTED) 15] getAcl /test2
'ip,'127.0.0.1
: cdrwa
当然可以设置IP的时候使用多个ip的方式,比如:
[zk: localhost:2181(CONNECTED) 42] setAcl /t3 ip:192.168.0.164:cdwra,ip:127.0.0.1:cdwra
cZxid = 0x400000018
ctime = Tue Mar 12 15:12:59 CST 2019
mZxid = 0x400000018
mtime = Tue Mar 12 15:12:59 CST 2019
pZxid = 0x400000018
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
[zk: localhost:2181(CONNECTED) 43] getAcl /t3
'ip,'192.168.0.164
: cdrwa
'ip,'127.0.0.1
: cdrwa
2.1.3 Auth
[zk: localhost:2181(CONNECTED) 44] create /t4 44
Created /t4
[zk: localhost:2181(CONNECTED) 45] addauth digest qlq:111222 #增加授权用户,明文用户名和密码
[zk: localhost:2181(CONNECTED) 46] setAcl /t4 auth:qlq:cdwra #授予权限
cZxid = 0x40000001d
ctime = Tue Mar 12 15:16:56 CST 2019
mZxid = 0x40000001d
mtime = Tue Mar 12 15:16:56 CST 2019
pZxid = 0x40000001d
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
[zk: localhost:2181(CONNECTED) 48] getAcl /t4
'digest,'qlq:JWNEexxIoeVompjU7O5pZzTU+VQ=
: cdrwa
如果重新连接之后获取会报没权限,需要添加授权用户:
[zk: localhost:2181(CONNECTED) 4] get /t4
Authentication is not valid : /t4
[zk: localhost:2181(CONNECTED) 6] addauth digest qlq:111222
[zk: localhost:2181(CONNECTED) 7] get /t4
44
cZxid = 0x40000001d
ctime = Tue Mar 12 15:16:56 CST 2019
mZxid = 0x40000001d
mtime = Tue Mar 12 15:16:56 CST 2019
pZxid = 0x40000001d
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
2.1.3 Digest
etAcl /test digest:用户名:密码:权限
密码是用户名和密码加密后的字符串。
2.1.3.1 生成密码:sha1加密之后base64编码
package zd.dms.test;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import org.apache.commons.codec.binary.Base64;
public class Test
public static void main(String[] args) throws NoSuchAlgorithmException
String usernameAndPassword = "user:123456";
byte digest[] = MessageDigest.getInstance("SHA1").digest(usernameAndPassword.getBytes());
Base64 base64 = new Base64();
String encodeToString = base64.encodeToString(digest);
System.out.println(encodeToString);
输出:6DY5WhzOfGsWQ1XFuIyzxkpwdPo=
2.1.3.2. 设置权限
[zk: localhost:2181(CONNECTED) 7] setAcl /t6 digest:user:6DY5WhzOfGsWQ1XFuIyzxkpwdPo=:crwda #授权
cZxid = 0x400000028
ctime = Tue Mar 12 15:50:02 CST 2019
mZxid = 0x400000028
mtime = Tue Mar 12 15:50:02 CST 2019
pZxid = 0x400000028
cversion = 0
dataVersion = 0
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0
[zk: localhost:2181(CONNECTED) 8] getAcl /t6
'digest,'user:6DY5WhzOfGsWQ1XFuIyzxkpwdPo=
: cdrwa
直接删除会不允许,也必须增加摘要之后才能删除
[zk: localhost:2181(CONNECTED) 1] rmr /t6 #直接删除没权限
Authentication is not valid : /t6
[zk: localhost:2181(CONNECTED) 2] addauth digest user:123456 #增加认证用户
[zk: localhost:2181(CONNECTED) 3] rmr /t6
[zk: localhost:2181(CONNECTED) 4] ls /
[t4, curator, test2, zookeeper, test1, t3]
2.2 Java原生的zookeperAPI的ACL
2.2.1 创建节点回顾
原来我们创建节点的时候如下:
package zookeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
public class BaseAPI
private static ZooKeeper zoo;
final static CountDownLatch connectedSignal = new CountDownLatch(1);
public static ZooKeeper connect(String host) throws IOException, InterruptedException
zoo = new ZooKeeper(host, 5000, new Watcher()
public void process(WatchedEvent event)
if (event.getState() == KeeperState.SyncConnected)
connectedSignal.countDown();
);
connectedSignal.await();
return zoo;
public void close() throws InterruptedException
zoo.close();
public static void create(String path, byte[] data) throws KeeperException, InterruptedException
zoo.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException
final String path = "/t7";
final ZooKeeper connect = connect("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");
connect.create(path, "777".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Thread.sleep(10 * 1000);
可以看到create方法的第三个参数就是ACL集合,使用的是与zkCli方式一样的word:anyone:crwda 默认方式
如下:
/**
* This is a completely open ACL .
*/
public final ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList<ACL>(
Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));
public interface Perms
int READ = 1 << 0;
int WRITE = 1 << 1;
int CREATE = 1 << 2;
int DELETE = 1 << 3;
int ADMIN = 1 << 4;
int ALL = READ | WRITE | CREATE | DELETE | ADMIN;
public interface Ids
public final Id ANYONE_ID_UNSAFE = new Id("world", "anyone");
public final Id AUTH_IDS = new Id("auth", "");
public final ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList<ACL>(
Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));
public final ArrayList<ACL> CREATOR_ALL_ACL = new ArrayList<ACL>(
Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS)));
public final ArrayList<ACL> READ_ACL_UNSAFE = new ArrayList<ACL>(
Collections
.singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE)));
自己手动写一个采用IP的方式设置ACL的方法:
package zookeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
public class BaseAPI
private static ZooKeeper zoo;
final static CountDownLatch connectedSignal = new CountDownLatch(1);
public static ZooKeeper connect(String host) throws IOException, InterruptedException
zoo = new ZooKeeper(host, 5000, new Watcher()
public void process(WatchedEvent event)
if (event.getState() == KeeperState.SyncConnected)
connectedSignal.countDown();
);
connectedSignal.await();
return zoo;
public void close() throws InterruptedException
zoo.close();
public static void create(String path, byte[] data) throws KeeperException, InterruptedException
zoo.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException
final String path = "/t9";
final ZooKeeper connect = connect("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");
// 创建ACL
ACL acl = new ACL();
// 创建Id,也可以设置构造方法传入scheme和id
Id id = new Id("ip", "192.168.0.164");
acl.setId(id);
acl.setPerms(Perms.ALL);
List<ACL> acls = new ArrayList<>();
acls.add(acl);
connect.create(path, "777".getBytes(), acls, CreateMode.PERSISTENT);
Thread.sleep(10 * 1000);
获取ACL:
package zookeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
public class BaseAPI
private static ZooKeeper zoo;
final static CountDownLatch connectedSignal = new CountDownLatch(1);
public static ZooKeeper connect(String host) throws IOException, InterruptedException
zoo = new ZooKeeper(host, 5000, new Watcher()
public void process(WatchedEvent event)
if (event.getState() == KeeperState.SyncConnected)
connectedSignal.countDown();
);
connectedSignal.await();
return zoo;
public void close() throws InterruptedException
zoo.close();
public static void create(String path, byte[] data) throws KeeperException, InterruptedException
zoo.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException
final String path = "/t9";
final ZooKeeper connect = connect("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");
List<ACL> acls = connect.getACL("/t9", connect.exists("/t9", false));
for (ACL acl : acls)
System.out.println(acl.getPerms());
System.out.println(acl.getId());
结果:
31
'ip,'192.168.0.164
2.2.2 ckCli客户端进行验证:
[zk: localhost:2181(CONNECTED) 7] getAcl /t9
'ip,'192.168.0.164
: cdrwa
补充:权限的计算方法:
<<:左移位,在低位处补0; &与(AND),对两个整型操作数中对应位执行布尔代数,两个位都为1时输出1,否则0。
1
10
100
1000
10000
按位与之后是:11111 也就是十进制的31.
2.2.3 修改ACL
修改节点 /t10 节点的acl访问方式采用digest:user:111222
package zookeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
public class BaseAPI
private static ZooKeeper zoo;
final static CountDownLatch connectedSignal = new CountDownLatch(1);
public static ZooKeeper connect(String host) throws IOException, InterruptedException
zoo = new ZooKeeper(host, 5000, new Watcher()
public void process(WatchedEvent event)
if (event.getState() == KeeperState.SyncConnected)
connectedSignal.countDown();
);
connectedSignal.await();
return zoo;
public void close() throws InterruptedException
zoo.close();
public static void create(String path, byte[] data) throws KeeperException, InterruptedException
zoo.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException
final String path = "/t10";
final ZooKeeper connect = connect("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183");
// 创建ACL
ACL acl = new ACL();
// 创建Id,也可以设置构造方法传入scheme和id
Id id = new Id("digest", "user:6DY5WhzOfGsWQ1XFuIyzxkpwdPo=");
acl.setId(id);
acl.setPerms(Perms.ALL);
List<ACL> acls = new ArrayList<>();
acls.add(acl);
// 修改ACL
Stat setACL = connect.setACL(path, acls, connect.exists(path, false).getAversion());
// 获取Acl
System.out.println(connect.getACL(path, setACL));
结果:
[31,s'digest,'user:6DY5WhzOfGsWQ1XFuIyzxkpwdPo=
]
zkCli客户端进行验证:
[zk: localhost:2181(CONNECTED) 26] getAcl /t10
'digest,'user:6DY5WhzOfGsWQ1XFuIyzxkpwdPo=
: cdrwa
2.2.4 访问上面的节点会报错没权限
package zookeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
public class BaseAPI
private static ZooKeeper zoo;
final static CountDownLatch connectedSignal = new CountDownLatch(1);
public static ZooKeeper connect(String host) throws IOException, InterruptedException
zoo = new ZooKeeper(host, 5000, new Watcher()
public void process(WatchedEvent event)
if (event.getState() == KeeperState.SyncConnected)
connectedSign以上是关于Kafka安全机制解析及重构(四) ACL权限控制的主要内容,如果未能解决你的问题,请参考以下文章
Spring cloud微服务安全实战-4-7重构代码以适应真实环境