Curator使用

Posted ljing21

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Curator使用相关的知识,希望对你有一定的参考价值。

1.为什么使用Curator?

Curator本身是Netflix公司开源的zookeeper客户端

Curator  提供了各种应用场景实现封装

curator-framework  提供了fluent风格api;

curator-replice   提供了实现封装;

2.引入依赖:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.11.0</version>
</dependency>

 3.创建会话连接

技术图片
 1 package com.karat.cn.zookeeper.curator;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 import org.apache.curator.framework.CuratorFrameworkFactory;
 5 import org.apache.curator.retry.ExponentialBackoffRetry;
 6 
 7 /**
 8  * 创建会话连接
 9  * @author Administrator
10  *
11  */
12 public class CuratorCreateSessionDemo {
13     private final static String CONNECTSTRING="47.107.121.215:2181";
14     
15     public static void main(String args[]) {
16         //创建会话连接的2种方式
17         //正常的风格
18         CuratorFramework curatorFramework1=CuratorFrameworkFactory.
19                 newClient(CONNECTSTRING,5000,5000,
20                         new ExponentialBackoffRetry(1000, 3));//重试机制
21         curatorFramework1.start();
22         //fluent风格
23         CuratorFramework curatorFramework2=CuratorFrameworkFactory.builder().
24                     connectString(CONNECTSTRING).
25                     sessionTimeoutMs(5000).
26                     retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
27         curatorFramework2.start();
28         System.out.println("success");
29         
30     }
31 }
View Code

4.curator连接的重试策略

ExponentialBackoffRetry()  衰减重试

RetryNTimes 指定最大重试次数

RetryOneTime 仅重试一次

RetryUnitilElapsed 一直重试知道规定的时间

5.节点操作

技术图片
  1 package com.karat.cn.zookeeper.curator;
  2 
  3 import org.apache.curator.framework.CuratorFramework;
  4 import org.apache.curator.framework.CuratorFrameworkFactory;
  5 import org.apache.curator.framework.api.BackgroundCallback;
  6 import org.apache.curator.framework.api.CuratorEvent;
  7 import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
  8 import org.apache.curator.retry.ExponentialBackoffRetry;
  9 import org.apache.zookeeper.CreateMode;
 10 import org.apache.zookeeper.data.Stat;
 11 
 12 import java.util.Collection;
 13 import java.util.Collections;
 14 import java.util.concurrent.CountDownLatch;
 15 import java.util.concurrent.Executor;
 16 import java.util.concurrent.ExecutorService;
 17 import java.util.concurrent.Executors;
 18 
 19 /**
 20  * curator对节点的增删改查
 21  * @author Administrator
 22  *
 23  */
 24 public class CuratorOperatorDemo {
 25 
 26     public static void main(String[] args) throws InterruptedException {
 27         CuratorFramework curatorFramework=CuratorClientUtils.getInstance();
 28         System.out.println("连接成功.........");
 29 
 30         //fluent风格api增删改查操作
 31         /**
 32          * 创建节点
 33          */
 34        /*try {
 35             String result=curatorFramework.create()
 36                     .creatingParentsIfNeeded()//创建父节点
 37                     .withMode(CreateMode.PERSISTENT)//持久节点:节点创建后,会一直存在,不会因客户端会话失效而删除;
 38                     .forPath("/curator/curator1/curator11","123".getBytes());
 39             System.out.println(result);
 40         } catch (Exception e) {
 41             e.printStackTrace();
 42         }*/
 43         /**
 44          * 删除节点
 45          */
 46         /*try {
 47             //默认情况下,version为-1
 48             curatorFramework.delete()//删除操作
 49             .deletingChildrenIfNeeded()//删除子节点
 50             .forPath("/node");
 51         } catch (Exception e) {
 52             e.printStackTrace();
 53         }*/
 54 
 55         /**
 56          * 查询
 57          */
 58         /*Stat stat=new Stat();
 59         try {
 60             byte[] bytes=curatorFramework
 61                     .getData()
 62                     .storingStatIn(stat)
 63                     .forPath("/curator/curator1/curator11");
 64             System.out.println(new String(bytes)+"-->stat:"+stat);
 65         } catch (Exception e) {
 66             e.printStackTrace();
 67         }*/
 68         /**
 69          * 更新
 70          */
 71         /*try {
 72             Stat stat=curatorFramework
 73                     .setData()
 74                     .forPath("/curator","lijing".getBytes());
 75             System.out.println(stat);
 76         } catch (Exception e) {
 77             e.printStackTrace();
 78         }*/
 79 
 80         /**
 81          * 异步操作
 82          */
 83         /*ExecutorService service= Executors.newFixedThreadPool(1);//线程池(创建节点的事件由线程池处理)
 84         CountDownLatch countDownLatch=new CountDownLatch(1);//计数器
 85         try {
 86             curatorFramework
 87             .create()
 88             .creatingParentsIfNeeded()
 89             .withMode(CreateMode.EPHEMERAL)
 90             .inBackground(new BackgroundCallback() {
 91                  @Override
 92                  public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
 93                      System.out.println(Thread.currentThread().getName()
 94                              +"->resultCode:"
 95                              +curatorEvent.getResultCode()+"->"//响应结果
 96                              +curatorEvent.getType());//当前节点操作类型
 97                      countDownLatch.countDown();
 98                  }
 99             },service)
100             .forPath("/mic","123".getBytes());
101         } catch (Exception e) {
102             e.printStackTrace();
103         }
104         countDownLatch.await();//等待(让当前线程等待)
105         service.shutdown();//关闭线程*/
106         /**
107          * 事务操作(curator独有的)
108          */
109         try {
110             Collection<CuratorTransactionResult> resultCollections=curatorFramework
111                     .inTransaction()//开启一个事务
112                     .create()
113                     .forPath("/trans","111".getBytes())//创建一个节点
114                     .and()//通过and去修改一个节点
115                     .setData()
116                     .forPath("/curator","111".getBytes())//当修改节点不存在,则一成功一失败,事务不会提交成功
117                     .and()
118                     .commit();//提交事务
119             for (CuratorTransactionResult result:resultCollections){
120                 System.out.println(result.getForPath()+"->"+result.getType());
121             }
122         } catch (Exception e) {
123             e.printStackTrace();
124         }
125     }
126 }
View Code

6.将会话连接做成工具类

技术图片
 1 package com.karat.cn.zookeeper.curator;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 import org.apache.curator.framework.CuratorFrameworkFactory;
 5 import org.apache.curator.retry.ExponentialBackoffRetry;
 6 
 7 /**
 8  * 会话连接工具类
 9  * @author Administrator
10  *
11  */
12 public class CuratorClientUtils {
13 
14     private static CuratorFramework curatorFramework;
15     
16     private final static String CONNECTSTRING="47.107.121.215:2181";
17 
18 
19     public static CuratorFramework getInstance(){
20         curatorFramework= CuratorFrameworkFactory.
21                 newClient(CONNECTSTRING,5000,5000,
22                         new ExponentialBackoffRetry(1000,3));
23         curatorFramework.start();
24         return curatorFramework;
25     }
26 }
View Code

7.监听

技术图片
 1 package com.karat.cn.zookeeper.curator;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 import org.apache.curator.framework.recipes.cache.NodeCache;
 5 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 6 import org.apache.zookeeper.CreateMode;
 7 
 8 import java.util.concurrent.TimeUnit;
 9 
10 /**
11  * 监听
12  * @author Administrator
13  *
14  */
15 public class CuratorEventDemo {
16 
17     /**
18      * 三种watcher来做节点的监听
19      * pathcache   监视一个路径下子节点的创建、删除、节点数据更新
20      * NodeCache   监视一个节点的创建、更新、删除
21      * TreeCache   pathcaceh+nodecache 的合体(监视路径下的创建、更新、删除事件),
22      * 缓存路径下的所有子节点的数据
23      */
24 
25     public static void main(String[] args) throws Exception {
26         CuratorFramework curatorFramework=CuratorClientUtils.getInstance();
27         /**
28          * 节点变化NodeCache
29          */
30         //监听
31         /*NodeCache cache=new NodeCache(curatorFramework,"/curator",false);
32         cache.start(true);
33         //监听事件
34         cache.getListenable().addListener(()-> System.out.println("节点数据发生变化,变化后的结果" +
35                 ":"+new String(cache.getCurrentData().getData())));
36         //修改节点
37         curatorFramework.setData().forPath("/curator","666".getBytes());*/
38 
39 
40         /**
41          * PatchChildrenCache
42          */
43 
44         PathChildrenCache cache=new PathChildrenCache(curatorFramework,"/event",true);//参数2监听的节点,参数3是否缓存
45         cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
46         // Normal/ BUILD_INITIAL_CACHE /POST_INITIALIZED_EVENT
47 
48         cache.getListenable().addListener((curatorFramework1,pathChildrenCacheEvent)->{
49             switch (pathChildrenCacheEvent.getType()){
50                 case CHILD_ADDED:
51                     System.out.println("增加子节点");
52                     break;
53                 case CHILD_REMOVED:
54                     System.out.println("删除子节点");
55                     break;
56                 case CHILD_UPDATED:
57                     System.out.println("更新子节点");
58                     break;
59                 default:break;
60             }
61         });
62         //创建节点
63         curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event","event".getBytes());
64         TimeUnit.SECONDS.sleep(1);
65         System.out.println("1");
66         //创建子节点
67         curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1","1".getBytes());
68         TimeUnit.SECONDS.sleep(1);
69         System.out.println("2");
70         //修改节点
71         curatorFramework.setData().forPath("/event/event1","222".getBytes());
72         TimeUnit.SECONDS.sleep(1);
73         System.out.println("3");
74         //删除节点
75         curatorFramework.delete().forPath("/event/event1");
76         System.out.println("4");
77          
78         System.in.read();
79     }
80 }
View Code

 

以上是关于Curator使用的主要内容,如果未能解决你的问题,请参考以下文章

Curator使用:分布式锁

Apache Curator ZooKeeper单元测试产生错误

Curator使用

zookeeper curator使用

zookeeper开源客户端curator

Curator的使用