使用ZooKeeper提供的Java API操作ZooKeeper
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用ZooKeeper提供的Java API操作ZooKeeper相关的知识,希望对你有一定的参考价值。
建立客户端与zk服务端的连接
我们先来创建一个普通的maven工程,然后在pom.xml文件中配置zookeeper依赖:
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>
</dependencies>
在resources目录下创建一个zk-connect.properties属性配置文件,我们在该文件中填写连接zookeeper服务器的一些配置信息。如下:
# zk.zkServerIp=192.168.190.129:2181 单机模式
zk.zkServerIps=192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181
zk.timeout=5000
注:我这里使用的集群模式,所以是多个IP。
zookeeper使用的是log4j作为日志打印工具,所以我们还需要在resources目录下创建log4j的
log4j.rootLogger=WARN,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.encoding=UTF-8
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%l] - [%p] %m%n
然后创建一个连接类demo,代码如下:
package org.zero01.zk.demo;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.Properties;
/**
* @Description: zookeeper 连接demo演示
*/
public class ZKConnect implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(ZKConnect.class);
// private static String zkServerIp; 单机模式是一个ip
// 集群模式则是多个ip
private static String zkServerIps;
// 连接超时时间
private static Integer timeout;
// 加载配置信息
static {
Properties properties = new Properties();
InputStream inputStream = Object.class.getResourceAsStream("/zk-connect.properties");
try {
properties.load(inputStream);
// zkServerIp = properties.getProperty("zk.zkServerIp");
zkServerIps = properties.getProperty("zk.zkServerIps");
timeout = Integer.parseInt(properties.getProperty("zk.timeout"));
} catch (Exception e) {
logger.error("配置文件读取异常", e);
} finally {
try {
if (inputStream != null) {
inputStream.close();
}
} catch (IOException e) {
logger.error("关闭流失败", e);
}
}
}
// Watch事件通知
public void process(WatchedEvent watchedEvent) {
logger.warn("接收到watch通知:{}", watchedEvent);
}
public static void main(String[] args) throws IOException, InterruptedException {
/**
* 客户端和zk服务端链接是一个异步的过程
* 当连接成功后后,客户端会收的一个watch通知
*
* 参数:
* connectString:连接服务器的ip字符串,
* 比如: "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181"
* 可以是一个ip,也可以是多个ip,一个ip代表单机,多个ip代表集群
* 也可以在ip后加路径
* sessionTimeout:超时时间,心跳收不到了,那就超时
* watcher:通知事件,如果有对应的事件触发,则会收到一个通知;如果不需要,那就设置为null
* canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写,
* 此时数据被读取到的可能是旧数据,此处建议设置为false,不推荐使用
* sessionId:会话的id
* sessionPasswd:会话密码 当会话丢失后,可以依据 sessionId 和 sessionPasswd 重新获取会话
*/
// 实例化zookeeper客户端
ZooKeeper zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKConnect());
logger.warn("客户端开始连接zookeeper服务器...");
logger.warn("连接状态:{}", zooKeeper.getState());
// 避免发出连接请求就断开,不然就无法正常连接也无法获取watch事件的通知
Thread.sleep(2000);
logger.warn("连接状态:{}", zooKeeper.getState());
}
}
运行该类后,控制台输出日志信息如下:
2018-04-25 10:41:32,488 [main] [org.zero01.zk.demo.ZKConnect.main(ZKConnect.java:76)] - [WARN] 客户端开始连接zookeeper服务器...
2018-04-25 10:41:32,505 [main] [org.zero01.zk.demo.ZKConnect.main(ZKConnect.java:77)] - [WARN] 连接状态:CONNECTING
2018-04-25 10:41:32,515 [main-EventThread] [org.zero01.zk.demo.ZKConnect.process(ZKConnect.java:52)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-04-25 10:41:34,507 [main] [org.zero01.zk.demo.ZKConnect.main(ZKConnect.java:81)] - [WARN] 连接状态:CONNECTED
这样,我们就完成了一个与zookeeper服务端建立连接的过程。
zk会话重连机制
上一节我们简单演示了如何去连接zk服务端,本节则介绍一下,如何通过sessionid和session密码去恢复上一次的会话,也就是zk的会话重连机制。
新建一个类,用做于演示zk会话重连机制的demo:
package org.zero01.zk.demo;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* @program: zookeeper-connection
* @description: zookeeper 恢复之前的会话连接demo演示
* @author: 01
* @create: 2018-04-25 12:59
**/
public class ZKConnectSessionWatcher implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(ZKConnectSessionWatcher.class);
// 集群模式则是多个ip
private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
// 超时时间
private static final Integer timeout = 5000;
// Watch事件通知
public void process(WatchedEvent watchedEvent) {
logger.warn("接收到watch通知:{}", watchedEvent);
}
public static void main(String[] args) throws IOException, InterruptedException {
// 实例化zookeeper客户端
ZooKeeper zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKConnectSessionWatcher());
logger.warn("客户端开始连接zookeeper服务器...");
logger.warn("连接状态:{}", zooKeeper.getState());
Thread.sleep(2000);
logger.warn("连接状态:{}", zooKeeper.getState());
// 记录本次会话的sessionId
long sessionId = zooKeeper.getSessionId();
// 转换成16进制进行打印
logger.warn("sid:{}", "0x" + Long.toHexString(sessionId));
// 记录本次会话的session密码
byte[] sessionPassword = zooKeeper.getSessionPasswd();
Thread.sleep(200);
// 开始会话重连
logger.warn("开始会话重连...");
// 加上sessionId和password参数去实例化zookeeper客户端
ZooKeeper zkSession = new ZooKeeper(zkServerIps, timeout, new ZKConnectSessionWatcher(), sessionId, sessionPassword);
logger.warn("重新连接状态zkSession:{}", zkSession.getState());
Thread.sleep(2000);
logger.warn("重新连接状态zkSession:{}", zkSession.getState());
}
}
运行该类,控制台输出日志结果如下:
2018-04-25 13:48:00,931 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:35)] - [WARN] 客户端开始连接zookeeper服务器...
2018-04-25 13:48:00,935 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:36)] - [WARN] 连接状态:CONNECTING
2018-04-25 13:48:00,951 [main-EventThread] [org.zero01.zk.demo.ZKConnectSessionWatcher.process(ZKConnectSessionWatcher.java:28)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-04-25 13:48:02,935 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:38)] - [WARN] 连接状态:CONNECTED
2018-04-25 13:48:02,935 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:43)] - [WARN] sid:0x10000e81cfa0002
2018-04-25 13:48:03,136 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:50)] - [WARN] 开始会话重连...
2018-04-25 13:48:03,137 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:53)] - [WARN] 重新连接状态zkSession:CONNECTING
2018-04-25 13:48:03,142 [main-EventThread] [org.zero01.zk.demo.ZKConnectSessionWatcher.process(ZKConnectSessionWatcher.java:28)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-04-25 13:48:05,140 [main] [org.zero01.zk.demo.ZKConnectSessionWatcher.main(ZKConnectSessionWatcher.java:55)] - [WARN] 重新连接状态zkSession:CONNECTED
同步/异步创建zk节点
以上我们介绍了如何去连接和重连zk服务端,既然知道如何连接zk服务端之后,我们来看一下如何,同步或异步去创建zk节点。
先演示同步创建zk节点的方式,创建一个demo类如下:
package org.zero01.zk.demo;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
/**
* @program: zookeeper-connection
* @description: 演示同步异步创建zk节点
* @author: 01
* @create: 2018-04-25 13:51
**/
public class ZkNodeOperator implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(ZkNodeOperator.class);
// 集群模式则是多个ip
private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
// 超时时间
private static final Integer timeout = 5000;
private ZooKeeper zooKeeper;
public ZooKeeper getZooKeeper() {
return zooKeeper;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
public ZkNodeOperator() {
}
public ZkNodeOperator(String connectStr) {
try {
// 在使用该构造器的时候,实例化zk客户端对象
zooKeeper = new ZooKeeper(connectStr, timeout, new ZkNodeOperator());
} catch (IOException e) {
e.printStackTrace();
try {
if (zooKeeper != null) {
zooKeeper.close();
}
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
// Watch事件通知方法
public void process(WatchedEvent watchedEvent) {
logger.warn("接收到watch通知:{}", watchedEvent);
}
/**
* @Title: ZKOperatorDemo.java
* @Description: 创建zk节点
*/
public void createZKNode(String path, byte[] data, List<ACL> acls) {
String result = "";
try {
/**
* 同步或者异步创建节点,都不支持子节点的递归创建,异步有一个callback函数
* 参数:
* path:节点创建的路径
* data:节点所存储的数据的byte[]
* acl:控制权限策略
* Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
* CREATOR_ALL_ACL --> auth:user:password:cdrwa
* createMode:节点类型, 是一个枚举
* PERSISTENT:持久节点
* PERSISTENT_SEQUENTIAL:持久顺序节点
* EPHEMERAL:临时节点
* EPHEMERAL_SEQUENTIAL:临时顺序节点
*/
// 同步创建zk节点,节点类型为临时节点
result = zooKeeper.create(path, data, acls, CreateMode.EPHEMERAL);
System.out.println("创建节点:\t" + result + "\t成功...");
Thread.sleep(2000);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ZkNodeOperator zkServer = new ZkNodeOperator(zkServerIps);
// 创建zk节点
zkServer.createZKNode("/testNode", "testNode-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE);
}
}
运行该类,到服务器上查看是否已创建成功。如下,我这里是创建成功的:
[[email protected] ~]# zkCli.sh
[zk: localhost:2181(CONNECTED) 7] ls /
[zookeeper, data, real-culster, testNode]
[zk: localhost:2181(CONNECTED) 8] ls /
[zookeeper, data, real-culster] # 因为是临时节点,所以客户端断开之后就消失了
[zk: localhost:2181(CONNECTED) 9] quit
[[email protected] ~]#
控制台输出的日志信息如下:
2018-04-25 14:16:47,726 [main-EventThread] [org.zero01.zk.demo.ZkNodeOperator.process(ZkNodeOperator.java:56)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
创建节点: /testNode 成功...
接下来我们演示一下异步创建zk节点的方式,因为异步创建有一个回调函数,所以我们得先创建一个类,实现StringCallback接口里面的回调方法:
package org.zero01.zk.demo;
import org.apache.zookeeper.AsyncCallback.StringCallback;
public class CreateCallBack implements StringCallback {
// 回调函数
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("创建节点:" + path);
System.out.println((String) ctx);
}
}
修改 ZkNodeOperator 类中的 createZKNode 方法代码如下:
...
public class ZkNodeOperator implements Watcher {
...
/**
* @Title: ZKOperatorDemo.java
* @Description: 创建zk节点
*/
public void createZKNode(String path, byte[] data, List<ACL> acls) {
try {
...
// 异步步创建zk节点,节点类型为持久节点
String ctx = "{‘create‘:‘success‘}";
zooKeeper.create(path, data, acls, CreateMode.PERSISTENT, new CreateCallBack(), ctx);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行该类,然后到服务器上查看是否已创建成功。如下,我这里是创建成功的:
[zk: localhost:2181(CONNECTED) 9] ls /
[zookeeper, data, real-culster, testNode]
[zk: localhost:2181(CONNECTED) 10] get /testNode
testNode-data
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x700000014
mtime = Wed Apr 25 22:17:26 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 13
numChildren = 0
[zk: localhost:2181(CONNECTED) 11]
控制台输出的日志信息如下:
2018-04-25 14:25:14,923 [main-EventThread] [org.zero01.zk.demo.ZkNodeOperator.process(ZkNodeOperator.java:56)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
创建节点:/testNode
{‘create‘:‘success‘}
同步/异步修改zk节点数据
同样的,我们也可以通过Zookeeper提供的Java API去修改zk节点的数据,也是有同步和异步两种方式,先来演示同步的方式。创建一个新的类,代码如下:
package org.zero01.zk.demo;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* @program: zookeeper-connection
* @description: 修改zk节点数据演示
* @author: 01
* @create: 2018-04-25 16:25
**/
public class ZKNodeAlterOperator implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(ZKNodeAlterOperator.class);
// 集群模式则是多个ip
private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
// 超时时间
private static final Integer timeout = 5000;
private ZooKeeper zooKeeper;
public ZooKeeper getZooKeeper() {
return zooKeeper;
}
public void setZooKeeper(ZooKeeper zooKeeper) {
this.zooKeeper = zooKeeper;
}
public ZKNodeAlterOperator() {
}
public ZKNodeAlterOperator(String connectStr) {
try {
// 在使用该构造器的时候,实例化zk客户端对象
zooKeeper = new ZooKeeper(connectStr, timeout, new ZKNodeAlterOperator());
} catch (IOException e) {
e.printStackTrace();
try {
if (zooKeeper != null) {
zooKeeper.close();
}
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
// Watch事件通知方法
public void process(WatchedEvent watchedEvent) {
logger.warn("接收到watch通知:{}", watchedEvent);
}
public static void main(String[] args) throws KeeperException, InterruptedException {
ZKNodeAlterOperator zkServer = new ZKNodeAlterOperator(zkServerIps);
/**
* 修改zk节点数据(同步)
* 参数:
* path:节点路径
* data:新数据
* version 数据版本
*/
Stat status = zkServer.getZooKeeper().setData("/testNode", "this is new data".getBytes(), 0);
// 通过Stat对象可以获取znode所有的状态属性,这里以version为例
System.out.println("修改成功,当前数据版本为:" + status.getVersion());
}
}
运行该类,到服务器上查看节点是否已成功修改数据。如下,我这里是修改成功的:
[zk: localhost:2181(CONNECTED) 12] get /testNode
this is new data
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x700000017
mtime = Thu Apr 26 00:21:54 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 16
numChildren = 0
[zk: localhost:2181(CONNECTED) 13]
控制台输出的日志信息如下:
2018-04-25 16:30:02,111 [main-EventThread] [org.zero01.zk.demo.ZkNodeOperator.process(ZkNodeOperator.java:57)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
修改成功,当前数据版本为:1
接下来演示一下异步修改zk节点数据的方式,和异步创建节点是几乎一样的。也是需要新建一个类来实现回调接口的方法,只不过接口不一样而已。如下:
package org.zero01.zk.demo;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.data.Stat;
public class AlterCallBack implements StatCallback {
// 回调函数
public void processResult(int rc, String path, Object ctx, Stat stat) {
System.out.println("修改节点:" + path + "成功...");
// 通过Stat对象可以获取znode所有的状态属性,这里以version为例
System.out.println("当前数据版本为:" + stat.getVersion());
System.out.println((String) ctx);
}
}
然后修改 ZKNodeAlterOperator 类中的main方法代码如下:
...
public class ZKNodeAlterOperator implements Watcher {
...
public static void main(String[] args) throws KeeperException, InterruptedException {
ZKNodeAlterOperator zkServer = new ZKNodeAlterOperator(zkServerIps);
/**
* 修改zk节点数据(异步)
* 参数:
* path:节点路径
* data:新数据
* version: 数据版本
* sc:实现回调函数的对象
* ctx:给回调函数的上下文
*/
String ctx = "{‘alter‘:‘success‘}";
zkServer.getZooKeeper().setData("/testNode", "asynchronous-data".getBytes(), 0, new AlterCallBack(), ctx);
Thread.sleep(2000);
}
}
运行该类,到服务器上查看节点是否已成功修改数据。如下,我这里是修改成功的:
[zk: localhost:2181(CONNECTED) 16] get /testNode
asynchronous-data
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x70000001a
mtime = Thu Apr 26 00:35:53 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 17
numChildren = 0
[zk: localhost:2181(CONNECTED) 17]
控制台输出的日志信息如下:
2018-04-25 16:44:03,472 [main-EventThread] [org.zero01.zk.demo.ZKNodeAlterOperator.process(ZKNodeAlterOperator.java:58)] - [WARN] 接收到watch通知:WatchedEvent state:SyncConnected type:None path:null
修改节点:/testNode成功...
当前数据版本为:2
{‘alter‘:‘success‘}
同步/异步删除zk节点
同样的,删除节点也有同步和异步两种方式,在删除节点操作上,使用异步会更人性化一些,因为有回调通知,同步的方式,除了设置了watch事件,不然是没有通知的。我们先来看一下同步方式的删除节点,代码如下:
package org.zero01.zk.demo;
import org.apache.zookeeper.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZKNodeDeleteOperator implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(ZKNodeDeleteOperator.class);
// 集群模式则是多个ip
private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
// 超时时间
private static final Integer timeout = 5000;
private static ZooKeeper zooKeeper;
// Watch事件通知方法
public void process(WatchedEvent watchedEvent) {
logger.warn("接收到watch通知:{}", watchedEvent);
}
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKNodeAlterOperator());
// 创建节点
zooKeeper.create("/testDeleteNode", "test-delete-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Thread.sleep(1000);
/**
* 删除节点(同步)
* 参数:
* path:需要删除的节点路径
* version:数据版本
*/
zooKeeper.delete("/testDeleteNode", 0);
zooKeeper.close();
}
}
由于同步的删除方法不会有返回值,所以我们无法在控制台输出内容。
然后再来看一下异步方式的删除节点,首先需要新建一个类实现回调接口的方法:
package org.zero01.zk.demo;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
public class DeleteCallBack implements VoidCallback {
// 回调函数
public void processResult(int rc, String path, Object ctx) {
System.out.println("删除节点:" + path + " 成功...");
System.out.println((String) ctx);
}
}
然后修改一下 ZKNodeDeleteOperator 类的main方法:
public class ZKNodeDeleteOperator implements Watcher {
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKNodeAlterOperator());
// 创建节点
zooKeeper.create("/testDeleteNode", "test-delete-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Thread.sleep(1000);
/**
* 删除节点(异步)
* 参数:
* path:需要删除的节点路径
* version:数据版本
* sc:实现回调函数的对象
* ctx:给回调函数的上下文
*/
String ctx = "{‘delete‘:‘success‘}";
zooKeeper.delete("/testDeleteNode", 0, new DeleteCallBack(), ctx);
Thread.sleep(2000);
zooKeeper.close();
}
}
运行该类,控制台输出结果如下:
删除节点:/testDeleteNode 成功...
{‘delete‘:‘success‘}
获取zk节点数据
以上小节介绍完了增删改,现在就剩下查了。同样的查询也有同步和异步两种方式,异步的方式在之前的增删改例子中已经都介绍过了,在查询里使用异步也是和增删改同样的方式,所以就不再演示查询的异步了。zk中有三种数据可以查询:查询zk节点数据、查询zk子节点列表、查询某个zk节点是否存在。本节先介绍如何查询zk节点数据。
现在zookeeper服务器上,有一个/testNode节点。节点数据内容如下:
[zk: localhost:2181(CONNECTED) 3] get /testNode
asynchronous-data
...
[zk: localhost:2181(CONNECTED) 4]
然后我们来编写一个 ZKGetNodeData 类,调用zookeeper的API去获取zk节点数据。代码示例:
package org.zero01.zk.demo;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @program: zookeeper-connection
* @description: 获取zk节点数据demo
* @author: 01
* @create: 2018-04-26 18:05
**/
public class ZKGetNodeData implements Watcher {
private static final Logger logger = LoggerFactory.getLogger(ZKGetNodeData.class);
// 集群模式则是多个ip
private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
// 超时时间
private static final Integer timeout = 5000;
private static ZooKeeper zooKeeper;
private static Stat stat = new Stat();
// Watch事件通知方法
public void process(WatchedEvent watchedEvent) {
logger.warn("接收到watch通知:{}", watchedEvent);
}
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKGetNodeData());
/**
* 参数:
* path:节点路径
* watch:true或者false,注册一个watch事件
* stat:状态,我们可以通过这个对象获取节点的状态信息
*/
byte[] resByte = zooKeeper.getData("/testNode", true, stat);
String result = new String(resByte);
System.out.println("/testNode 节点的数据: " + result);
zooKeeper.close();
}
}
控制台输出结果如下:
/testNode 节点的值: asynchronous-data
通过实现 Watcher 接口的通知方法,再结合这个获取节点数据的API,我们就可以在数据发生改变的时候获取最新的数据。如下示例,在 ZKGetNodeData 类中,增加代码如下:
...
public class ZKGetNodeData implements Watcher {
...
// 计数器
private static CountDownLatch countDownLatch = new CountDownLatch(1);
// Watch事件通知方法
public void process(WatchedEvent watchedEvent) {
try {
if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
ZooKeeper zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKGetNodeData());
byte[] resByte = zooKeeper.getData("/testNode", false, stat);
String result = new String(resByte);
System.out.println("/testNode 节点的数据发生了变化");
System.out.println("新的数据为: " + result);
System.out.println("新的数据版本号为:" + stat.getVersion());
// 通知完之后,计数器减一
countDownLatch.countDown();
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
...
// 等待线程执行
countDownLatch.await();
}
}
这时候由于我们在main里调用了await()方法,所以主线程会阻塞。然后我们到zookeeper服务器上,对该节点的数据进行操作,如下:
[zk: localhost:2181(CONNECTED) 11] get /testNode
asynchronous-data
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x800000011
mtime = Fri Apr 27 03:04:09 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 6
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 17
numChildren = 0
[zk: localhost:2181(CONNECTED) 12] set /testNode new-data 6
cZxid = 0x700000014
ctime = Wed Apr 25 22:17:26 CST 2018
mZxid = 0x800000013
mtime = Fri Apr 27 03:04:35 CST 2018
pZxid = 0x700000014
cversion = 0
dataVersion = 7
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 0
[zk: localhost:2181(CONNECTED) 13]
当我们修改了数据之后,控制台就会输出如下内容,主线程就会解除阻塞结束执行:
/testNode 节点的数据: asynchronous-data
/testNode 节点的数据发生了变化
新的数据为: new-data
新的数据版本号为:7
获取zk子节点列表
本节介绍一下如何获取zk子节点列表,同样的也是有同步和异步两种方式,这里介绍的是同步的。testNode节点下有三个节点,如下:
[zk: localhost:2181(CONNECTED) 20] ls /testNode
[ThreeNode, TwoNode, OneNode]
[zk: localhost:2181(CONNECTED) 21]
我们来写一个demo获取这个节点下的子节点列表。代码如下:
package org.zero01.zk.demo;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.List;
/**
* @program: zookeeper-connection
* @description: zookeeper 获取子节点数据的demo演示
* @author: 01
* @create: 2018-04-26 21:13
**/
public class ZKGetChildrenList implements Watcher{
// 集群模式则是多个ip
private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
// 超时时间
private static final Integer timeout = 5000;
private static ZooKeeper zooKeeper;
// Watch事件通知方法
public void process(WatchedEvent watchedEvent) {
}
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKGetChildrenList());
/**
* 参数:
* path:父节点路径
* watch:true或者false,注册一个watch事件
*/
List<String> strChildList = zooKeeper.getChildren("/testNode", false);
for (String s : strChildList) {
System.out.println(s);
}
}
}
控制台就会输出内容如下:
ThreeNode
TwoNode
OneNode
判断zk节点是否存在
最后介绍如何判断一个zk节点是否存在,同样的也是有同步和异步两种方式,这里介绍的是同步的。我们来写一个demo判断某个zk节点是否存在。代码如下:
package org.zero01.zk.demo;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* @program: zookeeper-connection
* @description: zookeeper 判断节点是否存在demo
* @author: 01
* @create: 2018-04-26 22:06
**/
public class ZKNodeExist implements Watcher {
// 集群模式则是多个ip
private static final String zkServerIps = "192.168.190.128:2181,192.168.190.129:2181,192.168.190.130:2181";
// 超时时间
private static final Integer timeout = 5000;
private static ZooKeeper zooKeeper;
public void process(WatchedEvent watchedEvent) {
}
public static void main(String[] args) throws Exception {
zooKeeper = new ZooKeeper(zkServerIps, timeout, new ZKNodeExist());
/**
* 参数:
* path:节点路径
* watch:true或者false,注册一个watch事件
*/
Stat stat = zooKeeper.exists("/testNode", true);
if (stat != null) {
System.out.println("testNode 节点存在...");
System.out.println("该节点的数据版本为:" + stat.getVersion());
} else {
System.out.println("该节点不存在...");
}
}
}
运行该类,控制台输出如下:
testNode 节点存在...
该节点的数据版本为:7
将testNode换成一个不存在的节点,运行该类,控制台输出如下:
该节点不存在...
以上是关于使用ZooKeeper提供的Java API操作ZooKeeper的主要内容,如果未能解决你的问题,请参考以下文章