zookeeper客户端

Posted zhuchangwu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper客户端相关的知识,希望对你有一定的参考价值。

session会话机制

client请求和服务端建立连接,服务端会保留和标记当前client的session,包含session过期时间,sessionId,然后服务端开始在session过期时间的基础上倒计时,在这段时间内,client需要向server发送心跳包,目的是然server重置session过期时间

使用quit命令,退出可端,但是server端的session不会立即消失,使用ls / 依然可以看到创建的临时节点

节点的类型:

  • 持久节点,不加任何参数,默认创建的是持久节点
  • 临时节点: 当客户端断开连接时,这个节点会消失
  • 持久顺序节点: 父节点为他的第一级子节点维护一份时序,标记节点的创建顺序,这个标记其实就是一个数字后缀,作为新节点的名字,数字的范围就是整形的最大值(1024*1024)
  • 临时顺序节点,同上. (临时节点不能再创建的节点)

创建节点时,可以指定每个节点的data信息,zookeeper默认每个节点的数量的最大上限是1M

常用命令

创建节点:

语法 : create /path data

[zk: localhost:2181(CONNECTED) 2] create /changwu 1
Created /changwu
[zk: localhost:2181(CONNECTED) 3] ls /
[changwu, zookeeper]

创建顺序节点

语法create -s /path data

[zk: localhost:2181(CONNECTED) 9] create -s /seq1 1
Created /seq10000000001
[zk: localhost:2181(CONNECTED) 10] ls /
[changwu, zookeeper, seq10000000001]

创建临时节点 (extra)

语法: create -e /path data

[zk: localhost:2181(CONNECTED) 16] create -e /extra1 1
Created /extra1
[zk: localhost:2181(CONNECTED) 17] ls /
[changwu, extra1, zookeeper, seq10000000001]

当客户端断开连接时,临时节点被删除

获取节点和节点指定的元数据

语法: get /path

[zk: localhost:2181(CONNECTED) 22] get /changwu
1  # data 源数据
cZxid = 0x200000008
ctime = Tue Sep 17 12:06:40 CST 2019
mZxid = 0x200000008
mtime = Tue Sep 17 12:06:40 CST 2019
pZxid = 0x200000008
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0

访问临时节点

注意点,访问顺序节点,需要带上 节点的全称

[zk: localhost:2181(CONNECTED) 37] get /seq10000000001
1
cZxid = 0x200000009
ctime = Tue Sep 17 12:08:16 CST 2019
mZxid = 0x200000009
mtime = Tue Sep 17 12:08:16 CST 2019
pZxid = 0x200000009
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0

设置元数据

语法: set /path data

[zk: localhost:2181(CONNECTED) 51] ls / 
[changwu, extra1, zookeeper, seq10000000001]
[zk: localhost:2181(CONNECTED) 52] set /changwu 2 
# 创建znode的事务id
cZxid = 0x200000008 
# znode的创建时间
ctime = Tue Sep 17 12:06:40 CST 2019
# 最后修改的znode的事务id
mZxid = 0x20000000b
# znode的最近修改的时间
mtime = Tue Sep 17 12:15:35 CST 2019
# 最后修改/删除/添加znode的事务id
pZxid = 0x200000008
# 对当前znode 子节点 的修改次数
cversion = 0
# 对当前znode节点data的修改的次数
dataVersion = 1
# 对znode的acl修改的次数
aclVersion = 0
# 如果znode是(ephemeral短暂)类型节点,这就是znode所有者的sessionId
# 如果不是(ephemeral短暂)类型,设置为0
ephemeralOwner = 0x0
# znode数据字段的长度
dataLength = 1
# znode子节点数量
numChildren = 0

创建子节点

注意点,创建子节点时添加上父节点的路径
语法: create / 父节点/子节点 子节点data

[zk: localhost:2181(CONNECTED) 67] create /changwu/child1 firstchild 
Created /changwu/child1

列出子节点

语法: ls /父节点

[zk: localhost:2181(CONNECTED) 70] ls /changwu
[child1]

查看znode的状态

语法: stat /path

[zk: localhost:2181(CONNECTED) 73] stat /changwu
# 创建znode节点的事务id
cZxid = 0x200000008
# znode节点的创建时间
ctime = Tue Sep 17 12:06:40 CST 2019
# 最后修改znode的事务id
mZxid = 0x20000000b
# znode的最后修改时间
mtime = Tue Sep 17 12:15:35 CST 2019
# 最后修还,添加.删除 znode节点的事务id
pZxid = 0x20000000c
# 对当前znode子节点更改的次数
cversion = 1
# 对znode data更改的次数
dataVersion = 1
# 对znode acl 更改的次数
aclVersion = 0
# 如果znode是epemeral类型的节点,则当前值就是znode所有者的sessionid
# 如果znode 不是 ephemeral 类型的,当前值=0
ephemeralOwner = 0x0
# znode data长度
dataLength = 1
# 子节点的数量
numChildren = 1

移除节点1

语法: delete /path
如果当前节点存在子节点,delete 是删除不掉它的

移除节点2

语法: rmr /path

[zk: localhost:2181(CONNECTED) 87] rmr /
changwu          extra1           zookeeper        seq10000000001
[zk: localhost:2181(CONNECTED) 87] rmr /extra1
[zk: localhost:2181(CONNECTED) 88] ls /
[changwu, zookeeper, seq10000000001]

什么是ACL(access control list 访问控制列表)

zookeeper在分布式系统中承担中间件的作用,它管理的每一个节点上可能都存储这重要的信息,因为应用可以读取到任意节点,这就可能造成安全问题,ACL的作用就是帮助zookeeper实现权限控制

  • zookeeper的权限控制基于节点,每个znode可以有不同的权限
  • 子节点不会继承父节点的权限, 访问不了不节点,不代表访问不到子节点

写法: schema??permission

Schema: 鉴权的策略

  • world: 只有一个用户 anyone: 所有人:权限
  • ip: 使用ip地址认证
  • auth: 使用已经添加的认证用户认证
  • disgest: 使用"用户名:密码"方式认证

    授权对象 ID

权限Permission

  • create / c 可以创建子节点
  • delete / d 仅可以删除子节点
  • read / r 读取当前节点数据,及子节点列表
  • write / w 设置节点数据
  • admin / a 设置节点的访问控制列表

查看ACL

语法getAcl /parentpath

[zk: localhost:2181(CONNECTED) 94] getAcl /changwu
'world,'anyone
: cdrwa

设置ACL

[zk: localhost:2181(CONNECTED) 102] setAcl /changwu world:anyone:wa
cZxid = 0x200000008
ctime = Tue Sep 17 12:06:40 CST 2019
mZxid = 0x20000000b
mtime = Tue Sep 17 12:15:35 CST 2019
pZxid = 0x20000000c
cversion = 1
dataVersion = 1
aclVersion = 1
ephemeralOwner = 0x0
dataLength = 1
numChildren = 1

再次尝试获取,就没有读权限了,还掉了线

[zk: localhost:2181(CONNECTED) 115] get /changwu
Authentication is not valid : /changwu
添加用户,并登录
[zk: localhost:2181(CONNECTED) 0] addauth digest lisi:123456
使用第四种授权策略
[zk: localhost:2181(CONNECTED) 2] get[zk: localhost:2181(CONNECTED) 0] addauth digest lisi:123456

将这个授权添加到本次会话后就可以读了
[zk: localhost:2181(CONNECTED) 4] get /changwu
2
cZxid = 0x200000008
ctime = Tue Sep 17 12:06:40 CST 2019
mZxid = 0x20000000b
mtime = Tue Sep 17 12:15:35 CST 2019
pZxid = 0x20000000c
cversion = 1
dataVersion = 1
aclVersion = 2
ephemeralOwner = 0x0
dataLength = 1
numChildren = 1

zookeeper的客户端

原生客户端 ZooKeeper

Zookeeper 存在一个问题,session有存在丢弃的可能性

client--网络中断-->server client和server通过网络发送心跳保持沟通,如果因为网络的原因,client发送的心跳包都没有到达server端, server端的session倒计时机制就会把 session 和 watch全部清除

  • 原生zookeeper客户端
  public static void main(String[] args) throws Exception {
   ZooKeeper client = new ZooKeeper("localhost", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.err.println("连接,触发");
            }
        });
  Stat stat = new Stat();
  • 创建客户端
   /**
     * 1. path  
     * 2. data  
     * 3. acl 安全模式
     * 4. 持久化/临时 策略
     */
  client.create("/node2","value of node2".getBytes(), (List<ACL>) ZooDefs.Ids.ANYONE_ID_UNSAFE,CreateMode.PERSISTENT);
  • 获取节点数据
client.getData("/node1",true,stat);
  • 监听回调
 String content = new String(  client.getData("/node1", new Watcher() {
            @Override
    public void process(WatchedEvent event) {
        // todo 任何连接上这个节点的客户端修改了这个节点的 data数据,都会引起process函数的回调
        // todo 特点1:  watch只能使用1次
        if (event.getType().equals(Event.EventType.NodeDataChanged)){
            System.err.println("当前节点数据发生了改变");
        }
    }
}, new Stat()));
  • 非阻塞,直接回调
  client.getData("/node1", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
                System.out.println("123123");
            }
        },null);

Curator

  • 提供了多种zookeeper集群leader的选举机制
  • 封装了原生的zookeeper client 和 zookeeper server
  • Fluent Api 流式api

  • java API创建客户端
    public static void main(String[] args) throws Exception {
        // 初始化客户端
        // 总共重试三次,每次一秒
         CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",new RetryNTimes(3,100));
         client.start();  // todo instance must be started before calling this method
        // 创建一个节点
        //client.create().withMode(CreateMode.PERSISTENT).forPath("/node3","value of node3".getBytes());

        // 订阅
        String path = "/node1";
        
        /**
         *  todo NodeCache就是对这个path节点内存的缓存
         *  第一次执行会触发, 而且,以后每次node被改变都会执行
         */
        NodeCache  cache = new NodeCache(client,path);

        // cache.start(true); 启动时会同步数据到缓存中, 既然数据一致了,启动时,下面的回调就不会触发
        // cache.start(false); 默认是false, 不会触发
        cache.start();
        cache.getListenable().addListener(new NodeCacheListener(){
            // todo 监听节点值的改变
            @Override
            public void nodeChanged() {
                System.err.println("nodeChanged 执行了");
            }
        });
        
        System.in.read();
    }

也可以使用仅回调一次的api

// 和原生的客户端一样,仅仅触发一次
 client.getData().usingWatcher(new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.err.println("process...");
        }
    }).forPath(path);

Curator解决session会话失效

    public static void main(String[] args) throws Exception {
         CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",new RetryNTimes(3,100));
        client.start();  // todo instance must be started before calling this method

        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                if(connectionState.equals(ConnectionState.LOST)){
                    // todo 重新连接
                }
            }
        });
        // todo dosomething
    }

以上是关于zookeeper客户端的主要内容,如果未能解决你的问题,请参考以下文章

Zookeeper3.5.7版本——客户端 API 操作(代码示例)

在ansible模板中使用动态组名称

Zookeeper--07---案例----服务器动态上下线

Zookeeper 的 Golang 客户端

zookeeper开源客户端curator

Zookeeper——服务器动态上下线客户端动态监听