Zookeeper基于API操作Node节点
Posted Vip灬cnblog
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper基于API操作Node节点相关的知识,希望对你有一定的参考价值。
-
org.apache.zookeeper //客户端主要类文件
-
org.apache.zookeeper.data //
-
org.apache.zookeeper.server
-
org.apache.zookeeper.server.quorum
-
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.14</version> </dependency>
import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class CreateSession implements Watcher { private static CountDownLatch countDownLatch = new CountDownLatch(1); /* 建立会话 */ public static void main(String[] args) throws IOException, InterruptedException { /* 客户端可以通过创建一个zk实例来连接zk服务器 new Zookeeper(connectString,sesssionTimeOut,Wather) connectString: 连接地址:IP:端口 sesssionTimeOut:会话超时时间:单位毫秒 Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端) */ ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateSession()); System.out.println(zooKeeper.getState()); // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞 countDownLatch.await(); System.out.println("客户端与服务端会话真正建立了"); } /* 回调方法:处理来自服务器端的watcher通知 */ public void process(WatchedEvent watchedEvent) { // SyncConnected if(watchedEvent.getState() == Event.KeeperState.SyncConnected){ //解除主程序在CountDownLatch上的等待阻塞 System.out.println("process方法执行了..."); countDownLatch.countDown(); } } }
注意:ZooKeeper客户端和服务端会话的建立是一个异步的过程,也就是说在程序中,构造方法会在处理完客户端初始化工作后立即返回,在大多数情况下,此时并没有真正建立好一个可用的会话,在会话的生命周期中处于“ CONNECTING的状态。当该会话真正创建完毕后 ZooKeeper服务端会向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知之后,才算真正建立了会话
创建节点
节点介绍:
/**
* path :节点创建的路径
* data[] :节点创建要保存的数据,是个byte类型的
* acl :节点创建的权限信息(4种类型)
* ANYONE_ID_UNSAFE : 表示任何人
* AUTH_IDS :此ID仅可用于设置ACL。它将被客户机验证的ID替换。
* OPEN_ACL_UNSAFE :这是一个完全开放的ACL(常用)--> world:anyone
* CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限
* createMode :创建节点的类型(4种类型)
* PERSISTENT:持久节点
* PERSISTENT_SEQUENTIAL:持久顺序节点
* EPHEMERAL:临时节点
* EPHEMERAL_SEQUENTIAL:临时顺序节点
String node = zookeeper.create(path,data,acl,createMode);
*/
import org.apache.zookeeper.*; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class CreateNote implements Watcher { private static CountDownLatch countDownLatch = new CountDownLatch(1); private static ZooKeeper zooKeeper; /* 建立会话 */ public static void main(String[] args) throws IOException, InterruptedException, KeeperException { /* 客户端可以通过创建一个zk实例来连接zk服务器 new Zookeeper(connectString,sesssionTimeOut,Wather) connectString: 连接地址:IP:端口 sesssionTimeOut:会话超时时间:单位毫秒 Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端) */ zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateNote()); System.out.println(zooKeeper.getState()); // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞 //countDownLatch.await();\\ Thread.sleep(Integer.MAX_VALUE); } /* 回调方法:处理来自服务器端的watcher通知 */ public void process(WatchedEvent watchedEvent) { // SyncConnected if(watchedEvent.getState() == Event.KeeperState.SyncConnected){ //解除主程序在CountDownLatch上的等待阻塞 System.out.println("process方法执行了..."); // 创建节点 try { createNoteSync(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } /* 创建节点的方法 */ private static void createNoteSync() throws KeeperException, InterruptedException { /** * path :节点创建的路径 * data[] :节点创建要保存的数据,是个byte类型的 * acl :节点创建的权限信息(4种类型) * ANYONE_ID_UNSAFE : 表示任何人 * AUTH_IDS :此ID仅可用于设置ACL。它将被客户机验证的ID替换。 * OPEN_ACL_UNSAFE :这是一个完全开放的ACL(常用)--> world:anyone * CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限 * createMode :创建节点的类型(4种类型) * PERSISTENT:持久节点 * PERSISTENT_SEQUENTIAL:持久顺序节点 * EPHEMERAL:临时节点 * EPHEMERAL_SEQUENTIAL:临时顺序节点 String node = zookeeper.create(path,data,acl,createMode); */ // 持久节点 String note_persistent = zooKeeper.create("/g-persistent", "持久节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // 临时节点 String note_ephemeral = zooKeeper.create("/g-ephemeral", "临时节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); // 持久顺序节点 String note_persistent_sequential = zooKeeper.create("/g-persistent_sequential", "持久顺序节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); System.out.println("创建的持久节点" + note_persistent); System.out.println("创建的临时节点" + note_ephemeral); System.out.println("创建的持久顺序节点" + note_persistent_sequential); } }
读取节点:
public class GetNoteData implements Watcher { private static CountDownLatch countDownLatch = new CountDownLatch(1); private static ZooKeeper zooKeeper; /* 建立会话 */ public static void main(String[] args) throws IOException, InterruptedException, KeeperException { /* 客户端可以通过创建一个zk实例来连接zk服务器 new Zookeeper(connectString,sesssionTimeOut,Wather) connectString: 连接地址:IP:端口 sesssionTimeOut:会话超时时间:单位毫秒 Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端) */ zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new GetNoteData()); System.out.println(zooKeeper.getState()); // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞 //countDownLatch.await();\\ Thread.sleep(Integer.MAX_VALUE); } /* 回调方法:处理来自服务器端的watcher通知 */ public void process(WatchedEvent watchedEvent) { /* 子节点列表发生改变时,服务器端会发生noteChildrenChanged事件通知 要重新获取子节点列表,同时注意:通知是一次性的,需要反复注册监听 */ if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){ List<String> children = null; try { children = zooKeeper.getChildren("/g-persistent", true); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(children); } // SyncConnected if(watchedEvent.getState() == Event.KeeperState.SyncConnected){ //解除主程序在CountDownLatch上的等待阻塞 System.out.println("process方法执行了..."); // 获取节点数据的方法 try { getNoteData(); // 获取节点的子节点列表方法 getChildrens(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } /* 获取某个节点的内容 */ private void getNoteData() throws KeeperException, InterruptedException { /** * path : 获取数据的路径 * watch : 是否开启监听 * stat : 节点状态信息 * null: 表示获取最新版本的数据 * zk.getData(path, watch, stat); */ byte[] data = zooKeeper.getData("/g-persistent", false, null); System.out.println(new String(data)); } /* 获取某个节点的子节点列表方法 */ public static void getChildrens() throws KeeperException, InterruptedException { /* path:路径 watch:是否要启动监听,当子节点列表发生变化,会触发监听 zooKeeper.getChildren(path, watch); */ List<String> children = zooKeeper.getChildren("/g-persistent", true); System.out.println(children); } }
删除节点
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class DeleteNote implements Watcher { private static CountDownLatch countDownLatch = new CountDownLatch(1); private static ZooKeeper zooKeeper; /* 建立会话 */ public static void main(String[] args) throws IOException, InterruptedException, KeeperException { /* 客户端可以通过创建一个zk实例来连接zk服务器 new Zookeeper(connectString,sesssionTimeOut,Wather) connectString: 连接地址:IP:端口 sesssionTimeOut:会话超时时间:单位毫秒 Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端) */ zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new DeleteNote()); System.out.println(zooKeeper.getState()); // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞 //countDownLatch.await();\\ Thread.sleep(Integer.MAX_VALUE); } /* 回调方法:处理来自服务器端的watcher通知 */ public void process(WatchedEvent watchedEvent) { // SyncConnected if(watchedEvent.getState() == Event.KeeperState.SyncConnected){ //解除主程序在CountDownLatch上的等待阻塞 System.out.println("process方法执行了..."); // 删除节点 try { deleteNoteSync(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } /* 删除节点的方法 */ private void deleteNoteSync() throws KeeperException, InterruptedException { /* zooKeeper.exists(path,watch) :判断节点是否存在 zookeeper.delete(path,version) : 删除节点 */ Stat stat = zooKeeper.exists("/g-persistent/c1", false); System.out.println(stat == null ? "该节点不存在":"该节点存在"); if(stat != null){ zooKeeper.delete("/g-persistent/c1",-1); } Stat stat2 = zooKeeper.exists("/g-persistent/c1", false); System.out.println(stat2 == null ? "该节点不存在":"该节点存在"); } }
更新节点
/* 更新数据节点内容的方法 */ private void updateNoteSync() throws KeeperException, InterruptedException { /* path:路径 data:要修改的内容 byte[] version:为-1,表示对最新版本的数据进行修改 zooKeeper.setData(path, data,version); */ byte[] data = zooKeeper.getData("/g-persistent", false, null); System.out.println("修改前的值:" + new String(data)); //修改/lg-persistent 的数据 stat: 状态信息对象 Stat stat = zooKeeper.setData("/g-persistent", "客户端修改了节点数据".getBytes(), -1); byte[] data2 = zooKeeper.getData("/g-persistent", false, null); System.out.println("修改后的值:" + new String(data2)); }
以上是关于Zookeeper基于API操作Node节点的主要内容,如果未能解决你的问题,请参考以下文章
Zookeeper3.5.7版本——客户端 API 操作(代码示例)
Java分布式锁三种实现方案——方案三:基于Zookeeper的分布式锁,利用节点名称的唯一性来实现独占锁
Zookeeper -- Zookeeper JavaAPI相关操作(Curator介绍Curator API 常用操作(节点的CRUD,Watch事件监听)分布式锁模拟12306售票案例)