06_zookeeper原生Java API使用
Posted HigginCui
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了06_zookeeper原生Java API使用相关的知识,希望对你有一定的参考价值。
【Zookeeper构造方法概述】
/** * 客户端和zk服务端的连接是一个异步的过程 * 当连接成功后,客户端会收到一个watch通知 * * ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, * long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) * 参数介绍 * connectString:连接服务器的ip字符串 * 比如:"192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181" * 可以是一个ip,也可以是多个ip,一个ip代表单机,多个ip代表集群 * 也可以在ip后加路径 * sessionTimeout:超时时间,心跳收不到了,那就超时 * watcher:通知事件,如果有对应的事件触发,则会收到一个通知:如果不需要,那就设为null * sessionId:会话的id * sessionPasswd:会话密码,当会话丢失后,可以依据sessionId和sessionPasswd重新获取会话 * canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写, * 此时数据被读取到的可能是旧数据,一般设置为false,不推荐使用 * */ public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
【Zookeeper API 客户端连接服务端例子】
package com.zk.demo; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; /** * Created by HigginCui on 2018/9/20. */ public class ZkConnect implements Watcher{ public static final String zkServerPath = "127.0.0.1:2181"; public static final Integer timeout = 5000; /** * 客户端和zk服务端的连接是一个异步的过程 * 当连接成功后,客户端会收到一个watch通知 */ public static void main(String[] args) throws Exception{ ZooKeeper zk = new ZooKeeper(zkServerPath,timeout,new ZkConnect()); for (int i=0;i<20;i++) { Thread.sleep(10); //休眠10ms,在这个过程中,连接状态会从CONNECTING--->CONNECTED System.out.println(i+"---"+zk.getState()); } } @Override public void process(WatchedEvent watchedEvent) { System.err.println("收到zk的watch通知----" ); } }
【运行结果】
【使用CountDownLatch优化zk连接过程】
package com.zk.demo; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.util.concurrent.CountDownLatch; public class ZkConnect implements Watcher{ public static final String zkServerPath = "127.0.0.1:2181"; public static final Integer timeout = 5000; private static CountDownLatch latch = new CountDownLatch(1); public static void main(String[] args) throws Exception{ ZooKeeper zk = new ZooKeeper(zkServerPath,timeout,new ZkConnect()); System.out.println("连接状态---" + zk.getState()); latch.await(); System.out.println("连接状态---" + zk.getState()); } @Override public void process(WatchedEvent watchedEvent) { System.err.println("收到zk的watch通知----" ); latch.countDown(); } }
【运行结果】
【创建节点】
/** * * String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) * * 同步或异步创建节点,都不支持子节点的递归操作,异步有一个callBack方法 * 参数 * path:创建的路径 * data:存储的数据,byte[]类型 * acl:权限控制策略 * Ids.OPEN_ACL_UNSAFE ---> world:anyone:crdwa * CREATE_ALL_ACL ---> auth:user:password:cdrwa * createMode:节点类型,是一个枚举 * CreateMode.PERSISTENT: 持久节点 * CreateMode.PERSISTENT_SEQUENTIAL:持久顺序节点 * CreateMode.EPHEMERAL: 临时节点 * CreateMode.EPHEMERAL_SEQUENTIAL: 临时顺序节点 */
【创建一个临时节点】
package com.zk.demo; import org.apache.zookeeper.*; import java.util.concurrent.CountDownLatch; /** * Created by HigginCui on 2018/9/21. */ public class ZkNodeOperator implements Watcher{ private static ZooKeeper zooKeeper = null; public static final String zkServerPath = "127.0.0.1:2181"; public static final Integer timeout = 5000; private static CountDownLatch latch = new CountDownLatch(1); public ZkNodeOperator() { } private static void init() { try{ zooKeeper = new ZooKeeper(zkServerPath,timeout,new ZkNodeOperator()); latch.await(); }catch (Exception e){ e.printStackTrace(); if(zooKeeper!=null){ try { zooKeeper.close(); }catch (InterruptedException e1){ e1.printStackTrace(); } } } } public static void main(String[] args) throws Exception{ init();
//创建/testnode临时节点,包含数据为haha,权限是OPEN_ACL_UNSAFE,类型为临时节点 zooKeeper.create("/testnode","haha".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL); } /** * 观察者 */ @Override public void process(WatchedEvent event) { System.err.println("收到zk的watch通知----" +event.getPath()+"---"+event.getState()); latch.countDown(); } }
【运行结果 直接看打开zkCli.sh连接】
【创建持久节点,并且产生一个回调通知】
package com.zk.demo; import org.apache.zookeeper.*; import java.util.concurrent.CountDownLatch; /** * Created by HigginCui on 2018/9/21. */ public class ZkNodeOperator implements Watcher{ private static ZooKeeper zooKeeper = null; public static final String zkServerPath = "127.0.0.1:2181"; public static final Integer timeout = 5000; private static CountDownLatch latch = new CountDownLatch(1); public ZkNodeOperator() { } private static void init() { try{ zooKeeper = new ZooKeeper(zkServerPath,timeout,new ZkNodeOperator()); latch.await(); }catch (Exception e){ e.printStackTrace(); if(zooKeeper!=null){ try { zooKeeper.close(); }catch (InterruptedException e1){ e1.printStackTrace(); } } } } public static void main(String[] args) throws Exception{ init(); //create(String path, byte[] data, List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx) String ctx = "{\'create\':\'success\'}"; zooKeeper.create("/testnode","haha".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,new CreateNodeCallBack(),ctx); System.in.read(); //线程停在这里,等待回调线程的打印结果 } /** * 观察者 */ @Override public void process(WatchedEvent event) { System.err.println("收到zk的watch通知----" +event.getPath()+"---"+event.getState()); latch.countDown(); } } /** * 创建节点回调通知类 */ class CreateNodeCallBack implements AsyncCallback.StringCallback{ @Override public void processResult(int i, String path, Object ctx, String s1) { System.out.println("【回调方法】:创建节点的路径:"+path); System.out.println("【回调方法】:ctx==="+(String)ctx); } }
【运行结果——控制台打印】
【运行结果——zk客户端连接】
【修改节点的数据】
/** * path:节点路径 * data:修改后的数据 * version:版本号,这里的版本号必须是正要修改的节点数据的版本号!! */ Stat setData(String path, byte[] data, int version)
修改节点代码示例
Stat stat = zooKeeper.setData("/testnode", "Higgin".getBytes(), 1); System.out.println("修改后数据的版本号为---"+stat.getVersion());
【运行结果】
先看下对应的数据版本号信息
看下控制台运行结果
修改下代码,将版本号改为0,与zk上要修改的数据的版本号保持一致
Stat stat = zooKeeper.setData("/testnode", "Higgin".getBytes(), 0); System.out.println("修改后数据的版本号为---"+stat.getVersion());
【查看运行结果】
【删除节点】
/** * 删除节点 * 注意:version版本号必须和要删除的节点数据的版本号一致 */ public void delete(String path, int version)
【删除实例】
zooKeeper.delete("/testnode",1);
【运行结果】
删除前,先看下对应节点数据
执行删除代码后,可以看到节点已经不存在了
【获取节点数据】
public byte[] getData(String path, boolean watch, Stat stat) public byte[] getData(String path, Watcher watcher, Stat stat) public void getData(String path, Watcher watcher, DataCallback cb, Object ctx) public void getData(String path, boolean watch, DataCallback cb, Object ctx)
[获取节点数据实例]
byte[] dateBytes = zooKeeper.getData("/testnode", true, null); String str = new String(dateBytes); System.err.println("获取的数据为==="+str);
[运行结果]
【获取子节点的列表数据】
public List<String> getChildren(String path, Watcher watcher) public List<String> getChildren(String path, boolean watch, Stat stat) public List<String> getChildren(String path, boolean watch, Stat stat) public void getChildren(String path, Watcher watcher, ChildrenCallback cb, Object ctx) ......
[获取子节点列表实例]
List<String> childList = zooKeeper.getChildren("/testnode", null); for (String child : childList) { System.err.println("子节点=="+child); }
[ 运行结果 ]
先看下zk上对应的数据
[ 控制台运行结果 ]
以上是关于06_zookeeper原生Java API使用的主要内容,如果未能解决你的问题,请参考以下文章
Zookeeper系列三:Zookeeper客户端的使用(Zookeeper原生API如何进行调用ZKClientCurator)
ZooKeeper客户端原生API的使用以及ZkClient第三方API的使用