六:ZooKeeper的java客户端api的使用
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了六:ZooKeeper的java客户端api的使用相关的知识,希望对你有一定的参考价值。
一:客户端链接测试
1 package com.yeepay.sxf.createConnection; 2 3 import java.io.IOException; 4 5 import org.apache.zookeeper.ZooKeeper; 6 import org.apache.zookeeper.ZooKeeper.States; 7 8 /** 9 * 测试Zookeeper的链接 10 * @author sxf 11 * 12 */ 13 public class TestCreateSession { 14 15 //zooKeeper实例 16 private static ZooKeeper zooKeeper; 17 18 public static void main(String[] args) throws IOException, InterruptedException { 19 20 //实例化zooKeeper的实例 21 //参数:(ip地址:端口号 ,当前会话超时时间,自定义事件监听器) 22 zooKeeper=new ZooKeeper("10.151.30.75:2181",5000, new MyWatcher()); 23 24 //获取链接状态 25 States states=zooKeeper.getState(); 26 27 //此链接为异步链接 28 System.out.println("TestCreateSession.main(链接状态):"+states.toString());//CONNECTING 29 30 Thread.sleep(Integer.MAX_VALUE); 31 } 32 } 33 34 35 36 37 38 package com.yeepay.sxf.createConnection; 39 40 import org.apache.zookeeper.WatchedEvent; 41 import org.apache.zookeeper.Watcher; 42 import org.apache.zookeeper.Watcher.Event.KeeperState; 43 /** 44 * zookeeper实例过程中的事件监听器 45 * @author sxf 46 * 47 */ 48 public class MyWatcher implements Watcher{ 49 50 51 52 //该方法可以做相关的逻辑代码 53 @Override 54 public void process(WatchedEvent event) { 55 //MyWatcher.process(接收到的事件:)WatchedEvent state:SyncConnected type:None path:null 56 System.out.println("MyWatcher.process(接收到的事件:)"+event); 57 58 //如果链接成功可以做一些事情 59 if(event.getState()==KeeperState.SyncConnected){ 60 System.out.println("MyWatcher.process(链接成功做一些事情:)"); 61 } 62 63 } 64 65 66 }
二:客户端创建节点测试
1 package com.yeepay.sxf.createNode; 2 3 import java.io.IOException; 4 import java.security.NoSuchAlgorithmException; 5 import java.util.ArrayList; 6 import java.util.List; 7 8 import org.apache.zookeeper.CreateMode; 9 import org.apache.zookeeper.KeeperException; 10 import org.apache.zookeeper.WatchedEvent; 11 import org.apache.zookeeper.Watcher; 12 import org.apache.zookeeper.ZooKeeper; 13 import org.apache.zookeeper.Watcher.Event.KeeperState; 14 import org.apache.zookeeper.ZooDefs.Ids; 15 import org.apache.zookeeper.ZooDefs.Perms; 16 import org.apache.zookeeper.data.ACL; 17 import org.apache.zookeeper.data.Id; 18 import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; 19 /** 20 * zooKeeper同步创建节点测试 21 * @author sxf 22 * 23 */ 24 public class TestCreateNodeSyn implements Watcher { 25 26 private static ZooKeeper zooKeeper; 27 28 public static void main(String[] args) throws IOException, InterruptedException, NoSuchAlgorithmException { 29 // //实例化zooKeeper链接 30 // zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestCreateNodeSyn()); 31 // //创建链接 32 // Thread.sleep(Integer.MAX_VALUE); 33 34 System.out.println(DigestAuthenticationProvider.generateDigest("shangxiaofei:shangxiaofei")); 35 } 36 37 /** 38 *权限模式(scheme): ip,digest 39 *授权对象(ID): 40 * ip权限模式:具体的ip地址 41 * digest权限模式:username:Base64(SHA-1(username:password)) 42 * 43 *权限(permission):CREATE(C),DELETE(D),READ(R),WRITE(W),ADMIN(A) 44 * 注:单个权限,完全权限,复合权限 45 * 46 *权限组合:scheme+ID+permission 47 */ 48 49 public void process(WatchedEvent event) { 50 //链接成功 51 if(event.getState()==KeeperState.SyncConnected){ 52 //同步创建节点 53 try { 54 //基于ip的权限,意味着这个ip的客户端对此节点有读取权限 55 ACL ipacl=new ACL(Perms.READ, new Id("ip", "10.151.30.75")); 56 //基于digest的权限,意味着只有这个用户名和密码的客户端才能读取和写的权限 57 ACL digetacl=new ACL(Perms.READ|Perms.WRITE,new Id("digest",DigestAuthenticationProvider.generateDigest("shangxiaofei:shangxiaofei"))); 58 59 List<ACL> myaclAcls=new ArrayList<ACL>(); 60 myaclAcls.add(ipacl); 61 myaclAcls.add(digetacl); 62 63 String path=zooKeeper.create("/node_128", "shangxiaofei".getBytes(), myaclAcls, CreateMode.PERSISTENT); 64 System.out.println("MyWatcher2.process(创建节点返回的路径:)"+path); 65 }catch (NoSuchAlgorithmException e){ 66 e.printStackTrace(); 67 } catch (KeeperException e) { 68 // TODO Auto-generated catch block 69 e.printStackTrace(); 70 } catch (InterruptedException e) { 71 // TODO Auto-generated catch block 72 e.printStackTrace(); 73 } 74 } 75 76 } 77 78 79 80 81 } 82 83 84 85 86 87 88 package com.yeepay.sxf.createNode; 89 90 import java.io.IOException; 91 92 import org.apache.zookeeper.AsyncCallback; 93 import org.apache.zookeeper.CreateMode; 94 import org.apache.zookeeper.WatchedEvent; 95 import org.apache.zookeeper.Watcher; 96 import org.apache.zookeeper.Watcher.Event.KeeperState; 97 import org.apache.zookeeper.ZooDefs.Ids; 98 import org.apache.zookeeper.ZooKeeper; 99 /** 100 * zooKeeper异步创建节点测试 101 * @author sxf 102 * 103 */ 104 public class TestCreateNodeAsyn implements Watcher { 105 106 private static ZooKeeper zooKeeper; 107 108 public static void main(String[] args) throws IOException, InterruptedException { 109 //实例化zooKeeper链接 110 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestCreateNodeAsyn()); 111 //创建链接 112 Thread.sleep(Integer.MAX_VALUE); 113 } 114 115 @Override 116 public void process(WatchedEvent event) { 117 //链接成功 118 if(event.getState()==KeeperState.SyncConnected){ 119 //异步创建节点 120 zooKeeper.create("/node_124", "shangxiaoshuai".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,new ISstringCallBack(),"sxf创建"); 121 } 122 123 } 124 125 126 //该类数据异步回调接口实现 127 static class ISstringCallBack implements AsyncCallback.StringCallback{ 128 129 //创建成功rc=0 130 //path,创建节点的路径 131 //ctx,创建节点的传入的上下位 132 //name,创建节点的名字 133 @Override 134 public void processResult(int rc, String path, Object ctx, String name) { 135 StringBuffer sb=new StringBuffer(); 136 sb.append("rc="+rc).append("\n"); 137 sb.append("path="+path).append("\n"); 138 sb.append("ctx="+ctx).append("\n"); 139 sb.append("name="+name); 140 System.out.println(sb.toString()); 141 /** 142 *rc=0 143 *path=/node_124 144 *ctx=sxf创建 145 *name=/node_124 146 * 147 */ 148 } 149 150 } 151 152 }
三:客户端删除节点测试
1 package com.yeepay.sxf.deleteNode; 2 3 import java.io.IOException; 4 5 import org.apache.zookeeper.WatchedEvent; 6 import org.apache.zookeeper.Watcher; 7 import org.apache.zookeeper.Watcher.Event.EventType; 8 import org.apache.zookeeper.Watcher.Event.KeeperState; 9 import org.apache.zookeeper.ZooKeeper; 10 11 /** 12 *同步删除节点中存取的值 13 * @author sxf 14 * 15 */ 16 public class TestDeleteNodeSyn implements Watcher{ 17 18 private static ZooKeeper zooKeeper; 19 20 21 public static void main(String[] args) throws IOException, InterruptedException { 22 //实例化zooKeeper链接 23 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestDeleteNodeSyn()); 24 //创建链接 25 Thread.sleep(Integer.MAX_VALUE); 26 } 27 28 /** 29 * event 30 * (1)zookeeper链接状态 event.getState() 31 * (2)zookeeper事件类型 event.getType() 32 * (3)zookeeper事件触发的节点路径 event.getPath() 33 */ 34 @Override 35 public void process(WatchedEvent event) { 36 try{ 37 //已经连接做一件事情 38 if(event.getState()==KeeperState.SyncConnected){ 39 40 if(event.getType()==EventType.None&&event.getPath()==null){ 41 //删除一个节点 42 //第一个参数:删除节点的全路径 43 //第二个参素:节点的版本(类似乐观锁),当为-1时,对版本无限制 44 zooKeeper.delete("/node_127", -1); 45 }else if(event.getType()==EventType.NodeDataChanged){ 46 47 } 48 49 } 50 51 }catch(Exception e){ 52 e.printStackTrace(); 53 } 54 55 56 57 } 58 59 60 } 61 62 63 64 65 package com.yeepay.sxf.deleteNode; 66 67 import java.io.IOException; 68 69 import org.apache.zookeeper.AsyncCallback; 70 import org.apache.zookeeper.WatchedEvent; 71 import org.apache.zookeeper.Watcher; 72 import org.apache.zookeeper.Watcher.Event.EventType; 73 import org.apache.zookeeper.Watcher.Event.KeeperState; 74 import org.apache.zookeeper.ZooKeeper; 75 76 /** 77 *异步删除节点 78 * @author sxf 79 * 80 */ 81 public class TestDeleteNodeAsyn implements Watcher{ 82 83 private static ZooKeeper zooKeeper; 84 85 86 public static void main(String[] args) throws IOException, InterruptedException { 87 //实例化zooKeeper链接 88 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestDeleteNodeAsyn()); 89 //创建链接 90 Thread.sleep(Integer.MAX_VALUE); 91 } 92 93 /** 94 * event 95 * (1)zookeeper链接状态 event.getState() 96 * (2)zookeeper事件类型 event.getType() 97 * (3)zookeeper事件触发的节点路径 event.getPath() 98 */ 99 @Override 100 public void process(WatchedEvent event) { 101 try{ 102 //已经连接做一件事情 103 if(event.getState()==KeeperState.SyncConnected){ 104 105 if(event.getType()==EventType.None&&event.getPath()==null){ 106 //第一个参数:要删除的节点的全路径 107 //第二个参数:要删除节点的版本号(类似乐观锁) 108 //第三个参数:异步删除的回调实现类 109 //第四个参数:删除传入的上下文 110 zooKeeper.delete("/node_126", -1, new IsDeleteCallBack(), "删除节点sxf"); 111 112 } 113 114 } 115 116 }catch(Exception e){ 117 e.printStackTrace(); 118 } 119 120 121 122 } 123 124 /** 125 * 异步删除回调接口实现类 126 * @author sxf 127 * 128 */ 129 static class IsDeleteCallBack implements AsyncCallback.VoidCallback{ 130 131 //第一个参数:返回删除成功rc 132 @Override 133 public void processResult(int rc, String path, Object ctx) { 134 StringBuffer sb=new StringBuffer(); 135 sb.append("rc="+rc).append("\n"); 136 sb.append("path="+path).append("\n"); 137 sb.append("ctx="+ctx).append("\n"); 138 System.out.println(sb.toString()); 139 /** 140 * rc=0 141 * path=/node_126 142 * ctx=删除节点sxf 143 */ 144 } 145 146 } 147 148 }
四:客户端判断节点是否存在测试
1 package com.yeepay.sxf.existsNode; 2 3 import java.io.IOException; 4 5 import org.apache.zookeeper.WatchedEvent; 6 import org.apache.zookeeper.Watcher; 7 import org.apache.zookeeper.Watcher.Event.EventType; 8 import org.apache.zookeeper.Watcher.Event.KeeperState; 9 import org.apache.zookeeper.ZooKeeper; 10 import org.apache.zookeeper.data.Stat; 11 12 /** 13 *同步判断节点是否存在 14 * @author sxf 15 * 16 */ 17 public class TestExistsNodeSyn implements Watcher{ 18 19 private static ZooKeeper zooKeeper; 20 21 22 public static void main(String[] args) throws IOException, InterruptedException { 23 //实例化zooKeeper链接 24 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestExistsNodeSyn()); 25 //创建链接 26 Thread.sleep(Integer.MAX_VALUE); 27 } 28 29 /** 30 * event 31 * (1)zookeeper链接状态 event.getState() 32 * (2)zookeeper事件类型 event.getType() 33 * (3)zookeeper事件触发的节点路径 event.getPath() 34 */ 35 @Override 36 public void process(WatchedEvent event) { 37 try{ 38 //已经连接做一件事情 39 if(event.getState()==KeeperState.SyncConnected){ 40 41 if(event.getType()==EventType.None&&event.getPath()==null){ 42 //第一个参数:判断节点的全路径 43 //第二个参数:是否注册监听器 44 Stat stat=zooKeeper.exists("/node_123",true); 45 System.out.println(stat); 46 }else if(event.getType()==EventType.NodeDataChanged){ 47 //节点数据改变 48 Stat stat=zooKeeper.exists(event.getPath(),true); 49 System.out.println("节点数据改变=>"+stat); 50 }else if(event.getType()==EventType.NodeCreated){ 51 //节点被创建 52 Stat stat=zooKeeper.exists(event.getPath(),true); 53 System.out.println("节点被创建=>"+stat); 54 }else if(event.getType()==EventType.NodeDeleted){ 55 //节点被删除 56 Stat stat=zooKeeper.exists(event.getPath(),true); 57 System.out.println("节点被删除=>"+stat); 58 } 59 60 } 61 62 }catch(Exception e){ 63 e.printStackTrace(); 64 } 65 66 67 68 } 69 70 71 } 72 73 74 75 76 package com.yeepay.sxf.existsNode; 77 78 import java.io.IOException; 79 80 import org.apache.zookeeper.AsyncCallback; 81 import org.apache.zookeeper.WatchedEvent; 82 import org.apache.zookeeper.Watcher; 83 import org.apache.zookeeper.Watcher.Event.EventType; 84 import org.apache.zookeeper.Watcher.Event.KeeperState; 85 import org.apache.zookeeper.ZooKeeper; 86 import org.apache.zookeeper.data.Stat; 87 88 /** 89 *异步判断节点是否存在 90 * @author sxf 91 * 92 */ 93 public class TestExistsNodeAsyn implements Watcher{ 94 95 private static ZooKeeper zooKeeper; 96 97 98 public static void main(String[] args) throws IOException, InterruptedException { 99 //实例化zooKeeper链接 100 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestExistsNodeAsyn()); 101 //创建链接 102 Thread.sleep(Integer.MAX_VALUE); 103 } 104 105 /** 106 * event 107 * (1)zookeeper链接状态 event.getState() 108 * (2)zookeeper事件类型 event.getType() 109 * (3)zookeeper事件触发的节点路径 event.getPath() 110 */ 111 @Override 112 public void process(WatchedEvent event) { 113 try{ 114 //已经连接做一件事情 115 if(event.getState()==KeeperState.SyncConnected){ 116 117 if(event.getType()==EventType.None&&event.getPath()==null){ 118 zooKeeper.exists("/node_123", true, new IsStatCallBack(), "sxf判断节点是否存在"); 119 }else if(event.getType()==EventType.NodeDataChanged){ 120 //节点数据改变 121 zooKeeper.exists("/node_123", true, new IsStatCallBack(), "sxf判断节点是否存在"); 122 System.out.println("TestExistsNodeAsyn.process(节点数据改变)"); 123 }else if(event.getType()==EventType.NodeCreated){ 124 //节点被创建 125 zooKeeper.exists("/node_123", true, new IsStatCallBack(), "sxf判断节点是否存在"); 126 System.out.println("TestExistsNodeAsyn.process(节点被创建)"); 127 }else if(event.getType()==EventType.NodeDeleted){ 128 //节点被删除 129 zooKeeper.exists("/node_123", true, new IsStatCallBack(), "sxf判断节点是否存在"); 130 System.out.println("TestExistsNodeAsyn.process(节点被删除)"); 131 } 132 133 } 134 135 }catch(Exception e){ 136 e.printStackTrace(); 137 } 138 139 140 141 }
以上是关于六:ZooKeeper的java客户端api的使用的主要内容,如果未能解决你的问题,请参考以下文章
curator-api及zookeeper-java-api的使用总结
使用ZooKeeper提供的Java API操作ZooKeeper