六:ZooKeeper的java客户端api的使用

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了六:ZooKeeper的java客户端api的使用相关的知识,希望对你有一定的参考价值。

一:客户端链接测试

技术分享
 1 package com.yeepay.sxf.createConnection;
 2 
 3 import java.io.IOException;
 4 
 5 import org.apache.zookeeper.ZooKeeper;
 6 import org.apache.zookeeper.ZooKeeper.States;
 7 
 8 /**
 9  * 测试Zookeeper的链接
10  * @author sxf
11  *
12  */
13 public class TestCreateSession {
14 
15     //zooKeeper实例
16     private static ZooKeeper zooKeeper;
17     
18     public static void main(String[] args) throws IOException, InterruptedException {
19         
20         //实例化zooKeeper的实例
21         //参数:(ip地址:端口号  ,当前会话超时时间,自定义事件监听器)
22         zooKeeper=new ZooKeeper("10.151.30.75:2181",5000, new MyWatcher());
23         
24         //获取链接状态
25         States states=zooKeeper.getState();
26         
27         //此链接为异步链接
28         System.out.println("TestCreateSession.main(链接状态):"+states.toString());//CONNECTING
29         
30         Thread.sleep(Integer.MAX_VALUE);
31     }
32 }
33 
34 
35 
36 
37 
38 package com.yeepay.sxf.createConnection;
39 
40 import org.apache.zookeeper.WatchedEvent;
41 import org.apache.zookeeper.Watcher;
42 import org.apache.zookeeper.Watcher.Event.KeeperState;
43 /**
44  * zookeeper实例过程中的事件监听器
45  * @author sxf
46  *
47  */
48 public class MyWatcher implements Watcher{
49 
50 
51     
52     //该方法可以做相关的逻辑代码
53     @Override
54     public void process(WatchedEvent event) {
55         //MyWatcher.process(接收到的事件:)WatchedEvent state:SyncConnected type:None path:null
56         System.out.println("MyWatcher.process(接收到的事件:)"+event);
57         
58         //如果链接成功可以做一些事情
59         if(event.getState()==KeeperState.SyncConnected){
60             System.out.println("MyWatcher.process(链接成功做一些事情:)");
61         }
62         
63     }
64 
65     
66 }
View Code

二:客户端创建节点测试

技术分享
  1 package com.yeepay.sxf.createNode;
  2 
  3 import java.io.IOException;
  4 import java.security.NoSuchAlgorithmException;
  5 import java.util.ArrayList;
  6 import java.util.List;
  7 
  8 import org.apache.zookeeper.CreateMode;
  9 import org.apache.zookeeper.KeeperException;
 10 import org.apache.zookeeper.WatchedEvent;
 11 import org.apache.zookeeper.Watcher;
 12 import org.apache.zookeeper.ZooKeeper;
 13 import org.apache.zookeeper.Watcher.Event.KeeperState;
 14 import org.apache.zookeeper.ZooDefs.Ids;
 15 import org.apache.zookeeper.ZooDefs.Perms;
 16 import org.apache.zookeeper.data.ACL;
 17 import org.apache.zookeeper.data.Id;
 18 import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
 19 /**
 20  * zooKeeper同步创建节点测试
 21  * @author sxf
 22  *
 23  */
 24 public class TestCreateNodeSyn implements Watcher {
 25 
 26     private static ZooKeeper zooKeeper;
 27     
 28     public static void main(String[] args) throws IOException, InterruptedException, NoSuchAlgorithmException {
 29 //        //实例化zooKeeper链接
 30 //        zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestCreateNodeSyn());
 31 //        //创建链接
 32 //        Thread.sleep(Integer.MAX_VALUE);
 33         
 34         System.out.println(DigestAuthenticationProvider.generateDigest("shangxiaofei:shangxiaofei"));
 35     }
 36 
 37     /**
 38      *权限模式(scheme): ip,digest
 39      *授权对象(ID):
 40      *            ip权限模式:具体的ip地址
 41      *            digest权限模式:username:Base64(SHA-1(username:password))
 42      *
 43      *权限(permission):CREATE(C),DELETE(D),READ(R),WRITE(W),ADMIN(A)
 44      *        注:单个权限,完全权限,复合权限
 45      *
 46      *权限组合:scheme+ID+permission
 47      */
 48     
 49     public void process(WatchedEvent event) {
 50         //链接成功
 51                 if(event.getState()==KeeperState.SyncConnected){
 52                     //同步创建节点
 53                     try {
 54                         //基于ip的权限,意味着这个ip的客户端对此节点有读取权限
 55                         ACL ipacl=new ACL(Perms.READ, new Id("ip", "10.151.30.75"));
 56                         //基于digest的权限,意味着只有这个用户名和密码的客户端才能读取和写的权限
 57                         ACL digetacl=new ACL(Perms.READ|Perms.WRITE,new Id("digest",DigestAuthenticationProvider.generateDigest("shangxiaofei:shangxiaofei")));
 58                     
 59                         List<ACL> myaclAcls=new ArrayList<ACL>();
 60                         myaclAcls.add(ipacl);
 61                         myaclAcls.add(digetacl);
 62                         
 63                         String path=zooKeeper.create("/node_128", "shangxiaofei".getBytes(), myaclAcls, CreateMode.PERSISTENT);
 64                         System.out.println("MyWatcher2.process(创建节点返回的路径:)"+path);
 65                     }catch (NoSuchAlgorithmException e){
 66                         e.printStackTrace();
 67                     } catch (KeeperException e) {
 68                         // TODO Auto-generated catch block
 69                         e.printStackTrace();
 70                     } catch (InterruptedException e) {
 71                         // TODO Auto-generated catch block
 72                         e.printStackTrace();
 73                     }
 74                 }
 75                 
 76     }
 77     
 78     
 79     
 80     
 81 }
 82 
 83 
 84 
 85 
 86 
 87 
 88 package com.yeepay.sxf.createNode;
 89 
 90 import java.io.IOException;
 91 
 92 import org.apache.zookeeper.AsyncCallback;
 93 import org.apache.zookeeper.CreateMode;
 94 import org.apache.zookeeper.WatchedEvent;
 95 import org.apache.zookeeper.Watcher;
 96 import org.apache.zookeeper.Watcher.Event.KeeperState;
 97 import org.apache.zookeeper.ZooDefs.Ids;
 98 import org.apache.zookeeper.ZooKeeper;
 99 /**
100  * zooKeeper异步创建节点测试
101  * @author sxf
102  *
103  */
104 public class TestCreateNodeAsyn implements Watcher {
105 
106     private static ZooKeeper zooKeeper;
107     
108     public static void main(String[] args) throws IOException, InterruptedException {
109         //实例化zooKeeper链接
110         zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestCreateNodeAsyn());
111         //创建链接
112         Thread.sleep(Integer.MAX_VALUE);
113     }
114 
115     @Override
116     public void process(WatchedEvent event) {
117         //链接成功
118                 if(event.getState()==KeeperState.SyncConnected){
119                         //异步创建节点
120                         zooKeeper.create("/node_124", "shangxiaoshuai".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,new ISstringCallBack(),"sxf创建");
121                 }
122                 
123     }
124     
125     
126     //该类数据异步回调接口实现
127     static class ISstringCallBack implements AsyncCallback.StringCallback{
128 
129         //创建成功rc=0
130         //path,创建节点的路径
131         //ctx,创建节点的传入的上下位
132         //name,创建节点的名字
133         @Override
134         public void processResult(int rc, String path, Object ctx, String name) {
135             StringBuffer sb=new StringBuffer();
136             sb.append("rc="+rc).append("\n");
137             sb.append("path="+path).append("\n");
138             sb.append("ctx="+ctx).append("\n");
139             sb.append("name="+name);
140             System.out.println(sb.toString());
141             /**
142              *rc=0
143              *path=/node_124
144              *ctx=sxf创建
145              *name=/node_124
146              *
147              */
148         }
149         
150     }
151     
152 }
View Code

三:客户端删除节点测试

技术分享
  1 package com.yeepay.sxf.deleteNode;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.zookeeper.WatchedEvent;
  6 import org.apache.zookeeper.Watcher;
  7 import org.apache.zookeeper.Watcher.Event.EventType;
  8 import org.apache.zookeeper.Watcher.Event.KeeperState;
  9 import org.apache.zookeeper.ZooKeeper;
 10 
 11 /**
 12  *同步删除节点中存取的值
 13  * @author sxf
 14  *
 15  */
 16 public class TestDeleteNodeSyn implements Watcher{
 17 
 18     private static ZooKeeper zooKeeper;
 19     
 20     
 21     public static void main(String[] args) throws IOException, InterruptedException {
 22                 //实例化zooKeeper链接
 23                 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestDeleteNodeSyn());
 24                 //创建链接
 25                 Thread.sleep(Integer.MAX_VALUE);
 26     }
 27 
 28     /**
 29      * event
 30      * (1)zookeeper链接状态 event.getState()
 31      * (2)zookeeper事件类型 event.getType()
 32      * (3)zookeeper事件触发的节点路径 event.getPath()
 33      */
 34     @Override
 35     public void process(WatchedEvent event) {
 36         try{
 37             //已经连接做一件事情
 38             if(event.getState()==KeeperState.SyncConnected){
 39                 
 40                 if(event.getType()==EventType.None&&event.getPath()==null){
 41                     //删除一个节点
 42                     //第一个参数:删除节点的全路径
 43                     //第二个参素:节点的版本(类似乐观锁),当为-1时,对版本无限制
 44                     zooKeeper.delete("/node_127", -1);
 45                 }else if(event.getType()==EventType.NodeDataChanged){
 46                     
 47                 }
 48                 
 49             }
 50             
 51         }catch(Exception e){
 52             e.printStackTrace();
 53         }
 54         
 55         
 56         
 57     }
 58     
 59     
 60 }
 61 
 62 
 63 
 64 
 65 package com.yeepay.sxf.deleteNode;
 66 
 67 import java.io.IOException;
 68 
 69 import org.apache.zookeeper.AsyncCallback;
 70 import org.apache.zookeeper.WatchedEvent;
 71 import org.apache.zookeeper.Watcher;
 72 import org.apache.zookeeper.Watcher.Event.EventType;
 73 import org.apache.zookeeper.Watcher.Event.KeeperState;
 74 import org.apache.zookeeper.ZooKeeper;
 75 
 76 /**
 77  *异步删除节点
 78  * @author sxf
 79  *
 80  */
 81 public class TestDeleteNodeAsyn implements Watcher{
 82 
 83     private static ZooKeeper zooKeeper;
 84     
 85     
 86     public static void main(String[] args) throws IOException, InterruptedException {
 87                 //实例化zooKeeper链接
 88                 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestDeleteNodeAsyn());
 89                 //创建链接
 90                 Thread.sleep(Integer.MAX_VALUE);
 91     }
 92 
 93     /**
 94      * event
 95      * (1)zookeeper链接状态 event.getState()
 96      * (2)zookeeper事件类型 event.getType()
 97      * (3)zookeeper事件触发的节点路径 event.getPath()
 98      */
 99     @Override
100     public void process(WatchedEvent event) {
101         try{
102             //已经连接做一件事情
103             if(event.getState()==KeeperState.SyncConnected){
104                 
105                 if(event.getType()==EventType.None&&event.getPath()==null){
106                     //第一个参数:要删除的节点的全路径
107                     //第二个参数:要删除节点的版本号(类似乐观锁)
108                     //第三个参数:异步删除的回调实现类
109                     //第四个参数:删除传入的上下文
110                     zooKeeper.delete("/node_126", -1, new IsDeleteCallBack(), "删除节点sxf");
111                     
112                 }
113                 
114             }
115             
116         }catch(Exception e){
117             e.printStackTrace();
118         }
119         
120         
121         
122     }
123 
124     /**
125      * 异步删除回调接口实现类
126      * @author sxf
127      *
128      */
129     static class IsDeleteCallBack implements AsyncCallback.VoidCallback{
130         
131         //第一个参数:返回删除成功rc
132         @Override
133         public void processResult(int rc, String path, Object ctx) {
134             StringBuffer sb=new StringBuffer();
135             sb.append("rc="+rc).append("\n");
136             sb.append("path="+path).append("\n");
137             sb.append("ctx="+ctx).append("\n");
138             System.out.println(sb.toString());
139             /**
140              * rc=0
141              *    path=/node_126
142              *    ctx=删除节点sxf
143              */
144         }
145         
146     }
147     
148 }
View Code

四:客户端判断节点是否存在测试

技术分享
  1 package com.yeepay.sxf.existsNode;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.zookeeper.WatchedEvent;
  6 import org.apache.zookeeper.Watcher;
  7 import org.apache.zookeeper.Watcher.Event.EventType;
  8 import org.apache.zookeeper.Watcher.Event.KeeperState;
  9 import org.apache.zookeeper.ZooKeeper;
 10 import org.apache.zookeeper.data.Stat;
 11 
 12 /**
 13  *同步判断节点是否存在
 14  * @author sxf
 15  *
 16  */
 17 public class TestExistsNodeSyn implements Watcher{
 18 
 19     private static ZooKeeper zooKeeper;
 20     
 21     
 22     public static void main(String[] args) throws IOException, InterruptedException {
 23                 //实例化zooKeeper链接
 24                 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestExistsNodeSyn());
 25                 //创建链接
 26                 Thread.sleep(Integer.MAX_VALUE);
 27     }
 28 
 29     /**
 30      * event
 31      * (1)zookeeper链接状态 event.getState()
 32      * (2)zookeeper事件类型 event.getType()
 33      * (3)zookeeper事件触发的节点路径 event.getPath()
 34      */
 35     @Override
 36     public void process(WatchedEvent event) {
 37         try{
 38             //已经连接做一件事情
 39             if(event.getState()==KeeperState.SyncConnected){
 40                 
 41                 if(event.getType()==EventType.None&&event.getPath()==null){
 42                     //第一个参数:判断节点的全路径
 43                     //第二个参数:是否注册监听器
 44                     Stat stat=zooKeeper.exists("/node_123",true);
 45                     System.out.println(stat);
 46                 }else if(event.getType()==EventType.NodeDataChanged){
 47                     //节点数据改变
 48                     Stat stat=zooKeeper.exists(event.getPath(),true);
 49                     System.out.println("节点数据改变=>"+stat);
 50                 }else if(event.getType()==EventType.NodeCreated){
 51                     //节点被创建
 52                     Stat stat=zooKeeper.exists(event.getPath(),true);
 53                     System.out.println("节点被创建=>"+stat);
 54                 }else if(event.getType()==EventType.NodeDeleted){
 55                     //节点被删除
 56                     Stat stat=zooKeeper.exists(event.getPath(),true);
 57                     System.out.println("节点被删除=>"+stat);
 58                 }
 59                 
 60             }
 61             
 62         }catch(Exception e){
 63             e.printStackTrace();
 64         }
 65         
 66         
 67         
 68     }
 69     
 70     
 71 }
 72 
 73 
 74 
 75 
 76 package com.yeepay.sxf.existsNode;
 77 
 78 import java.io.IOException;
 79 
 80 import org.apache.zookeeper.AsyncCallback;
 81 import org.apache.zookeeper.WatchedEvent;
 82 import org.apache.zookeeper.Watcher;
 83 import org.apache.zookeeper.Watcher.Event.EventType;
 84 import org.apache.zookeeper.Watcher.Event.KeeperState;
 85 import org.apache.zookeeper.ZooKeeper;
 86 import org.apache.zookeeper.data.Stat;
 87 
 88 /**
 89  *异步判断节点是否存在
 90  * @author sxf
 91  *
 92  */
 93 public class TestExistsNodeAsyn implements Watcher{
 94 
 95     private static ZooKeeper zooKeeper;
 96     
 97     
 98     public static void main(String[] args) throws IOException, InterruptedException {
 99                 //实例化zooKeeper链接
100                 zooKeeper=new ZooKeeper("10.151.30.75:2181", 5000,new TestExistsNodeAsyn());
101                 //创建链接
102                 Thread.sleep(Integer.MAX_VALUE);
103     }
104 
105     /**
106      * event
107      * (1)zookeeper链接状态 event.getState()
108      * (2)zookeeper事件类型 event.getType()
109      * (3)zookeeper事件触发的节点路径 event.getPath()
110      */
111     @Override
112     public void process(WatchedEvent event) {
113         try{
114             //已经连接做一件事情
115             if(event.getState()==KeeperState.SyncConnected){
116                 
117                 if(event.getType()==EventType.None&&event.getPath()==null){
118                         zooKeeper.exists("/node_123", true, new IsStatCallBack(), "sxf判断节点是否存在");
119                 }else if(event.getType()==EventType.NodeDataChanged){
120                     //节点数据改变
121                     zooKeeper.exists("/node_123", true, new IsStatCallBack(), "sxf判断节点是否存在");
122                     System.out.println("TestExistsNodeAsyn.process(节点数据改变)");
123                 }else if(event.getType()==EventType.NodeCreated){
124                     //节点被创建
125                     zooKeeper.exists("/node_123", true, new IsStatCallBack(), "sxf判断节点是否存在");
126                     System.out.println("TestExistsNodeAsyn.process(节点被创建)");
127                 }else if(event.getType()==EventType.NodeDeleted){
128                     //节点被删除
129                     zooKeeper.exists("/node_123", true, new IsStatCallBack(), "sxf判断节点是否存在");
130                     System.out.println("TestExistsNodeAsyn.process(节点被删除)");
131                 }
132                 
133             }
134             
135         }catch(Exception e){
136             e.printStackTrace();
137         }
138         
139         
140         
141     }

以上是关于六:ZooKeeper的java客户端api的使用的主要内容,如果未能解决你的问题,请参考以下文章

curator-api及zookeeper-java-api的使用总结

06_zookeeper原生Java API使用

使用ZooKeeper提供的Java API操作ZooKeeper

Zookeeper3.5.7版本——客户端 API 操作(代码示例)

zookeeper的Java端API应用

zookeeper_03:Java 客户端(原生API)