7.5 zookeeper客户端curator的基本使用

Posted 赵计刚

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了7.5 zookeeper客户端curator的基本使用相关的知识,希望对你有一定的参考价值。

使用zookeeper原生API实现一些复杂的东西比较麻烦。所以,出现了两款比较好的开源客户端,对zookeeper的原生API进行了包装:zkClient和curator。后者是Netflix出版的,必属精品,也是最好用的zk的开源客户端。

 

一  curator基本API使用

引入依赖:

1         <dependency>
2             <groupId>org.apache.curator</groupId>
3             <artifactId>curator-framework</artifactId>
4             <version>2.12.0</version>
5         </dependency>

该依赖引入后,默认引入的zookeeper版本是3.4.8。

注意:不要引入>=3.0.0的curator-framework,默认引入的zookeeper版本是3.5.x(该版本还不稳定),目前测试起来还是有点问题的。

完整代码:

 1 package com.hulk.curator;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 import org.apache.curator.framework.CuratorFrameworkFactory;
 5 import org.apache.curator.framework.api.BackgroundCallback;
 6 import org.apache.curator.framework.api.CuratorEvent;
 7 import org.apache.curator.retry.ExponentialBackoffRetry;
 8 import org.apache.zookeeper.CreateMode;
 9 import org.apache.zookeeper.data.Stat;
10 
11 import java.util.concurrent.Executors;
12 
13 public class CuratorTest {
14     private static CuratorFramework client = CuratorFrameworkFactory.builder()
15             .connectString("10.211.55.4:2181")
16             .sessionTimeoutMs(50000)
17             .connectionTimeoutMs(30000)
18             .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
19 
20     public static void main(String[] args) throws Exception {
21         /**
22          * 创建会话
23          */
24         client.start();
25 
26         /**
27          * 创建节点
28          * 注意:
29          * 1 除非指明创建节点的类型,默认是持久节点
30          * 2 ZooKeeper规定:所有非叶子节点都是持久节点,所以递归创建出来的节点,只有最后的数据节点才是指定类型的节点,其父节点是持久节点
31          */
32         client.create().forPath("/China");//创建一个初始内容为空的节点
33         client.create().forPath("/America", "zhangsan".getBytes());
34         client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");//创建一个初始内容为空的临时节点
35         client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());//递归创建,/Russia是持久节点
36 
37         /**
38          * 异步创建节点
39          * 注意:如果自己指定了线程池,那么相应的操作就会在线程池中执行,如果没有指定,那么就会使用Zookeeper的EventThread线程对事件进行串行处理
40          */
41         client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
42             @Override
43             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
44                 System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
45                                    + ",type:" + event.getType());
46             }
47         }, Executors.newFixedThreadPool(10)).forPath("/async-curator-my");
48 
49         client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
50             @Override
51             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
52                 System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
53                                    + ",type:" + event.getType());
54             }
55         }).forPath("/async-curator-zookeeper");
56 
57         /**
58          * 获取节点内容
59          */
60         byte[] data = client.getData().forPath("/America");
61         System.out.println(new String(data));
62         byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/America"); //传入一个旧的stat变量,来存储服务端返回的最新的节点状态信息
63         System.out.println(new String(data2));
64         /**
65          * 更新数据
66          */
67         Stat stat = client.setData().forPath("/America");
68         client.setData().withVersion(4).forPath("/America", "lisi".getBytes());
69 
70         /**
71          * 删除节点
72          */
73         client.delete().forPath("/China");//只能删除叶子节点
74         client.delete().deletingChildrenIfNeeded().forPath("/Russia");//删除一个节点,并递归删除其所有子节点
75         client.delete().withVersion(5).forPath("/America");//强制指定版本进行删除
76         client.delete().guaranteed().forPath("/America");//注意:由于一些网络原因,上述的删除操作有可能失败,使用guaranteed(),如果删除失败,会记录下来,只要会话有效,就会不断的重试,直到删除成功为止
77 
78         Thread.sleep(Integer.MAX_VALUE);
79     }
80 }

 

1  创建会话

curator创建会话有两种方式,推荐流式API。

1 CuratorFramework client = CuratorFrameworkFactory.builder()
2             .connectString("10.211.55.4:2181")
3             .sessionTimeoutMs(50000)
4             .connectionTimeoutMs(30000)
5             .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

参数:

  • connectString:zk的server地址,多个server之间使用英文逗号分隔开
  • connectionTimeoutMs:连接超时时间,如上是30s,默认是15s
  • sessionTimeoutMs:会话超时时间,如上是50s,默认是60s
  • retryPolicy:失败重试策略
    • ExponentialBackoffRetry:构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
      • baseSleepTimeMs:初始的sleep时间,用于计算之后的每次重试的sleep时间,
        • 计算公式:当前sleep时间=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
      • maxRetries:最大重试次数
      • maxSleepMs:最大sleep时间,如果上述的当前sleep计算出来比这个大,那么sleep用这个时间
    • 其他,查看org.apache.curator.RetryPolicy接口的实现类

此时会话还没创建,使用如下代码创建会话:

1 client.start();

start()会阻塞到会话创建成功为止。

 

2  创建节点

2.1  同步创建

1         client.create().forPath("/China");//创建一个初始内容为空的节点
2         client.create().forPath("/America", "zhangsan".getBytes());
3         client.create().withMode(CreateMode.EPHEMERAL).forPath("/France");//创建一个初始内容为空的临时节点
4         client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/Russia/car", "haha".getBytes());//递归创建,/Russia是持久节点

注意:

  • 除非指明创建节点的类型,默认是持久节点
  • ZooKeeper规定:所有非叶子节点都是持久节点,所以递归创建出来的节点,只有最后的数据节点才是指定类型的节点,其父节点是持久节点
  • creatingParentsIfNeeded():可以实现递归创建

2.2  异步创建

 1         client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
 2             @Override
 3             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
 4                 System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
 5                                    + ",type:" + event.getType());
 6             }
 7         }, Executors.newFixedThreadPool(10)).forPath("/async-curator-my");
 8 
 9         client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
10             @Override
11             public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
12                 System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode()
13                                    + ",type:" + event.getType());
14             }
15         }).forPath("/async-curator-zookeeper");

注意:

  • 在curator中所有异步操作,都使用org.apache.curator.framework.api.BackgroundCallback接口的实现类完成
  • 如果在BackgroundCallback中自己指定了线程池,那么相应的操作就会在线程池中执行,如果没有指定,那么就会使用Zookeeper的EventThread线程对事件进行串行处理,所以上述的两个输出分别如下:
    当前线程:pool-3-thread-1,code:0,type:CREATE
    当前线程:main-EventThread,code:0,type:CREATE

     

3  获取节点内容

1         byte[] data = client.getData().forPath("/America");
2         System.out.println(new String(data));
3         byte[] data2 = client.getData().storingStatIn(new Stat()).forPath("/America"); //传入一个旧的stat变量,来存储服务端返回的最新的节点状态信息
4         System.out.println(new String(data2));

4  获取节点子节点列表

1 List<String> children = client.getChildren().forPath("/Russia");

 

5  更新数据

1         Stat stat = client.setData().forPath("/America");
2         client.setData().withVersion(4).forPath("/America", "lisi".getBytes());

注意:

  • version版本号还是为了实现CAS并发处理,也会强制某个线程必须更新相应的版本的数据

 

6  删除节点

1         client.delete().forPath("/China");//只能删除叶子节点
2         client.delete().deletingChildrenIfNeeded().forPath("/Russia");//删除一个节点,并递归删除其所有子节点
3         client.delete().withVersion(5).forPath("/America");//强制指定版本进行删除
4         client.delete().guaranteed().forPath("/America");

注意:

  • deletingChildrenIfNeeded()实现级联删除
  • guaranteed()由于一些网络原因,上述的删除操作有可能失败,使用guaranteed(),如果删除失败,会记录下来,只要会话有效,就会不断的重试,直到删除成功为止

 

二  curator实现事件监听

引入两个依赖:

 1         <dependency>
 2             <groupId>org.apache.curator</groupId>
 3             <artifactId>curator-framework</artifactId>
 4             <version>2.12.0</version>
 5         </dependency>
 6         <dependency>
 7             <groupId>org.apache.curator</groupId>
 8             <artifactId>curator-recipes</artifactId>
 9             <version>2.12.0</version>
10         </dependency>

给出全部代码:

 1 package com.hulk.curator;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 import org.apache.curator.framework.CuratorFrameworkFactory;
 5 import org.apache.curator.framework.recipes.cache.NodeCache;
 6 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 7 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 8 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 9 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
10 import org.apache.curator.retry.ExponentialBackoffRetry;
11 
12 /**
13  * 事件监听器
14  */
15 public class CuratorWatcherTest {
16     private static CuratorFramework client = CuratorFrameworkFactory.builder()
17             .connectString("10.211.55.4:2181")
18             .sessionTimeoutMs(50000)
19             .connectionTimeoutMs(30000)
20             .retryPolicy(new ExponentialBackoffRetry(1000, 3))
21             .build();
22 
23     public static void main(String[] args) throws Exception {
24         /**
25          * 创建会话
26          */
27         client.start();
28         client.create().creatingParentsIfNeeded().forPath("/book/computer","java".getBytes());
29         /**
30          * 监听指定节点本身的变化,包括节点本身的创建和节点本身数据的变化
31          */
32         NodeCache nodeCache = new NodeCache(client,"/book/computer");
33         nodeCache.getListenable().addListener(new NodeCacheListener() {
34             @Override
35             public void nodeChanged() throws Exception {
36                 System.out.println("新的节点数据:" + new String(nodeCache.getCurrentData().getData()));
37             }
38         });
39         nodeCache.start(true);
40 
41         client.setData().forPath("/book/computer","c++".getBytes());
42         /**
43          * 监听子节点变化情况
44          * 1 新增子节点
45          * 2 删除子节点
46          * 3 子节点数据变更
47          */
48         PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/book13",true);
49         pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
50             @Override
51             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
52                 switch (event.getType()){
53                     case CHILD_ADDED:
54                         System.out.println("新增子节点:" + event.getData().getPath());
55                         break;
56                     case CHILD_UPDATED:
57                         System.out.println("子节点数据变化:" + event.getData().getPath());
58                         break;
59                     case CHILD_REMOVED:
60                         System.out.println("删除子节点:" + event.getData().getPath());
61                         break;
62                     default:
63                         break;
64                 }
65             }
66         });
67         pathChildrenCache.start();
68 
69         client.create().forPath("/book13");
70 
71         client.create().forPath("/book13/car", "bmw".getBytes());
72 
73         client.setData().forPath("/book13/car", "audi".getBytes());
74 
75         client.delete().forPath("/book13/car");
76     }
77 }

curator的事件监听分为:

  • NodeCache:对节点本身的监听
    • 监听节点本身的创建
    • 监听节点本身的数据的变化
  • PathChildrenCache:对节点的子节点的监听
    • 监听新增子节点
    • 监听删除子节点
    • 监听子节点数据变化

注意

  • PathChildrenCache只会监听指定节点的一级子节点,不会监听节点本身(例如:“/book13”),也不会监听子节点的子节点(例如,“/book13/car/color”)

以上是关于7.5 zookeeper客户端curator的基本使用的主要内容,如果未能解决你的问题,请参考以下文章

zookeeper_04:curator

zookeeper开源客户端curator

第三章 zookeeper客户端-curator详解

zookeeper java客户端之curator详解

Zookeeper客户端Curator使用详解

Zookeeper客户端之 Curator