Curator使用
Posted ljing21
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Curator使用相关的知识,希望对你有一定的参考价值。
1.为什么使用Curator?
Curator本身是Netflix公司开源的zookeeper客户端;
Curator 提供了各种应用场景的实现封装;
curator-framework 提供了fluent风格api;
curator-recipes 提供了各种典型应用场景(如分布式锁、Leader 选举、节点缓存与监听等)的实现封装;
2.引入依赖:
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.11.0</version>
</dependency>
3.创建会话连接
1 package com.karat.cn.zookeeper.curator; 2 3 import org.apache.curator.framework.CuratorFramework; 4 import org.apache.curator.framework.CuratorFrameworkFactory; 5 import org.apache.curator.retry.ExponentialBackoffRetry; 6 7 /** 8 * 创建会话连接 9 * @author Administrator 10 * 11 */ 12 public class CuratorCreateSessionDemo { 13 private final static String CONNECTSTRING="47.107.121.215:2181"; 14 15 public static void main(String args[]) { 16 //创建会话连接的2种方式 17 //正常的风格 18 CuratorFramework curatorFramework1=CuratorFrameworkFactory. 19 newClient(CONNECTSTRING,5000,5000, 20 new ExponentialBackoffRetry(1000, 3));//重试机制 21 curatorFramework1.start(); 22 //fluent风格 23 CuratorFramework curatorFramework2=CuratorFrameworkFactory.builder(). 24 connectString(CONNECTSTRING). 25 sessionTimeoutMs(5000). 26 retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); 27 curatorFramework2.start(); 28 System.out.println("success"); 29 30 } 31 }
4.curator连接的重试策略
ExponentialBackoffRetry() 衰减重试
RetryNTimes 指定最大重试次数
RetryOneTime 仅重试一次
RetryUntilElapsed 一直重试,直到达到规定的时间
5.节点操作
1 package com.karat.cn.zookeeper.curator; 2 3 import org.apache.curator.framework.CuratorFramework; 4 import org.apache.curator.framework.CuratorFrameworkFactory; 5 import org.apache.curator.framework.api.BackgroundCallback; 6 import org.apache.curator.framework.api.CuratorEvent; 7 import org.apache.curator.framework.api.transaction.CuratorTransactionResult; 8 import org.apache.curator.retry.ExponentialBackoffRetry; 9 import org.apache.zookeeper.CreateMode; 10 import org.apache.zookeeper.data.Stat; 11 12 import java.util.Collection; 13 import java.util.Collections; 14 import java.util.concurrent.CountDownLatch; 15 import java.util.concurrent.Executor; 16 import java.util.concurrent.ExecutorService; 17 import java.util.concurrent.Executors; 18 19 /** 20 * curator对节点的增删改查 21 * @author Administrator 22 * 23 */ 24 public class CuratorOperatorDemo { 25 26 public static void main(String[] args) throws InterruptedException { 27 CuratorFramework curatorFramework=CuratorClientUtils.getInstance(); 28 System.out.println("连接成功........."); 29 30 //fluent风格api增删改查操作 31 /** 32 * 创建节点 33 */ 34 /*try { 35 String result=curatorFramework.create() 36 .creatingParentsIfNeeded()//创建父节点 37 .withMode(CreateMode.PERSISTENT)//持久节点:节点创建后,会一直存在,不会因客户端会话失效而删除; 38 .forPath("/curator/curator1/curator11","123".getBytes()); 39 System.out.println(result); 40 } catch (Exception e) { 41 e.printStackTrace(); 42 }*/ 43 /** 44 * 删除节点 45 */ 46 /*try { 47 //默认情况下,version为-1 48 curatorFramework.delete()//删除操作 49 .deletingChildrenIfNeeded()//删除子节点 50 .forPath("/node"); 51 } catch (Exception e) { 52 e.printStackTrace(); 53 }*/ 54 55 /** 56 * 查询 57 */ 58 /*Stat stat=new Stat(); 59 try { 60 byte[] bytes=curatorFramework 61 .getData() 62 .storingStatIn(stat) 63 .forPath("/curator/curator1/curator11"); 64 System.out.println(new String(bytes)+"-->stat:"+stat); 65 } catch (Exception e) { 66 e.printStackTrace(); 67 }*/ 68 /** 69 * 更新 70 */ 71 /*try { 72 Stat stat=curatorFramework 73 .setData() 74 
.forPath("/curator","lijing".getBytes()); 75 System.out.println(stat); 76 } catch (Exception e) { 77 e.printStackTrace(); 78 }*/ 79 80 /** 81 * 异步操作 82 */ 83 /*ExecutorService service= Executors.newFixedThreadPool(1);//线程池(创建节点的事件由线程池处理) 84 CountDownLatch countDownLatch=new CountDownLatch(1);//计数器 85 try { 86 curatorFramework 87 .create() 88 .creatingParentsIfNeeded() 89 .withMode(CreateMode.EPHEMERAL) 90 .inBackground(new BackgroundCallback() { 91 @Override 92 public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { 93 System.out.println(Thread.currentThread().getName() 94 +"->resultCode:" 95 +curatorEvent.getResultCode()+"->"//响应结果 96 +curatorEvent.getType());//当前节点操作类型 97 countDownLatch.countDown(); 98 } 99 },service) 100 .forPath("/mic","123".getBytes()); 101 } catch (Exception e) { 102 e.printStackTrace(); 103 } 104 countDownLatch.await();//等待(让当前线程等待) 105 service.shutdown();//关闭线程*/ 106 /** 107 * 事务操作(curator独有的) 108 */ 109 try { 110 Collection<CuratorTransactionResult> resultCollections=curatorFramework 111 .inTransaction()//开启一个事务 112 .create() 113 .forPath("/trans","111".getBytes())//创建一个节点 114 .and()//通过and去修改一个节点 115 .setData() 116 .forPath("/curator","111".getBytes())//当修改节点不存在,则一成功一失败,事务不会提交成功 117 .and() 118 .commit();//提交事务 119 for (CuratorTransactionResult result:resultCollections){ 120 System.out.println(result.getForPath()+"->"+result.getType()); 121 } 122 } catch (Exception e) { 123 e.printStackTrace(); 124 } 125 } 126 }
6.将会话连接做成工具类
1 package com.karat.cn.zookeeper.curator; 2 3 import org.apache.curator.framework.CuratorFramework; 4 import org.apache.curator.framework.CuratorFrameworkFactory; 5 import org.apache.curator.retry.ExponentialBackoffRetry; 6 7 /** 8 * 会话连接工具类 9 * @author Administrator 10 * 11 */ 12 public class CuratorClientUtils { 13 14 private static CuratorFramework curatorFramework; 15 16 private final static String CONNECTSTRING="47.107.121.215:2181"; 17 18 19 public static CuratorFramework getInstance(){ 20 curatorFramework= CuratorFrameworkFactory. 21 newClient(CONNECTSTRING,5000,5000, 22 new ExponentialBackoffRetry(1000,3)); 23 curatorFramework.start(); 24 return curatorFramework; 25 } 26 }
7.监听
1 package com.karat.cn.zookeeper.curator; 2 3 import org.apache.curator.framework.CuratorFramework; 4 import org.apache.curator.framework.recipes.cache.NodeCache; 5 import org.apache.curator.framework.recipes.cache.PathChildrenCache; 6 import org.apache.zookeeper.CreateMode; 7 8 import java.util.concurrent.TimeUnit; 9 10 /** 11 * 监听 12 * @author Administrator 13 * 14 */ 15 public class CuratorEventDemo { 16 17 /** 18 * 三种watcher来做节点的监听 19 * pathcache 监视一个路径下子节点的创建、删除、节点数据更新 20 * NodeCache 监视一个节点的创建、更新、删除 21 * TreeCache pathcaceh+nodecache 的合体(监视路径下的创建、更新、删除事件), 22 * 缓存路径下的所有子节点的数据 23 */ 24 25 public static void main(String[] args) throws Exception { 26 CuratorFramework curatorFramework=CuratorClientUtils.getInstance(); 27 /** 28 * 节点变化NodeCache 29 */ 30 //监听 31 /*NodeCache cache=new NodeCache(curatorFramework,"/curator",false); 32 cache.start(true); 33 //监听事件 34 cache.getListenable().addListener(()-> System.out.println("节点数据发生变化,变化后的结果" + 35 ":"+new String(cache.getCurrentData().getData()))); 36 //修改节点 37 curatorFramework.setData().forPath("/curator","666".getBytes());*/ 38 39 40 /** 41 * PatchChildrenCache 42 */ 43 44 PathChildrenCache cache=new PathChildrenCache(curatorFramework,"/event",true);//参数2监听的节点,参数3是否缓存 45 cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); 46 // Normal/ BUILD_INITIAL_CACHE /POST_INITIALIZED_EVENT 47 48 cache.getListenable().addListener((curatorFramework1,pathChildrenCacheEvent)->{ 49 switch (pathChildrenCacheEvent.getType()){ 50 case CHILD_ADDED: 51 System.out.println("增加子节点"); 52 break; 53 case CHILD_REMOVED: 54 System.out.println("删除子节点"); 55 break; 56 case CHILD_UPDATED: 57 System.out.println("更新子节点"); 58 break; 59 default:break; 60 } 61 }); 62 //创建节点 63 curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event","event".getBytes()); 64 TimeUnit.SECONDS.sleep(1); 65 System.out.println("1"); 66 //创建子节点 67 curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1","1".getBytes()); 68 
TimeUnit.SECONDS.sleep(1); 69 System.out.println("2"); 70 //修改节点 71 curatorFramework.setData().forPath("/event/event1","222".getBytes()); 72 TimeUnit.SECONDS.sleep(1); 73 System.out.println("3"); 74 //删除节点 75 curatorFramework.delete().forPath("/event/event1"); 76 System.out.println("4"); 77 78 System.in.read(); 79 } 80 }
以上是关于Curator使用的主要内容,如果未能解决你的问题,请参考以下文章