zookeeper api和zkclient api使用
Posted LDDXFS
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper api和zkclient api使用相关的知识,希望对你有一定的参考价值。
zookeeper api
--原生api
CreateSession 连接zookeeper
package lddxfs.zkstudy.zkdemo.test001; import lddxfs.zkstudy.zkdemo.Constant; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; /** * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/) * Date:2018/10/20 */ public class CreateSession { private static ZooKeeper zk; public static void main(String[] args) throws Exception { zk = new ZooKeeper(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, new MyWatcher()); //Zookeeper是API提供的1个类,我们连接zk集群,进行相应的znode操作,都是通过ZooKeeper的实例进行,这个实例就是zk client,和命令行客户端是同样的角色 //Zookeeper实例的创建需要传递3个参数 //connectString 代表要连接zk集群服务,通过逗号分隔 Thread.sleep(Integer.MAX_VALUE); } static class MyWatcher implements Watcher { public void process(WatchedEvent event) { // 这个方法只会调用一次,在这个session建立完成调用 if (event.getState() == Event.KeeperState.SyncConnected) { //连接建立事件的处理 System.out.println("Event:" + event); System.out.println("=========Client Connected to zookeeper=========="); } } } }
CreateNode 创建znode
package lddxfs.zkstudy.zkdemo.test002; import lddxfs.zkstudy.zkdemo.Constant; import org.apache.zookeeper.*; /** * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/) * Date:2018/10/20 */ public class CreateNode implements Watcher { private static ZooKeeper zk; public static void main(String[] args) throws Exception { zk = new ZooKeeper(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, new CreateNode()); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { //Client段处理连接建立事件,处理动作为添加1个永久节点 if(event.getState()==Event.KeeperState.SyncConnected){ //创建znode节点 try { createNodeSync(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } //create node, synchronized private void createNodeSync() throws KeeperException, InterruptedException { System.out.println("Create node with Sync mode"); String path =zk.create("/node_by_java","123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("New Node added: "+path); } }
GetChildrenSync 获取子节点不注册watch
package lddxfs.zkstudy.zkdemo.test003; import lddxfs.zkstudy.zkdemo.Constant; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.util.List; /** * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/) * Date:2018/10/20 */ public class GetChildrenSync implements Watcher { private static ZooKeeper zk; public static void main(String[] args) throws Exception { zk=new ZooKeeper(Constant.CONNECT_STRING,Constant.SESSION_TIMEOUT,new GetChildrenSync()); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { //只在连接建立后,查询 /的子节点列表 if(event.getState()==Event.KeeperState.SyncConnected){ //查询子节点列表 try { getChildranSync(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } //get children ,synchronized private void getChildranSync() throws KeeperException, InterruptedException { System.out.println("Get Childran in sync mode"); //false ,不关注子节点列表的变更事件(不注册watcher) List<String> children=zk.getChildren("/",false); System.out.println("Children list of /: "+children); } }
GetChildrenSync 获取子节点注册watch
package lddxfs.zkstudy.zkdemo.test004; import lddxfs.zkstudy.zkdemo.Constant; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.List; /** * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/) * Date:2018/10/20 */ public class GetChildrenSync implements Watcher { private static ZooKeeper zk; public static void main(String[] args) throws IOException, InterruptedException { zk = new ZooKeeper(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, new GetChildrenSync()); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { //子节点列表变化 even的处理 if (event.getType() == Event.EventType.NodeChildrenChanged) { //再次获取子节点列表 try { //event.getPath()返回 哪个znode的子节点列表发生了变化 List<String> childrens = zk.getChildren(event.getPath(), true); System.out.println("NodeChildrenChanged childrens=" + childrens); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } else if (event.getState() == Event.KeeperState.SyncConnected) { try { getChildrenSync(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } private void getChildrenSync() throws KeeperException, InterruptedException { System.out.println("Get Children in sync mode"); List<String> childrens = zk.getChildren("/", true); System.out.println("Children list of / : " + childrens); } }
DeleteNodeSync 删除znode
package lddxfs.zkstudy.zkdemo.test005; import lddxfs.zkstudy.zkdemo.Constant; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.List; /** * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/) * Date:2018/10/20 */ public class DeleteNodeSync implements Watcher { private static ZooKeeper zk; public static void main(String[] args) throws IOException, InterruptedException { zk=new ZooKeeper(Constant.CONNECT_STRING,Constant.SESSION_TIMEOUT,new DeleteNodeSync()); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { if(event.getState()== Event.KeeperState.SyncConnected){ if(event.getType()== Event.EventType.None&&event.getPath()==null){ try { deleteNode("/nodeddd"); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } } private void deleteNode(String path) throws KeeperException, InterruptedException { System.out.println("Delete Node in sync mode"); zk.delete(path,-1); System.out.println("Node delete :"+path); List<String> children=zk.getChildren("/",false); System.out.println("Children list of / is: "+children); } }
SetDataSync设置数据
package lddxfs.zkstudy.zkdemo.test006; import lddxfs.zkstudy.zkdemo.Constant; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; /** * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/) * Date:2018/10/20 */ public class SetDataSync implements Watcher { private static ZooKeeper zk; public static void main(String[] args) throws IOException, InterruptedException { zk = new ZooKeeper(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, new SetDataSync()); Thread.sleep(Integer.MAX_VALUE); } public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { if (event.getType() == Event.EventType.None && event.getPath() == null) { try { String path = zk.create("/testdata", "12345".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Stat stat = zk.setData(path, "zhonddd".getBytes(), -1); System.out.println(""); byte[] datass = zk.getData(path, false, stat); System.out.println(new String(datass)); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
zkclient api
--是GitHub上一个开源的ZooKeeper客户端 在原生Zookeeper API 接口上进行包装
CreateSession 连接zookeeper
package lddxfs.zkstudy.zkclientdemo.test001; import lddxfs.zkstudy.zkdemo.Constant; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; /** * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/) * Date:2018/10/21 */ //zkclient 是GitHub上一个开源的ZooKeeper客户端 在原生Zookeeper API 接口上进行包装 // 同时在内部实现了session超时重连 ,Watcher反复注册等功能 public class CreateSession { public static void main(String[] args) { ZkClient zkClient=new ZkClient(Constant.CONNECT_STRING,Constant.SESSION_TIMEOUT,Constant.CONNECTION_TIMEOUT,new SerializableSerializer()); /** * 1)和zookeeper原生API不同 通过zkclient API创建会话 需要提供session timeout,connection timeout两个定时器 * 2)同时要提供 1个序列化器实例,原因在于后续创建zonde节点时,写入的数据(java对象)会自动通过序列化器来转换为byte[] * 3)同理 ,读取出的byte[] 的数据,也会自动通过序列化器直接转换为java对象 */ } }
CreateNode
package lddxfs.zkstudy.zkclientdemo.test002; import lddxfs.zkstudy.zkclientdemo.User; import lddxfs.zkstudy.zkdemo.Constant; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; import org.apache.zookeeper.CreateMode; /** * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/) * Date:2018/10/21 */ public class CreateNode { public static void main(String[] args) { ZkClient zkClient=new ZkClient(Constant.CONNECT_STRING,Constant.SESSION_TIMEOUT,Constant.CONNECTION_TIMEOUT,new SerializableSerializer()); User user=new User(); //直接将数据user写入,自动序列化为byte[] String path= zkClient.create("/node_zkclient",user, CreateMode.PERSISTENT); System.out.println("Create path is:"+path); /** * 通过客户端查看会是这样格式的数据 * [zk: 192.168.10.132:2185(CONNECTED) 3] get /node_zkclient * ��sr lddxfs.zkstudy.zkclientdemo.User-U��t�\'LidtLjava/lang/Integer;LnametLjava/lang/String;xppp */ } }
WriteData
package lddxfs.zkstudy.zkclientdemo.test003; import lddxfs.zkstudy.zkclientdemo.User; import lddxfs.zkstudy.zkdemo.Constant; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; import org.apache.zookeeper.CreateMode; /** * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/) * Date:2018/10/21 */ public class WriteData { private static final String PATH = "/node_zkclient2"; public static void main(String[] args) { ZkClient zkClient = new ZkClient(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, Constant.CONNECTION_TIMEOUT, new SerializableSerializer()); User user = new User(1, "zhangsan"); zkClient.create(PATH, user, CreateMode.PERSISTENT); user.setId(2); user.setName("lisi"); zkClient.writeData(PATH, user); } }
ReadData
package lddxfs.zkstudy.zkclientdemo.test004; import lddxfs.zkstudy.zkclientdemo.User; import lddxfs.zkstudy.zkdemo.Constant; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; /** * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/) * Date:2018/10/21 */ public class ReadData { private static final String PATH = "/node_zkclient3"; public static void main(String[] args) { ZkClient zkClient = new ZkClient(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, Constant.SESSION_TIMEOUT, new SerializableSerializer()); User user = new User(1, "wangwu"); String path = zkClient.create(PATH, user, CreateMode.PERSISTENT); System.out.println("Create path is :" + path); Stat stat = new Stat(); user = zkClient.readData(PATH, stat); if (user != null) { System.out.println(user); System.out.println(stat); } } }
GetChildren
package lddxfs.zkstudy.zkclientdemo.test005; import lddxfs.zkstudy.zkclientdemo.User; import lddxfs.zkstudy.zkdemo.Constant; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; import java.util.List; /** * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/) * Date:2018/10/21 */ public class GetChildren { public static void main(String[] args) { ZkClient zkClient=new ZkClient(Constant.CONNECT_STRING,Constant.SESSION_TIMEOUT,Constant.CONNECTION_TIMEOUT,new SerializableSerializer()); List<String> children=zkClient.getChildren("/"); System.out.println("Children list of /:"+children); //Children list of /:[testdata, node_zkclient3, dubbo, node_zkclient2, zookeeper, node_zkclient] } }
NodeExist
package lddxfs.zkstudy.zkclientdemo.test006; import lddxfs.zkstudy.zkdemo.Constant; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; /** * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/) * Date:2018/10/21 * 判断节点是否存在 */ public class NodeExist { public static void main(String[] args) { ZkClient zkClient = new ZkClient(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, Constant.CONNECTION_TIMEOUT, new SerializableSerializer()); boolean exist = zkClient.exists("/dubbo"); System.out.println("Node exist status is:" + exist); //Node exist status is:true } }
订阅子节点列表发生变化
package lddxfs.zkstudy.zkclientdemo.test007; import lddxfs.zkstudy.zkdemo.Constant; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.SerializableSerializer; /** * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/) * Date:2018/10/21 * 订阅子节点列表发生变化 */ public class SubscribeChildren { public static void main(String[] args) throws InterruptedException { ZkClient zkClient=new ZkClient(Constant.CONNECT_STRING,Constant.SESSION_TIMEOUT,Constant.CONNECTION_TIMEOUT,new SerializableSerializer()); zkClient.subscribeChildChanges("/888",new ZkChildListener()); Thread.sleep(Integer.MAX_VALUE); /** * 使用Cli.sh连接zk * [zk: 192.168.10.132:2185(CONNECTED) 6] create /888 888 * Created /888 * [zk: 192.168.10.132:2185(CONNECTED) 7] create /888/999 999 * Created /888/999 * [zk: 192.168.10.132:2185(CONNECTED) 8] * 控制台输出 * Parent path is/888 * Current children:[] * Parent path is/888 * Current children:[999] */ } }
package lddxfs.zkstudy.zkclientdemo.test007; import org.I0Itec.zkclient.IZkChildListener; import java.util.List; /** * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/) * Date:2018/10/21 */ public class ZkChildListener implements IZkChildListener { public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { System.out.println("Parent path is"+parentPath); System.out.println("Current children:"+currentChilds); } }
订阅数据变化
package lddxfs.zkstudy.zkclientdemo.test008; import lddxfs.zkstudy.zkdemo.Constant; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer; /** * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/) * Date:2018/10/21 * 订阅数据变化 */ public class SubscribeData { public static void main(String[] args) throws InterruptedException { //使用了新的序列化器,zk命令行写入的数据才能被检查 ZkClient zkClient = new ZkClient(Constant.CONNECT_STRING, Constant.SESSION_TIMEOUT, Constant.CONNECTION_TIMEOUT, new BytesPushThroughSerializer()); zkClient.subscribeDataChanges("/node_zkclient", new ZkDataListener()); Thread.sleep(Integer.MAX_VALUE); /** * [zk: 192.168.10.132:2185(CONNECTED) 10] set /node_zkclient ddd * data change dataPath:/node_zkclient * data change data:[B@11d2adb4 * */ } }
package lddxfs.zkstudy.zkclientdemo.test008; import org.I0Itec.zkclient.IZkDataListener; /** * Author:lddxfs(lddxfs@qq.com;https://www.cnblogs.com/LDDXFS/) * Date:2018/10/21 */ public class ZkDataListener implements IZkDataListener { public void handleDataChange(String dataPath, Object data) throws Exception { System.out.println("data change dataPath:"+dataPath); System.out.println("data change data:"+data); } public void handleDataDeleted(String dataPath) throws Exception { System.out.println("deleted data dataPath:"+dataPath); } }
以上是关于zookeeper api和zkclient api使用的主要内容,如果未能解决你的问题,请参考以下文章
zookeeper客户端使用第三方(zkclient)封装的Api操作节点