Zookeeper客户端的使用
Posted *King*
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper客户端的使用相关的知识,希望对你有一定的参考价值。
1、Zookeeper原生客户端
zookeeper官方提供的java客户端API
(1)引入依赖
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
</dependency>
(2)编写原生客户端测试类
1.创建会话:
运行后创建成功了
优化后的代码及运行结果
public class TestCreateSession {
//Zookeeper服务地址
private static final String SERVER = "127.0.0.1:2181";
//会话超时时间
private static final int SESSION_TIMEOUT = 30000;
/**
* 获取session的方式,这种方式可能会在Zookeeper还没有获取连接的时候就已经对ZK进行访问了
* @param:
* @return void
* @author <a href="mailto:wanglu@smartdot.com.cn">wanglu</a>
* @Date 2021/6/5
*/
@Test
public void testSession() throws IOException, KeeperException, InterruptedException {
ZooKeeper zookeeper = new ZooKeeper(SERVER,SESSION_TIMEOUT,null);
System.out.println(zookeeper);
System.out.println(zookeeper.getState());
//创建节点
zookeeper.create("/king","value".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//发令枪
private CountDownLatch countDownLatch = new CountDownLatch(1);
/**
* 对获取Session的方式进行优化,在Zookeeper初始化完成以前先等待,等待完成后再进行后续操作
* @author <a href="mailto:wanglu@smartdot.com.cn">wanglu</a>
* @Date 2021/6/5
*/
@Test
public void testSession2() throws IOException, InterruptedException {
ZooKeeper zooKeeper = new ZooKeeper(SERVER, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
//确定已经连接完毕后再进行操作
countDownLatch.countDown();
System.out.println("已经获得了连接");
}
}
});
countDownLatch.await();
System.out.println(zooKeeper.getState());
}
}
2.客户端基本操作
package com.example.zookeeperdemo;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
* Zookeeper客户端的基本使用
* @author wanglu
* @since 1.0, 2021/6/5 20:54
*/
public class TestJavaApi implements Watcher {
private static final int SESSION_TIMEOUT = 10000;
private static final String CONNECTION_STRING = "127.0.0.1:2181";
private static final String ZK_PATH = "/leader";
private ZooKeeper zk = null;
private CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) {
TestJavaApi sample = new TestJavaApi();
sample.createConnection(CONNECTION_STRING,SESSION_TIMEOUT);
if(sample.createPath(ZK_PATH,"我是节点初始内容")){
System.out.println();
System.out.println("数据内容:"+sample.readDate(ZK_PATH)+"\\n");
sample.writeDate(ZK_PATH,"更新后的数据");
System.out.println("数据内容:"+sample.readDate(ZK_PATH)+"\\n");
sample.deleteNode(ZK_PATH);
}
sample.releaseConnection();
}
/**
* 创建ZK连接
* @Date 2021/6/5
*/
private void createConnection(String connectionString, int sessionTimeout) {
this.releaseConnection();
try {
zk = new ZooKeeper(connectionString,sessionTimeout,this);
countDownLatch.await();
}catch (InterruptedException e ){
System.out.println("连接创建失败,发生InterruptedException");
e.printStackTrace();
}catch (IOException e){
System.out.println("连接创建失败,发生IOException");
e.printStackTrace();
}
}
/**
* 创建节点
* @param: path 节点path
* @param: data 初始数据内容
* @Date 2021/6/5
*/
private boolean createPath(String path, String data) {
try {
System.out.println("节点创建成功,path:"+this.zk.create(
path,//节点路径
data.getBytes(),//节点内容
ZooDefs.Ids.OPEN_ACL_UNSAFE,//节点权限
CreateMode.EPHEMERAL//节点类型(临时的)
)+",content:"+data);
}catch (KeeperException e){
System.out.println("节点创建失败,发生KeeperException");
e.printStackTrace();
}catch (InterruptedException e){
System.out.println("节点创建失败,发生InterruptedException");
e.printStackTrace();
}
return true;
}
/**
* 读取指定节点数据内容
* @param: path 节点path
* @Date 2021/6/5
*/
private String readDate(String path) {
try {
System.out.println("获取数据成功,path"+path);
return new String(this.zk.getData(path,false,null));
}catch (KeeperException e){
System.out.println("读取数据失败,发生KeeperException,path:"+path);
e.printStackTrace();
}catch (InterruptedException e){
System.out.println("读取数据失败,发生InterruptedException,path:"+path);
e.printStackTrace();
}
return "";
}
/**
* 更新指定节点数据内容
* @param: zkPath
* @param: data
* @Date 2021/6/5
*/
private boolean writeDate(String path, String data) {
try {
System.out.println("更新数据成功,path:"+path+",stat:"+
this.zk.setData(path,data.getBytes(),-1));
} catch (InterruptedException e) {
System.out.println("更新数据失败,发生InterruptedException,path:"+path);
e.printStackTrace();
} catch (KeeperException e) {
System.out.println("更新数据失败,发生KeeperException,path:"+path);
e.printStackTrace();
}
return false;
}
/**
* 删除指定节点数据
* @param: path 节点path
* @Date 2021/6/5
*/
private void deleteNode(String path) {
try {
this.zk.delete(path,-1);
System.out.println("删除节点成功,path:"+path);
} catch (InterruptedException e) {
System.out.println("删除节点失败,发生InterruptedException,path"+path);
e.printStackTrace();
} catch (KeeperException e) {
System.out.println("删除节点失败,发生KeeperException,path"+path);
e.printStackTrace();
}
}
/**
* 关闭ZK连接
* @Date 2021/6/5
*/
private void releaseConnection() {
if(null != this.zk){
try {
this.zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 收到来自Server的Watcher通知后的处理
* @Date 2021/6/5
*/
@Override
public void process(WatchedEvent event) {
System.out.println("收到事件通知:"+event.getState());
if(Event.KeeperState.SyncConnected == event.getState()){
countDownLatch.countDown();
}
}
}
3.Zookeeper的监听机制
package com.example.zookeeperdemo;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.WatcherEvent;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Zookeeper的监听机制
* @author wanglu
* @since 1.0, 2021/6/5 23:33
*/
public class ZooKeeperWatcher implements Watcher {
/** 定义原子变量 */
AtomicInteger seq = new AtomicInteger();
/** 定义session失效时间 */
private static final int SESSION_TIMEOUT = 10000;
/** zookeeper服务器地址 */
private static final String CONNECTION_ADDR = "127.0.0.1:2181";
/** zk父路径设置 */
private static final String PARENT_PATH = "/testWatch";
/** zk子路径设置 */
private static final String CHILDREN_PATH = "/testWatch/children";
/** 进入标识 */
private static final String LOG_PREFIX_OF_MAIN = "【Main】";
/** zk变量 */
private ZooKeeper zk = null;
/** 信号量设置,用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
/**
* 创建ZK连接
* @param connectAddr ZK服务器地址列表
* @param sessionTimeout Session超时时间
*/
public void createConnection(String connectAddr, int sessionTimeout) {
this.releaseConnection();
try {
zk = new ZooKeeper(connectAddr, sessionTimeout, this);
System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");
connectedSemaphore.await();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 关闭ZK连接
*/
public void releaseConnection() {
if (this.zk != null) {
try {
this.zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 创建节点
* @param path 节点路径
* @param data 数据内容
* @return
*/
public boolean createPath(String path, String data) {
try {
//设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)
this.zk.exists(path, true);
System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " +
this.zk.create( /**路径*/
path,
/**数据*/
data.getBytes(),
/**所有可见*/
ZooDefs.Ids.OPEN_ACL_UNSAFE,
/**永久存储*/
CreateMode.PERSISTENT ) +
", content: " + data);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* 读取指定节点数据内容
* @param path 节点路径
* @return
*/
public String readData(String path, boolean needWatch) {
try {
return new String(this.zk.getData(path, needWatch, null));
} catch (Exception e) {
e.printStackTrace();
return "";
}
}
/**
* 更新指定节点数据内容
* @param path 节点路径
* @param data 数据内容
* @return
*/
public boolean writeData(String path, String data) {
try {
System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " +
this.zk.setData(path, data.getBytes(), -1));
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* 删除指定节点
*
* @param path
* 节点path
*/
public void deleteNode(String path) {
try {
this.zk.delete(path, -1);
System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 判断指定节点是否存在
* @param path 节点路径
*/
public Stat exists(String path, boolean needWatch) {
try {
return this.zk.exists(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 获取子节点
* @param path 节点路径
*/
private List<String> getChildren(String path, boolean needWatch) {
try {
return this.zk.getChildren(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 删除所有节点
*/
public void deleteAllTestPath() {
if(this.exists(CHILDREN_PATH, false) != null){
this.deleteNode(CHILDREN_PATH);
}
if(this.exists(PARENT_PATH, false) != null){
this.deleteNode(PARENT_PATH);
}
}
/**
* 收到来自Server的Watcher通知后的处理。
*/
@Override
public void process(WatchedEvent event) {
System.out.println("进入 process 。。。。。event = " + event);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (event == null) {
return;
}
// 连接状态
Watcher.Event.KeeperState keeperState = event.getState();
// 事件类型
Watcher.Event.EventType eventType = event.getType();
// 受影响的path
String path = event.getPath();
String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";
System.out.println(logPrefix + "收到Watcher通知");
System.out.println(logPrefix + "连接状态:\\t" + keeperState.toString());
System.out.println(logPrefix + "事件类型:\\t" + eventType.toString());
if (Event.KeeperState.SyncConnected == keeperState) {
// 成功连接上ZK服务器
if (Event.EventType.None == eventType) {
System.out.println(logPrefix + "成功连接上ZK服务器");
connectedSemaphore.countDown();
}
//创建节点
if (Event.EventType.NodeCreated == eventType) {
System.out.println(logPrefix + "节点创建");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.exists(path, true);
}
//更新节点
else if (Event.EventType.NodeDataChanged == eventType) {
System.out.println(logPrefix + "节点数据更新");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(logPrefix + "数据内容: " + this.readData(PARENT_PATH, true));
}
//更新子节点
else if (Event.EventType.NodeChildrenChanged == eventType) {
System.out.println(logPrefix + "子节点变更");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(logPrefix + "子节点列表:" + this.getChildren(PARENT_PATH, true));
}
//删除节点
else if (Event.EventType.NodeDeleted == eventType) {
System.out.println(logPrefix + "节点 " + path + " 被删除");
}
}
else if (Watcher.Event.KeeperState.Disconnected == keeperState) {
System.out.println(logPrefix + "与ZK服务器断开连接");
}
else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {
System.out.println(logPrefix + "权限检查失败");
}
else if (Watcher.Event.KeeperState.Expired == keeperState) {
System.out.println(logPrefix + "会话失效");
}
System.out.println("--------------------------------------------");
}
public static void main(String[] args) throws Exception {
//建立watcher
ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();
//创建连接
zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
//System.out.println(zkWatch.zk.toString());
Thread.sleep(1000);
// 清理节点
zkWatch.deleteAllTestPath();
if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {
// 读取数据,在操作节点数据之前先调用zookeeper的getData()方法是为了可以watch到对节点的操作。watch是一次性的,
// 也就是说,如果第二次又重新调用了setData()方法,在此之前需要重新调用一次。
System.out.println("---------------------- read parent ----------------------------");
// zkWatch.readData(PARENT_PATH, true);
// 更新数据
zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");
/** 读取子节点,设置对子节点变化的watch,如果不写该方法,则在创建子节点是只会输出NodeCreated,而不会输出NodeChildrenChanged,
也就是说创建子节点时没有watch。
如果是递归的创建子节点,如path="/p/c1/c2"的话,getChildren(PARENT_PATH, ture)只会在创建c1时watch,输出c1的NodeChildrenChanged,
而不会输出创建c2时的NodeChildrenChanged,如果watch到c2的NodeChildrenChanged,则需要再调用一次getChildren(String path, true)方法,
其中path="/p/c1"
*/
System.out.println("---------------------- read children path ----------------------------");
zkWatch.getChildren(PARENT_PATH, true);
Thread.sleep(1000);
// 创建子节点,同理如果想要watch到NodeChildrenChanged状态,需要调用getChildren(CHILDREN_PATH, true)
zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");
Thread.sleep(1000);
zkWatch.readData(CHILDREN_PATH, true);
zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
}
Thread.sleep(20000);
// 清理节点
zkWatch.deleteAllTestPath();
Thread.sleep(1000);
zkWatch.releaseConnection();
}
}
运行结果:
【Main】开始连接ZK服务器
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null
【Watcher-1】收到Watcher通知
【Watcher-1】连接状态: SyncConnected
【Watcher-1】事件类型: None
【Watcher-1】成功连接上ZK服务器
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/testWatch
【Main】节点创建成功, Path: /testWatch, content: 1622912745034
---------------------- read parent ----------------------------
【Main】更新数据成功,path:/testWatch, stat: 60,61,1622912745043,1622912745187,1,0,0,0,13,0,60
---------------------- read children path ----------------------------
【Watcher-2】收到Watcher通知
【Watcher-2】连接状态: SyncConnected
【Watcher-2】事件类型: NodeCreated
【Watcher-2】节点创建
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/testWatch/children
【Main】节点创建成功, Path: /testWatch/children, content: 1622912746224
【Watcher-3】收到Watcher通知
【Watcher-3】连接状态: SyncConnected
【Watcher-3】事件类型: NodeCreated
【Watcher-3】节点创建
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testWatch
【Watcher-4】收到Watcher通知
【Watcher-4】连接状态: SyncConnected
【Watcher-4】事件类型: NodeChildrenChanged
【Watcher-4】子节点变更
【Main】更新数据成功,path:/testWatch/children, stat: 62,63,1622912746227,1622912747352,1,0,0,0,13,0,62
【Watcher-4】子节点列表:[children]
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/testWatch/children
【Watcher-5】收到Watcher通知
【Watcher-5】连接状态: SyncConnected
【Watcher-5】事件类型: NodeDataChanged
【Watcher-5】节点数据更新
【Watcher-5】数据内容: 1622912745185
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testWatch
【Main】删除节点成功,path:/testWatch/children
【Main】删除节点成功,path:/testWatch
【Watcher-6】收到Watcher通知
【Watcher-6】连接状态: SyncConnected
【Watcher-6】事件类型: NodeChildrenChanged
【Watcher-6】子节点变更
(3)Zookeeper原生客户端在开发支持上的弊端
-
会话连接是异步的
-
Watch需要重复注册
-
Session重连机制
-
开发复杂性较高
2、ZkClient的使用
开源的zk客户端,在原生API基础上封装,是一个更易于使用的zookeeper客户端
(1)引入依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
(2)编写客户端测试类
public class ZkClientOperator {
/** zookeeper地址 */
static final String CONNECT_ADDR = "127.0.0.1:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 10000;//ms
public static void main(String[] args) throws Exception {
//ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);
ZkClient zkc = new ZkClient(CONNECT_ADDR, SESSION_OUTTIME);
//1. create and delete方法
zkc.createEphemeral("/temp");
zkc.createPersistent("/super/c1", true);
Thread.sleep(10000);
zkc.delete("/temp");
zkc.deleteRecursive("/super");
//2. 设置path和data 并且读取子节点和每个节点的内容
zkc.createPersistent("/super", "1234");
zkc.createPersistent("/super/c1", "c1内容");
zkc.createPersistent("/super/c2", "c2内容");
List<String> list = zkc.getChildren("/super");
for(String p : list){
System.out.println(p);
String rp = "/super/" + p;
String data = zkc.readData(rp);
System.out.println("节点为:" + rp + ",内容为: " + data);
}
//3. 更新和判断节点是否存在
zkc.writeData("/super/c1", "新内容");
System.out.println(zkc.readData("/super/c1").toString());
System.out.println(zkc.exists("/super/c1"));
// 4.递归删除/super内容
zkc.deleteRecursive("/super");
}
}
运行结果:
(3)ZkClient的监听机制
public class TestZkClientWatcher {
/** zookeeper地址 */
static final String CONNECT_ADDR = "127.0.0.1:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 10000;//ms
@Test
/**
* subscribeChildChanges方法 订阅子节点变化
*/
public void testZkClientWatcher1() throws Exception {
ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);
//对父节点添加监听子节点变化。
zkc.subscribeChildChanges("/super", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("parentPath: " + parentPath);
System.out.println("currentChilds: " + currentChilds);
}
});
Thread.sleep(3000);
zkc.createPersistent("/super");
Thread.sleep(1000);
zkc.createPersistent("/super" + "/" + "c1", "c1内容");
Thread.sleep(1000);
zkc.createPersistent("/super" + "/" + "c2", "c2内容");
Thread.sleep(1000);
zkc.delete("/super/c2");
Thread.sleep(1000);
zkc.deleteRecursive("/super");
Thread.sleep(Integer.MAX_VALUE);
}
@Test
/**
* subscribeDataChanges 订阅内容变化
*/
public void testZkClientWatcher2() throws Exception {
ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);
zkc.createPersistent("/super", "1234");
//对父节点添加监听子节点变化。
zkc.subscribeDataChanges("/super", new IZkDataListener() {
@Override
public void handleDataDeleted(String path) throws Exception {
System.out.println("删除的节点为:" + path);
}
@Override
public void handleDataChange(String path, Object data) throws Exception {
System.out.println("变更的节点为:" + path + ", 变更内容为:" + data);
}
});
Thread.sleep(3000);
zkc.writeData("/super", "456", -1);
Thread.sleep(1000);
zkc.delete("/super");
Thread.sleep(Integer.MAX_VALUE);
}
}
运行结果:
3、Curator的使用
开源的zk客户端,在原生API基础上封装,apache顶级项目
(1)引入依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
(2)编写客户端测试类
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import static com.sun.xml.internal.ws.dump.LoggingDumpTube.Position.Before;
/**
* 测试Apache Curator框架的基本用法
*/
public class OperatorTest {
//ZooKeeper服务地址
private static final String SERVER = "127.0.0.1:2181";
//会话超时时间
private final int SESSION_TIMEOUT = 30000;
//连接超时时间
private final int CONNECTION_TIMEOUT = 5000;
//创建连接实例
private CuratorFramework client = null;
/**
* baseSleepTimeMs:初始的重试等待时间
* maxRetries:最多重试次数
*
*
* ExponentialBackoffRetry:重试一定次数,每次重试时间依次递增
* RetryNTimes:重试N次
* RetryOneTime:重试一次
* RetryUntilElapsed:重试一定时间
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
@org.junit.Before
public void init(){
//创建 CuratorFrameworkImpl实例
client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);
//启动
client.start();
}
/**
* 测试创建节点
* @throws Exception
*/
@Test
public void testCreate() throws Exception{
//创建永久节点
client.create().forPath("/curator","/curator data".getBytes());
//创建永久有序节点
client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/curator_sequential","/curator_sequential data".getBytes());
//创建临时节点
client.create().withMode(CreateMode.EPHEMERAL)
.forPath("/curator/ephemeral","/curator/ephemeral data".getBytes());
//创建临时有序节点
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath("/curator/ephemeral_path1","/curator/ephemeral_path1 data".getBytes());
}
/**
* 测试检查某个节点是否存在
* @throws Exception
*/
@Test
public void testCheck() throws Exception{
Stat stat1 = client.checkExists().forPath("/curator");
Stat stat2 = client.checkExists().forPath("/curator2");
System.out.println("'/curator'是否存在: " + (stat1 != null ? true : false));
System.out.println("'/curator2'是否存在: " + (stat2 != null ? true : false));
}
/**
* 测试异步设置节点数据
* @throws Exception
*/
@Test
public void testSetDataAsync() throws Exception{
//创建监听器
CuratorListener listener = new CuratorListener() {
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event)
throws Exception {
System.out.println(event.getPath());
}
};
//添加监听器
client.getCuratorListenable().addListener(listener);
//异步设置某个节点数据
client.setData().inBackground().forPath("/curator","sync".getBytes());
//为了防止单元测试结束从而看不到异步执行结果,因此暂停10秒
Thread.sleep(10000);
}
/**
* 测试另一种异步执行获取通知的方式
* @throws Exception
*/
@Test
public void testSetDataAsyncWithCallback() throws Exception{
BackgroundCallback callback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event)
throws Exception {
System.out.println(event.getPath());
}
};
//异步设置某个节点数据
client.setData().inBackground(callback).forPath("/curator","/curator modified data with Callback".getBytes());
//为了防止单元测试结束从而看不到异步执行结果,因此暂停10秒
Thread.sleep(10000);
}
/**
* 测试删除节点
* @throws Exception
*/
@Test
public void testDelete() throws Exception{
//创建测试节点
client.create().orSetData().creatingParentsIfNeeded()
.forPath("/curator/del_key1","/curator/del_key1 data".getBytes());
client.create().orSetData().creatingParentsIfNeeded()
.forPath("/curator/del_key2","/curator/del_key2 data".getBytes());
client.create().forPath("/curator/del_key2/test_key","test_key data".getBytes());
//删除该节点
client.delete().forPath("/curator/del_key1");
//级联删除子节点
client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator/del_key2");
}
/*
* 测试事务管理:碰到异常,事务会回滚
* @throws Exception
*/
@Test
public void testTransaction() throws Exception{
//定义几个基本操作
CuratorOp createOp = client.transactionOp().create()
.forPath("/curator/one_path","some data".getBytes());
CuratorOp setDataOp = client.transactionOp().setData()
.forPath("/curator","other data".getBytes());
CuratorOp deleteOp = client.transactionOp().delete()
.forPath("/curator");
//事务执行结果
List<CuratorTransactionResult> results = client.transaction()
.forOperations(createOp,setDataOp,deleteOp);
//遍历输出结果
for(CuratorTransactionResult result : results){
System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType());
}
}
}
(3)Curator监听机制
public class EventTest {
//ZooKeeper服务地址
private static final String SERVER = "192.168.30.10:2181";
//会话超时时间
private final int SESSION_TIMEOUT = 30000;
//连接超时时间
private final int CONNECTION_TIMEOUT = 5000;
//创建连接实例
private CuratorFramework client = null;
/**
* baseSleepTimeMs:初始的重试等待时间
* maxRetries:最多重试次数
* ExponentialBackoffRetry:重试一定次数,每次重试时间依次递增
* RetryNTimes:重试N次
* RetryOneTime:重试一次
* RetryUntilElapsed:重试一定时间
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
@org.junit.Before
public void init(){
//创建 CuratorFrameworkImpl实例
client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);
//启动
client.start();
}
/**
*
* @描述:第一种监听器的添加方式: 对指定的节点进行添加操作
* 仅仅能监控指定的本节点的数据修改,删除 操作 并且只能监听一次 --->不好
*/
@Test
public void TestListenterOne() throws Exception{
client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());
// 注册观察者,当节点变动时触发
byte[] data = client.getData().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("获取 test 节点 监听器 : " + event);
}
}).forPath("/test");
client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());
Thread.sleep(1000);
client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());
Thread.sleep(1000);
System.out.println("节点数据: "+ new String(data));
Thread.sleep(10000);
}
/**
*
* @描述:第二种监听器的添加方式: Cache 的三种实现
* Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。
* 产生的事件会传递给注册的PathChildrenCacheListener。
* Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。
* Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
*/
//1.path Cache 连接 路径 是否获取数据
//能监听所有的字节点 且是无限监听的模式 但是 指定目录下节点的子节点不再监听
@Test
public void setListenterTwoOne() throws Exception{
ExecutorService pool = Executors.newCachedThreadPool();
PathChildrenCache childrenCache = new PathChildrenCache(client, "/test", true);
PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("开始进行事件分析:-----");
ChildData data = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED : "+ data.getPath() +" 数据:"+ data.getData());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED : "+ data.getPath() +" 数据:"+ data.getData());
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED : "+ data.getPath() +" 数据:"+ data.getData());
break;
case INITIALIZED:
System.out.println("INITIALIZED : "+ data.getPath() +" 数据:"+ data.getData());
break;
default:
break;
}
}
};
childrenCache.getListenable().addListener(childrenCacheListener);
System.out.println("Register zk watcher successfully!");
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
//创建一个节点
client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());
client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","enjoy".getBytes());
Thread.sleep(1000);
client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node02","deer".getBytes());
Thread.sleep(1000);
client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node02","demo".getBytes());
Thread.sleep(1000);
client.delete().forPath("/test/node02");
Thread.sleep(10000);
}
//2.Node Cache 监控本节点的变化情况 连接 目录 是否压缩
//监听本节点的变化 节点可以进行修改操作 删除节点后会再次创建(空节点)
@Test
public void setListenterTwoTwo() throws Exception{
ExecutorService pool = Executors.newCachedThreadPool();
//设置节点的cache
final NodeCache nodeCache = new NodeCache(client, "/test", false);
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("the test node is change and result is :");
System.out.println("path : "+nodeCache.getCurrentData().getPath());
System.out.println("data : "+new String(nodeCache.getCurrentData().getData()));
System.out.println("stat : "+nodeCache.getCurrentData().getStat());
}
});
nodeCache.start();
client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());
Thread.sleep(1000);
client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","enjoy".getBytes());
Thread.sleep(10000);
}
//3.Tree Cache
// 监控 指定节点和节点下的所有的节点的变化--无限监听 可以进行本节点的删除(不在创建)
@Test
public void TestListenterTwoThree() throws Exception{
ExecutorService pool = Executors.newCachedThreadPool();
//设置节点的cache
TreeCache treeCache = new TreeCache(client, "/test");
//设置监听器和处理过程
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
if(data !=null){
switch (event.getType()) {
case NODE_ADDED:
System.out.println("NODE_ADDED : "+ data.getPath() +" 数据:"+ new String(data.getData()));
break;
case NODE_REMOVED:
System.out.println("NODE_REMOVED : "+ data.getPath() +" 数据:"+ new String(data.getData()));
break;
case NODE_UPDATED:
System.out.println("NODE_UPDATED : "+ data.getPath() +" 数据:"+ new String(data.getData()));
break;
default:
break;
}
}else{
System.out.println( "data is null : "+ event.getType());
}
}
});
//开始监听
treeCache.start();
//创建一个节点
client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());
Thread.sleep(1000);
client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","enjoy".getBytes());
Thread.sleep(1000);
client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","deer".getBytes());
Thread.sleep(1000);
client.create().orSetData().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/test/node02/node02_2","deer".getBytes());
Thread.sleep(10000);
}
}
+" 数据:"+ new String(data.getData()));
break;
case NODE_UPDATED:
System.out.println(“NODE_UPDATED : “+ data.getPath() +” 数据:”+ new String(data.getData()));
break;
default:
break;
}
}else{
System.out.println( "data is null : "+ event.getType());
}
}
});
//开始监听
treeCache.start();
//创建一个节点
client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());
Thread.sleep(1000);
client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","enjoy".getBytes());
Thread.sleep(1000);
client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","deer".getBytes());
Thread.sleep(1000);
client.create().orSetData().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/test/node02/node02_2","deer".getBytes());
Thread.sleep(10000);
}
}
以上是关于Zookeeper客户端的使用的主要内容,如果未能解决你的问题,请参考以下文章
Zookeeper系列三:Zookeeper客户端的使用(Zookeeper原生API如何进行调用ZKClientCurator)