使用ZooKeeper提供的Java API操作ZooKeeper

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用ZooKeeper提供的Java API操作ZooKeeper相关的知识,希望对你有一定的参考价值。

建立客户端与zk服务端的连接

我们先来创建一个普通的maven工程,然后在pom.xml文件中配置zookeeper依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.11</version>
    </dependency>
</dependencies>

在resources目录下创建一个zk-connect.properties属性配置文件,我们在该文件中填写连接zookeeper服务器的一些配置信息。如下:

# zk.zkServerIp=192.168.190.129:2181  单机模式
zk.zkServerIps=192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181
zk.timeout=5000

注:我这里使用的集群模式,所以是多个IP。

zookeeper使用的是log4j作为日志打印工具,所以我们还需要在resources目录下创建log4j的

log4j.rootLogger=WARN,console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.encoding=UTF-8
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%l] - [%p] %m%n

然后创建一个连接类demo,代码如下:

package org.zero01.zk.demo;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.Properties;

/**
 * @Description: zookeeper 连接demo演示
 */
public class ZKConnect implements Watcher {

    private static final Logger logger = LoggerFactory.getLogger(ZKConnect.class);

    // private static String zkServerIp;  单机模式是一个ip

    // 集群模式则是多个ip
    private static String zkServerIps;

    // 连接超时时间
    private static Integer timeout;

    // 加载配置信息
    static {
        Properties properties = new Properties();
        InputStream inputStream = Object.class.getResourceAsStream("/zk-connect.properties");
        try {
            properties.load(inputStream);

            // zkServerIp = properties.getProperty("zk.zkServerIp");
            zkServerIps = properties.getProperty("zk.zkServerIps");
            timeout = Integer.parseInt(properties.getProperty("zk.timeout"));
        } catch (Exception e) {
            logger.error("配置文件读取异常", e);
        } finally {
            try {
                if (inputStream != null) {
                    inputStream.close();
                }
            } catch (IOException e) {
                logger.error("关闭流失败", e);
            }
        }
    }

    // Watch事件通知
    public void process(WatchedEvent watchedEvent) {
        logger.warn("接收到watch通知:{}", watchedEvent);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        /**
         * 客户端和zk服务端链接是一个异步的过程
         * 当连接成功后后,客户端会收的一个watch通知
         *
         * 参数:
         * connectString:连接服务器的ip字符串,
         *      比如: "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181"
         *      可以是一个ip,也可以是多个ip,一个ip代表单机,多个ip代表集群
         *      也可以在ip后加路径
         * sessionTimeout:超时时间,心跳收不到了,那就超时
         * watcher:通知事件,如果有对应的事件触发,则会收到一个通知;如果不需要,那就设置为null
         * canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写,
         *                         此时数据被读取到的可能是旧数据,此处建议设置为false,不推荐使用
         * sessionId:会话的id
         * sessionPasswd:会话密码   当会话丢失后,可以依据 sessionId 和 sessionPasswd 重新获取会话
         */

        // 实例化zookeeper客户端
        ZooKeeper zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKConnect());

        logger.warn("客户端开始连接zookeeper服务器...");
        logger.warn("连接状态:{}", zooKeeper.getState());

        // 避免发出连接请求就断开,不然就无法正常连接也无法获取watch事件的通知
        Thread.sleep(2000);

        logger.warn("连接状态:{}", zooKeeper.getState());
    }
}

运行该类后,控制台输出日志信息如下:

2018-04-25 10:41:32,488 [main] [org.zero01.zk.demo.ZKConnect.main(ZKConnect.java:76)] - [WARN] 客户端开始连接zookeeper服务器...
2018-04-25 10:41:32,505 [main] [org.zero01.zk.demo.ZKConnect.main(ZKConnect.java:77)] - [WARN] 连接状态:CONNECTING
2018-04-25 10:41:32,515 [main-EventThread] [org.zero01.zk.demo.ZKConnect.process(ZKConnect.java:52)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-04-25 10:41:34,507 [main] [org.zero01.zk.demo.ZKConnect.main(ZKConnect.java:81)] - [WARN] 连接状态:CONNECTED

这样,我们就完成了一个与zookeeper服务端建立连接的过程。


zk会话重连机制

上一节我们简单演示了如何去连接zk服务端,本节则介绍一下,如何通过sessionid和session密码去恢复上一次的会话,也就是zk的会话重连机制。

新建一个类,用做于演示zk会话重连机制的demo:

package org.zero01.zk.demo;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * @program: zookeeper-connection
 * @description: zookeeper 恢复之前的会话连接demo演示
 * @author: 01
 * @create: 2018-04-25 12:59
 **/
public class ZKConnectSessionWatcher implements Watcher {

    private static final Logger logger = LoggerFactory.getLogger(ZKConnectSessionWatcher.class);

    // 集群模式则是多个ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    // 超时时间
    private static final Integer timeout = 5000;

    // Watch事件通知
    public void process(WatchedEvent watchedEvent) {
        logger.warn("接收到watch通知:{}", watchedEvent);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // 实例化zookeeper客户端
        ZooKeeper zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKConnectSessionWatcher());

        logger.warn("客户端开始连接zookeeper服务器...");
        logger.warn("连接状态:{}", zooKeeper.getState());
        Thread.sleep(2000);
        logger.warn("连接状态:{}", zooKeeper.getState());

        // 记录本次会话的sessionId
        long sessionId = zooKeeper.getSessionId();
        // 转换成16进制进行打印
        logger.warn("sid:{}", "0x" + Long.toHexString(sessionId));
        // 记录本次会话的session密码
        byte[] sessionPassword = zooKeeper.getSessionPasswd();

        Thread.sleep(200);

        // 开始会话重连
        logger.warn("开始会话重连...");
        // 加上sessionId和password参数去实例化zookeeper客户端
        ZooKeeper zkSession = new ZooKeeper(zkServerIps, timeout, new ZKConnectSessionWatcher(), sessionId, sessionPassword);
        logger.warn("重新连接状态zkSession:{}", zkSession.getState());
        Thread.sleep(2000);
        logger.warn("重新连接状态zkSession:{}", zkSession.getState());
    }
}

运行该类,控制台输出日志结果如下:

2018-04-25 13:48:00,931 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:35)] - [WARN] 客户端开始连接zookeeper服务器...
2018-04-25 13:48:00,935 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:36)] - [WARN] 连接状态:CONNECTING
2018-04-25 13:48:00,951 [main-EventThread] [org.zero01.zk.demo.ZKConnectSessionWatcher.process(ZKConnectSessionWatcher.java:28)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-04-25 13:48:02,935 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:38)] - [WARN] 连接状态:CONNECTED
2018-04-25 13:48:02,935 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:43)] - [WARN] sid:0x10000e81cfa0002
2018-04-25 13:48:03,136 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:50)] - [WARN] 开始会话重连...
2018-04-25 13:48:03,137 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:53)] - [WARN] 重新连接状态zkSession:CONNECTING
2018-04-25 13:48:03,142 [main-EventThread] [org.zero01.zk.demo.ZKConnectSessionWatcher.process(ZKConnectSessionWatcher.java:28)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-04-25 13:48:05,140 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:55)] - [WARN] 重新连接状态zkSession:CONNECTED

同步/异步创建zk节点

以上我们介绍了如何去连接和重连zk服务端,既然知道如何连接zk服务端之后,我们来看一下如何,同步或异步去创建zk节点。

先演示同步创建zk节点的方式,创建一个demo类如下:

package org.zero01.zk.demo;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

/**
 * @program: zookeeper-connection
 * @description:  演示同步异步创建zk节点
 * @author: 01
 * @create: 2018-04-25 13:51
 **/
public class ZkNodeOperator implements Watcher {

    private static final Logger logger = LoggerFactory.getLogger(ZkNodeOperator.class);

    // 集群模式则是多个ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    // 超时时间
    private static final Integer timeout = 5000;
    private ZooKeeper zooKeeper;

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public ZkNodeOperator() {
    }

    public ZkNodeOperator(String connectStr) {
        try {
            // 在使用该构造器的时候,实例化zk客户端对象
            zooKeeper = new ZooKeeper(connectStr, timeout, new ZkNodeOperator());
        } catch (IOException e) {
            e.printStackTrace();
            try {
                if (zooKeeper != null) {
                    zooKeeper.close();
                }
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
    }

    // Watch事件通知方法
    public void process(WatchedEvent watchedEvent) {
        logger.warn("接收到watch通知:{}", watchedEvent);
    }

    /**
     * @Title: ZKOperatorDemo.java
     * @Description: 创建zk节点
     */
    public void createZKNode(String path, byte[] data, List<ACL> acls) {
        String result = "";
        try {
            /**
             * 同步或者异步创建节点,都不支持子节点的递归创建,异步有一个callback函数
             * 参数:
             * path:节点创建的路径
             * data:节点所存储的数据的byte[]
             * acl:控制权限策略
             *          Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
             *          CREATOR_ALL_ACL --> auth:user:password:cdrwa
             * createMode:节点类型, 是一个枚举
             *          PERSISTENT:持久节点
             *          PERSISTENT_SEQUENTIAL:持久顺序节点
             *          EPHEMERAL:临时节点
             *          EPHEMERAL_SEQUENTIAL:临时顺序节点
             */
            // 同步创建zk节点,节点类型为临时节点
            result = zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL);
            System.out.println("创建节点:\t" + result + "\t成功...");
            Thread.sleep(2000);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ZkNodeOperator zkServer = new ZkNodeOperator(zkServerIps);

        // 创建zk节点
        zkServer.createZKNode("/testNode", "testNode-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
    }
}

运行该类,到服务器上查看是否已创建成功。如下,我这里是创建成功的:

[[email protected] ~]# zkCli.sh
[zk: localhost:2181(CONNECTED) 7] ls /
[zookeeper, data, real-culster, testNode]
[zk: localhost:2181(CONNECTED) 8] ls /
[zookeeper, data, real-culster]  # 因为是临时节点,所以客户端断开之后就消失了
[zk: localhost:2181(CONNECTED) 9] quit
[[email protected] ~]# 

控制台输出的日志信息如下:

2018-04-25 14:16:47,726 [main-EventThread] [org.zero01.zk.demo.ZkNodeOperator.process(ZkNodeOperator.java:56)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
创建节点:   /testNode   成功...

接下来我们演示一下异步创建zk节点的方式,因为异步创建有一个回调函数,所以我们得先创建一个类,实现StringCallback接口里面的回调方法:

package org.zero01.zk.demo;

import org.apache.zookeeper.AsyncCallback.StringCallback;

public class CreateCallBack implements StringCallback {

    // 回调函数
    public void processResult(int rc, String path, Object ctx, String name) {
        System.out.println("创建节点:" + path);
        System.out.println((String) ctx);
    }
}

修改 ZkNodeOperator 类中的 createZKNode 方法代码如下:

...
public class ZkNodeOperator implements Watcher {
    ...
    /**
     * @Title: ZKOperatorDemo.java
     * @Description: 创建zk节点
     */
    public void createZKNode(String path, byte[] data, List<ACL> acls) {
        try {
            ...
            // 异步步创建zk节点,节点类型为持久节点
            String ctx = "{‘create‘:‘success‘}";
            zooKeeper.create(path, data, acls, CreateMode.PERSISTENT, new CreateCallBack(), ctx);

            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}    

运行该类,然后到服务器上查看是否已创建成功。如下,我这里是创建成功的:

[zk: localhost:2181(CONNECTED) 9] ls /
[zookeeper, data, real-culster, testNode]
[zk: localhost:2181(CONNECTED) 10] get /testNode
testNode-data
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x700000014
mtime = Wed Apr 25 22:17:26 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 13
numChildren = 0
[zk: localhost:2181(CONNECTED) 11] 

控制台输出的日志信息如下:

2018-04-25 14:25:14,923 [main-EventThread] [org.zero01.zk.demo.ZkNodeOperator.process(ZkNodeOperator.java:56)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
创建节点:/testNode
{‘create‘:‘success‘}

同步/异步修改zk节点数据

同样的,我们也可以通过Zookeeper提供的Java API去修改zk节点的数据,也是有同步和异步两种方式,先来演示同步的方式。创建一个新的类,代码如下:

package org.zero01.zk.demo;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * @program: zookeeper-connection
 * @description: 修改zk节点数据演示
 * @author: 01
 * @create: 2018-04-25 16:25
 **/
public class ZKNodeAlterOperator implements Watcher {

    private static final Logger logger = LoggerFactory.getLogger(ZKNodeAlterOperator.class);

    // 集群模式则是多个ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    // 超时时间
    private static final Integer timeout = 5000;
    private ZooKeeper zooKeeper;

    public ZooKeeper getZooKeeper() {
        return zooKeeper;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public ZKNodeAlterOperator() {
    }

    public ZKNodeAlterOperator(String connectStr) {
        try {
            // 在使用该构造器的时候,实例化zk客户端对象
            zooKeeper = new ZooKeeper(connectStr, timeout, new ZKNodeAlterOperator());
        } catch (IOException e) {
            e.printStackTrace();
            try {
                if (zooKeeper != null) {
                    zooKeeper.close();
                }
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
    }

    // Watch事件通知方法
    public void process(WatchedEvent watchedEvent) {
        logger.warn("接收到watch通知:{}", watchedEvent);
    }

    public static void main(String[] args) throws KeeperException, InterruptedException {
        ZKNodeAlterOperator zkServer = new ZKNodeAlterOperator(zkServerIps);

        /**
         * 修改zk节点数据(同步)
         * 参数:
         * path:节点路径
         * data:新数据
         * version 数据版本
         */
        Stat status = zkServer.getZooKeeper().setData("/testNode", "this is new data".getBytes(), 0);
        // 通过Stat对象可以获取znode所有的状态属性,这里以version为例
        System.out.println("修改成功,当前数据版本为:" + status.getVersion());
    }
}

运行该类,到服务器上查看节点是否已成功修改数据。如下,我这里是修改成功的:

[zk: localhost:2181(CONNECTED) 12] get /testNode
this is new data
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x700000017
mtime = Thu Apr 26 00:21:54 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 16
numChildren = 0
[zk: localhost:2181(CONNECTED) 13] 

控制台输出的日志信息如下:

2018-04-25 16:30:02,111 [main-EventThread] [org.zero01.zk.demo.ZkNodeOperator.process(ZkNodeOperator.java:57)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
修改成功,当前数据版本为:1

接下来演示一下异步修改zk节点数据的方式,和异步创建节点是几乎一样的。也是需要新建一个类来实现回调接口的方法,只不过接口不一样而已。如下:

package org.zero01.zk.demo;

import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.data.Stat;

public class AlterCallBack implements StatCallback {

    // 回调函数
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        System.out.println("修改节点:" + path + "成功...");
        // 通过Stat对象可以获取znode所有的状态属性,这里以version为例
        System.out.println("当前数据版本为:" + stat.getVersion());
        System.out.println((String) ctx);
    }
}

然后修改 ZKNodeAlterOperator 类中的main方法代码如下:

...
public class ZKNodeAlterOperator implements Watcher {
    ...
    public static void main(String[] args) throws KeeperException, InterruptedException {
        ZKNodeAlterOperator zkServer = new ZKNodeAlterOperator(zkServerIps);

        /**
         * 修改zk节点数据(异步)
         * 参数:
         * path:节点路径
         * data:新数据
         * version: 数据版本
         * sc:实现回调函数的对象
         * ctx:给回调函数的上下文
         */
        String ctx = "{‘alter‘:‘success‘}";
        zkServer.getZooKeeper().setData("/testNode", "asynchronous-data".getBytes(), 0, new AlterCallBack(), ctx);

        Thread.sleep(2000);
    }
}

运行该类,到服务器上查看节点是否已成功修改数据。如下,我这里是修改成功的:

[zk: localhost:2181(CONNECTED) 16] get /testNode
asynchronous-data
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x70000001a
mtime = Thu Apr 26 00:35:53 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 17
numChildren = 0
[zk: localhost:2181(CONNECTED) 17] 

控制台输出的日志信息如下:

2018-04-25 16:44:03,472 [main-EventThread] [org.zero01.zk.demo.ZKNodeAlterOperator.process(ZKNodeAlterOperator.java:58)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
修改节点:/testNode成功...
当前数据版本为:2
{‘alter‘:‘success‘}

同步/异步删除zk节点

同样的,删除节点也有同步和异步两种方式,在删除节点操作上,使用异步会更人性化一些,因为有回调通知,同步的方式,除了设置了watch事件,不然是没有通知的。我们先来看一下同步方式的删除节点,代码如下:

package org.zero01.zk.demo;

import org.apache.zookeeper.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZKNodeDeleteOperator implements Watcher {

    private static final Logger logger = LoggerFactory.getLogger(ZKNodeDeleteOperator.class);

    // 集群模式则是多个ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    // 超时时间
    private static final Integer timeout = 5000;
    private static ZooKeeper zooKeeper;

    // Watch事件通知方法
    public void process(WatchedEvent watchedEvent) {
        logger.warn("接收到watch通知:{}", watchedEvent);
    }

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKNodeAlterOperator());
        // 创建节点
        zooKeeper.create("/testDeleteNode", "test-delete-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        Thread.sleep(1000);
        /**
         * 删除节点(同步)
         * 参数:
         * path:需要删除的节点路径
         * version:数据版本
         */
        zooKeeper.delete("/testDeleteNode", 0);

        zooKeeper.close();
    }
}

由于同步的删除方法不会有返回值,所以我们无法在控制台输出内容。

然后再来看一下异步方式的删除节点,首先需要新建一个类实现回调接口的方法:

package org.zero01.zk.demo;

import org.apache.zookeeper.AsyncCallback.VoidCallback;

public class DeleteCallBack implements VoidCallback {

    // 回调函数
    public void processResult(int rc, String path, Object ctx) {
        System.out.println("删除节点:" + path + " 成功...");
        System.out.println((String) ctx);
    }
}

然后修改一下 ZKNodeDeleteOperator 类的main方法:

public class ZKNodeDeleteOperator implements Watcher {
    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKNodeAlterOperator());
        // 创建节点
        zooKeeper.create("/testDeleteNode", "test-delete-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        Thread.sleep(1000);
        /**
         * 删除节点(异步)
         * 参数:
         * path:需要删除的节点路径
         * version:数据版本
         * sc:实现回调函数的对象
         * ctx:给回调函数的上下文
         */
        String ctx = "{‘delete‘:‘success‘}";
        zooKeeper.delete("/testDeleteNode", 0, new DeleteCallBack(), ctx);
        Thread.sleep(2000);
        zooKeeper.close();
    }
}

运行该类,控制台输出结果如下:

删除节点:/testDeleteNode 成功...
{‘delete‘:‘success‘}

获取zk节点数据

以上小节介绍完了增删改,现在就剩下查了。同样的查询也有同步和异步两种方式,异步的方式在之前的增删改例子中已经都介绍过了,在查询里使用异步也是和增删改同样的方式,所以就不再演示查询的异步了。zk中有三种数据可以查询:查询zk节点数据、查询zk子节点列表、查询某个zk节点是否存在。本节先介绍如何查询zk节点数据。

现在zookeeper服务器上,有一个/testNode节点。节点数据内容如下:

[zk: localhost:2181(CONNECTED) 3] get /testNode
asynchronous-data
...
[zk: localhost:2181(CONNECTED) 4] 

然后我们来编写一个 ZKGetNodeData 类,调用zookeeper的API去获取zk节点数据。代码示例:

package org.zero01.zk.demo;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @program: zookeeper-connection
 * @description: 获取zk节点数据demo
 * @author: 01
 * @create: 2018-04-26 18:05
 **/
public class ZKGetNodeData implements Watcher {

    private static final Logger logger = LoggerFactory.getLogger(ZKGetNodeData.class);

    // 集群模式则是多个ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    // 超时时间
    private static final Integer timeout = 5000;
    private static ZooKeeper zooKeeper;
    private static Stat stat = new Stat();

    // Watch事件通知方法
    public void process(WatchedEvent watchedEvent) {
        logger.warn("接收到watch通知:{}", watchedEvent);
    }

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKGetNodeData());

        /**
         * 参数:
         * path:节点路径
         * watch:true或者false,注册一个watch事件
         * stat:状态,我们可以通过这个对象获取节点的状态信息
         */
        byte[] resByte = zooKeeper.getData("/testNode", true, stat);
        String result = new String(resByte);
        System.out.println("/testNode 节点的数据: " + result);

        zooKeeper.close();
    }
}

控制台输出结果如下:

/testNode 节点的值: asynchronous-data

通过实现 Watcher 接口的通知方法,再结合这个获取节点数据的API,我们就可以在数据发生改变的时候获取最新的数据。如下示例,在 ZKGetNodeData 类中,增加代码如下:

...
public class ZKGetNodeData implements Watcher {
    ...
    // 计数器
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    // Watch事件通知方法
    public void process(WatchedEvent watchedEvent) {
        try {
            if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
                ZooKeeper zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKGetNodeData());
                byte[] resByte = zooKeeper.getData("/testNode", false, stat);
                String result = new String(resByte);
                System.out.println("/testNode 节点的数据发生了变化");
                System.out.println("新的数据为: " + result);
                System.out.println("新的数据版本号为:" + stat.getVersion());

                // 通知完之后,计数器减一
                countDownLatch.countDown();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        ...
        // 等待线程执行
        countDownLatch.await();
    }
}

这时候由于我们在main里调用了await()方法,所以主线程会阻塞。然后我们到zookeeper服务器上,对该节点的数据进行操作,如下:

[zk: localhost:2181(CONNECTED) 11] get /testNode
asynchronous-data
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x800000011
mtime = Fri Apr 27 03:04:09 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 6
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 17
numChildren = 0
[zk: localhost:2181(CONNECTED) 12] set /testNode new-data 6       
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x800000013
mtime = Fri Apr 27 03:04:35 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 7
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 0
[zk: localhost:2181(CONNECTED) 13]

当我们修改了数据之后,控制台就会输出如下内容,主线程就会解除阻塞结束执行:

/testNode 节点的数据: asynchronous-data
/testNode 节点的数据发生了变化
新的数据为: new-data
新的数据版本号为:7

获取zk子节点列表

本节介绍一下如何获取zk子节点列表,同样的也是有同步和异步两种方式,这里介绍的是同步的。testNode节点下有三个节点,如下:

[zk: localhost:2181(CONNECTED) 20] ls /testNode
[ThreeNode, TwoNode, OneNode]
[zk: localhost:2181(CONNECTED) 21] 

我们来写一个demo获取这个节点下的子节点列表。代码如下:

package org.zero01.zk.demo;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.List;

/**
 * @program: zookeeper-connection
 * @description:  zookeeper 获取子节点数据的demo演示
 * @author: 01
 * @create: 2018-04-26 21:13
 **/
public class ZKGetChildrenList implements Watcher{

    // 集群模式则是多个ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    // 超时时间
    private static final Integer timeout = 5000;
    private static ZooKeeper zooKeeper;

    // Watch事件通知方法
    public void process(WatchedEvent watchedEvent) {
    }

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKGetChildrenList());

        /**
         * 参数:
         * path:父节点路径
         * watch:true或者false,注册一个watch事件
         */
        List<String> strChildList = zooKeeper.getChildren("/testNode", false);
        for (String s : strChildList) {
            System.out.println(s);
        }
    }
}

控制台就会输出内容如下:

ThreeNode
TwoNode
OneNode

判断zk节点是否存在

最后介绍如何判断一个zk节点是否存在,同样的也是有同步和异步两种方式,这里介绍的是同步的。我们来写一个demo判断某个zk节点是否存在。代码如下:

package org.zero01.zk.demo;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * @program: zookeeper-connection
 * @description: zookeeper 判断节点是否存在demo
 * @author: 01
 * @create: 2018-04-26 22:06
 **/
public class ZKNodeExist implements Watcher {

    // 集群模式则是多个ip
    private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
    // 超时时间
    private static final Integer timeout = 5000;
    private static ZooKeeper zooKeeper;

    public void process(WatchedEvent watchedEvent) {
    }

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKNodeExist());

        /**
         * 参数:
         * path:节点路径
         * watch:true或者false,注册一个watch事件
         */
        Stat stat = zooKeeper.exists("/testNode", true);
        if (stat != null) {
            System.out.println("testNode 节点存在...");
            System.out.println("该节点的数据版本为:" + stat.getVersion());
        } else {
            System.out.println("该节点不存在...");
        }
    }
}

运行该类,控制台输出如下:

testNode 节点存在...
该节点的数据版本为:7

将testNode换成一个不存在的节点,运行该类,控制台输出如下:

该节点不存在...

以上是关于使用ZooKeeper提供的Java API操作ZooKeeper的主要内容,如果未能解决你的问题,请参考以下文章

使用Java API操作zookeeper的acl权限

Zookeeper详解-API

Zookeeper API

2021年大数据ZooKeeper:ZooKeeper Java API操作

Zookeeper 提供的API

Hadoop_简单操作ZooKeeper