八:Zookeeper开源客户端Curator的api测试

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了八:Zookeeper开源客户端Curator的api测试相关的知识,希望对你有一定的参考价值。

curator是Netflix公司开源的一套ZooKeeper客户端,Curator解决了很多ZooKeeper客户端非常底层的细节开发工作。包括连接重连,反复注册Watcher等。实现了Fluent风格的API接口,目前已经为Apache的顶级项目,是全世界使用最广泛的ZooKeeper客户端之一

第一:maven依赖

技术分享
 1 <dependency>
 2       <groupId>org.apache.curator</groupId>
 3       <artifactId>curator-framework</artifactId>
 4       <version>2.8.0</version>
 5     </dependency>
 6     <dependency>
 7       <groupId>org.apache.curator</groupId>
 8       <artifactId>curator-recipes</artifactId>
 9       <version>2.8.0</version>
10     </dependency>
View Code

第二:测试的代码

技术分享
  1 package com.yeepay.sxf.testrurator;
  2 
  3 import java.util.List;
  4 import java.util.concurrent.ExecutorService;
  5 import java.util.concurrent.Executors;
  6 
  7 import org.apache.curator.RetryPolicy;
  8 import org.apache.curator.framework.CuratorFramework;
  9 import org.apache.curator.framework.CuratorFrameworkFactory;
 10 import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
 11 import org.apache.curator.framework.api.BackgroundCallback;
 12 import org.apache.curator.framework.api.CreateBuilder;
 13 import org.apache.curator.framework.api.CuratorEvent;
 14 import org.apache.curator.framework.api.CuratorEventType;
 15 import org.apache.curator.framework.api.DeleteBuilder;
 16 import org.apache.curator.framework.api.ExistsBuilder;
 17 import org.apache.curator.framework.api.GetChildrenBuilder;
 18 import org.apache.curator.framework.api.GetDataBuilder;
 19 import org.apache.curator.framework.api.SetDataBuilder;
 20 import org.apache.curator.framework.recipes.cache.NodeCache;
 21 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 22 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 23 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 24 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
 25 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 26 import org.apache.curator.retry.ExponentialBackoffRetry;
 27 import org.apache.curator.retry.RetryNTimes;
 28 import org.apache.curator.retry.RetryUntilElapsed;
 29 import org.apache.zookeeper.CreateMode;
 30 import org.apache.zookeeper.data.Stat;
 31 
 32 /**
 33  * 测试Rurator客户端测试
 34  * @author sxf
 35  *
 36  */
 37 public class TestRurator {
 38     
 39     //创建一个线程池,让异步调用的的回调方法,申请线程池中的资源
 40     private static ExecutorService es=Executors.newFixedThreadPool(5);
 41     public static void main(String[] args) throws Exception {
 42                 //重试策略
 43                 RetryPolicy retryPolicy =new ExponentialBackoffRetry(1000, 3);
 44                 
 45                 //客户端构建器
 46                 Builder builder=CuratorFrameworkFactory.builder();
 47                 
 48                 builder.connectString("10.151.30.75:2181");//客户端链接地址:端口号
 49                 builder.sessionTimeoutMs(5000);//会话的超时时间
 50                 builder.connectionTimeoutMs(5000);//链接的超时时间
 51                 builder.retryPolicy(retryPolicy);//指定重试策略
 52                 
 53                 //构建客户端
 54                 CuratorFramework client=builder.build();
 55                 client.start();
 56                 
 57                 //测试创建节点
 58                 //testCreateNode(client);
 59                 
 60                 //测试删除节点
 61                 //testDeleteNode(client);
 62                 
 63                 //获取子节点列表
 64                 //getChildList(client);
 65                 
 66                 //获取节点中存取的值
 67                 //getNodeValue(client);
 68                 
 69                 //修改节点中存储数据的值
 70                 //updateNodeValue(client);
 71                 
 72                 //判断一个节点是否存在
 73                 //isExistsNode(client);
 74                 
 75                 //测试异步调用使用方法
 76                 //testSynCallBack(client);
 77                 
 78                 //测试节点数据变动和节点被创建的事件足册
 79                 //nodeValueChangeListern(client);
 80                 
 81                 //给某个节点注册监听其子节点的活动
 82                 nodeChildListernDothing(client);
 83                 Thread.sleep(Integer.MAX_VALUE);
 84     }
 85     
 86     
 87     /**
 88      * 建立客户端链接的第一种方法
 89      * @throws InterruptedException
 90      */
 91     public void testCreateSession() throws InterruptedException{
 92         //重试策略可以实现该接口。通过自定义规则,进行重试
 93         //重试链接的策略的接口
 94         //(最开始时间间隔,最多重试次数) 随着次数递增,时间间隔还会增加
 95         RetryPolicy retryPolicy =new ExponentialBackoffRetry(1000, 3);
 96         //(最多重试次数,每次重试间隔的时间长度)
 97         RetryPolicy retryPolicy2 =new RetryNTimes(5, 1000);
 98         //(重试时间的总长度,每次重试的时间间隔) 这个基本可重试10次
 99         RetryPolicy retryPolicy3 =new RetryUntilElapsed(10000, 1000);
100         
101         
102         
103         //(ip地址端口,会话超时时间,建立连接超时时间,重试链接的策略)
104         CuratorFramework client=CuratorFrameworkFactory.newClient("10.151.30.75:2181", 5000,5000,retryPolicy);
105         //启动链接
106         client.start();
107         
108         Thread.sleep(Integer.MAX_VALUE);
109         
110     }
111     
112     /**
113      * 建立客户端链接的第二种方法
114      */
115     public static void testCreateSession2(){
116         //重试策略
117         RetryPolicy retryPolicy =new ExponentialBackoffRetry(1000, 3);
118         
119         //客户端构建器
120         Builder builder=CuratorFrameworkFactory.builder();
121         
122         builder.connectString("10.151.30.75:2181");//客户端链接地址:端口号
123         builder.sessionTimeoutMs(5000);//会话的超时时间
124         builder.connectionTimeoutMs(5000);//链接的超时时间
125         builder.retryPolicy(retryPolicy);//指定重试策略
126         builder.authorization(null);//给客户端添加权限AuthInfo 相关的知识在原生api里有讲过
127     
128 //        AuthInfo a=new AuthInfo("ip", "10.151.30.75:2181".getBytes());
129 //        AuthInfo b=new AuthInfo("digest", "shangxiaofei:shangxiaofei".getBytes());
130         
131         //构建客户端
132         CuratorFramework client=builder.build();
133         
134         //建立链接
135         client.start();
136         
137     }
138     
139     /**
140      * 测试创建节点(调用顺序)
141      * 两种创建方式都可以
142      * @throws Exception 
143      */
144     public static void testCreateNode(CuratorFramework client) throws Exception{
145         //创建节点构建器
146 //        String path=client.create()
147 //                                   .creatingParentsIfNeeded()//防止创建节点的父亲节点不存在,导致创建节点失败。如果父节点不存在,先创建父节点,再创建子节点
148 //                                   .withMode(CreateMode.PERSISTENT)//指定节点的类型(持久节点,临时节点,持久节并顺序节点,临时并顺序节点)
149 //                                   .forPath("/node_134", "xiaoshuai".getBytes());//节点路径,节点中存储的数据
150 //    
151         CreateBuilder builder=client.create();
152         builder.creatingParentsIfNeeded();
153         builder.withMode(CreateMode.PERSISTENT);
154         builder.withACL(null);//添加这个节点的权限。相关内容,在原生zookeeper客户端有讲解。上上篇博客
155         /**
156          * //基于ip的权限,意味着这个ip的客户端对此节点有读取权限
157          *     ACL ipacl=new ACL(Perms.READ, new Id("ip", "10.151.30.75"));
158          *     //基于digest的权限,意味着只有这个用户名和密码的客户端才能读取和写的权限
159          *     ACL digetacl=new ACL(Perms.READ|Perms.WRITE,new Id("digest",DigestAuthenticationProvider.generateDigest("shangxiaofei:shangxiaofei")));
160          * 
161          *     List<ACL> myaclAcls=new ArrayList<ACL>();
162          *     myaclAcls.add(ipacl);
163          *     myaclAcls.add(digetacl);
164          */
165         String pathString=builder.forPath("/node_135/node_135_2", "zhengjiuzhonghua".getBytes());
166     
167     
168         
169         System.out.println("TestRurator.testCreateNode()"+pathString);
170     }
171     
172     /**
173      * 测试删除节点(调用顺序不能乱)
174      * @param client
175      * @throws Exception 
176      */
177     public static void testDeleteNode(CuratorFramework client) throws Exception{
178         DeleteBuilder delteBuilder=client.delete();
179         delteBuilder.guaranteed();//保证节点必须删除,如果删除出现错误,则后台程序会不断去尝试删除。
180         delteBuilder.deletingChildrenIfNeeded();//如果存在子节点,先删除子节点
181         delteBuilder.withVersion(-1);//指定版本号
182         delteBuilder.forPath("/node_135");//指定路径
183     }
184     
185     /**
186      * 获取子节点列表
187      * @param client
188      * @throws Exception 
189      */
190     public static void getChildList(CuratorFramework client) throws Exception{
191         GetChildrenBuilder getChildBuilder=client.getChildren();
192         List<String> nodelist=getChildBuilder.forPath("/");
193         System.out.println("TestRurator.getChildList()"+nodelist);
194         
195     }
196     /**
197      * 获取节点中存取的值
198      * @throws Exception 
199      */
200     public static void getNodeValue(CuratorFramework client) throws Exception{
201         
202         GetDataBuilder dataBuilder=    client.getData();
203         
204         Stat stat=new Stat();
205         dataBuilder.storingStatIn(stat);
206         byte[] bytes=dataBuilder.forPath("/node_135/node_135_2");
207         System.out.println("TestRurator.getNodeValue(data==>)"+new String(bytes));
208         System.out.println("TestRurator.getNodeValue()"+stat);
209     }
210     
211     /**
212      * 修改节点中的数据
213      * @param client
214      * @throws Exception 
215      */
216     public static void updateNodeValue(CuratorFramework client) throws Exception{
217         SetDataBuilder setDataBuilder=client.setData();
218         Stat stat=setDataBuilder.withVersion(-1).forPath("/node_135/node_135_2", "tianxiawushuang".getBytes());
219         System.out.println("TestRurator.updateNodeValue()"+stat);
220     }
221     
222     /**
223      * 判断一个节点是否存在
224      * @param client
225      * @throws Exception 
226      */
227     public static void isExistsNode(CuratorFramework client) throws Exception{
228         ExistsBuilder existsBuilder=client.checkExists();
229         Stat stat=existsBuilder.forPath("/node_135/node_135_2");
230         if(stat!=null){
231             System.out.println("TestRurator.isExistsNode()节点存在");
232         }else{
233             System.out.println("TestRurator.isExistsNode()节点不存在");
234         }
235     }
236     /**
237      * 以判断节点是否存在,讲解异步调用的使用
238      * @param client
239      * @throws Exception 
240      */
241     public static void testSynCallBack(CuratorFramework client) throws Exception{
242         ExistsBuilder existsBuilder=client.checkExists();
243         existsBuilder.inBackground(new myBalckCallBack(),"sxf",es).forPath("/node_135/node_135_2");
244         //System.out.println("TestRurator.testSynCallBack(异步调用返回)"+stat);
245     }
246     
247     /**
248      * 异步调用回调接口的实现类
249      * @author sxf
250      *
251      */
252     private static class myBalckCallBack implements BackgroundCallback{
253 
254         @Override
255         public void processResult(CuratorFramework client, CuratorEvent event)
256                 throws Exception {
257                 //回调类型
258                 CuratorEventType curatorEventType=event.getType();
259                 //节点路径
260                 String path=event.getPath();
261                 //节点列表
262                 List<String> list=event.getChildren();
263                 //获取上下文
264                 Object context=event.getContext();
265                 //获取返回碼  异步操作成功,返回0
266                 int code=event.getResultCode();
267                 //获取数据内容
268                 byte[] data=event.getData();
269                 
270                 StringBuffer sb=new StringBuffer();
271                 sb.append("curatorEventType="+curatorEventType).append("\n");
272                 sb.append("path="+path).append("\n");
273                 sb.append("list="+list).append("\n");
274                 sb.append("context="+context).append("\n");
275                 sb.append("data="+new String(data)).append("\n");
276                 
277                 System.out.println(sb.toString());
278         }
279         
280     }
281     
282     
283     /**
284      *节点事件监听(节点被创建,节点中存储的数据被修改)
285      * @param client
286      * @throws Exception 
287      */
288     public  static void nodeValueChangeListern(CuratorFramework client) throws Exception{
289         //注册监听
290         final NodeCache nodeCache =new NodeCache(client, "/node_135/node_135_2");;
291         //开启监听
292         nodeCache.start();
293         //添加监听器(实现指定接口的实例对象)
294         nodeCache.getListenable().addListener(new NodeCacheListener() {
295             
296             @Override
297             public void nodeChanged() throws Exception {
298                 //获取当前节点的最近数据
299                 byte[] bytes=nodeCache.getCurrentData().getData();
300                 System.out.println(new String(bytes));
301                 
302             }
303         });
304     }
305     
306     /**
307      * 监听节点的子节点事件注册
308      * @param client
309      * @throws Exception 
310      */
311     public static void nodeChildListernDothing(CuratorFramework client) throws Exception{
312         //注册监听器
313         //第三个参数:系统在监听到子节点列表发生变化时同时获取子节点的数据内容
314         final PathChildrenCache cache=new PathChildrenCache(client, "/node_135", true);
315         //开启监听
316         cache.start();
317         //注册监听器
318         cache.getListenable().addListener(new PathChildrenCacheListener() {
319             
320             @Override
321             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
322                     throws Exception {
323                 Type type=event.getType();
324                 switch (event.getType()) {
325                 case CHILD_ADDED://添加子节点
326                         System.out.println("新添加子节点中存储的值==>"+new String(event.getData().getData()));
327                     break;
328 
329                 case CHILD_UPDATED://子节点被修改
330                         System.out.println("被修改的子节点中存储的值==>"+new String(event.getData().getData()));
331                     break;
332                 case CHILD_REMOVED://子节点被删除    
333                         System.out.println("被删除的字节点的路径==>"+event.getInitialData());
334                     break;
335                 
336                 default:
337                     break;
338                 }
339                 
340             }
341         });
342     }
343     
344 }
View Code

 

以上是关于八:Zookeeper开源客户端Curator的api测试的主要内容,如果未能解决你的问题,请参考以下文章

Zookeeper开源客户端Curator之基本功能讲解

浅谈Zookeeper开源客户端框架Curator

ZooKeeper开源客户端Curator

Apache Curator入门实战

Zookeeper客户端Curator使用详解

7.5 zookeeper客户端curator的基本使用