Java API操作ZooKeeper
Posted KoKo
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java API操作ZooKeeper相关的知识,希望对你有一定的参考价值。
- 创建会话
1 package org.zln.zk; 2 3 import org.apache.zookeeper.WatchedEvent; 4 import org.apache.zookeeper.Watcher; 5 import org.apache.zookeeper.ZooKeeper; 6 7 import java.io.IOException; 8 9 /** 10 * Created by sherry on 16/8/27. 11 */ 12 public class TestZooKeeperClientApi { 13 14 private static ZooKeeper zooKeeper; 15 16 public static void main(String[] args) throws IOException, InterruptedException { 17 createSession(); 18 } 19 20 /** 21 * 创建会话 22 */ 23 private static ZooKeeper createSession() throws IOException, InterruptedException { 24 //实例化的过程,同时也是与ZooKeeper建立连接的过程,参数说明:ip:port 超时时间 监听器(实现water接口,监听器用于接收通知) 25 zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() { 26 @Override 27 public void process(WatchedEvent watchedEvent) { 28 System.out.println("收到事件:"+watchedEvent);//收到事件:WatchedEvent state:SyncConnected type:None path:null 29 30 31 //TODO 32 } 33 }); 34 System.out.println("查看状态:"+zooKeeper.getState());//查看状态:CONNECTING 35 36 //如果不停一段时间,那么,监听器还没收到监听,方法就已经退出了 37 Thread.sleep(5000); 38 39 return zooKeeper; 40 41 42 } 43 }
- 创建节点
1 package org.zln.zk; 2 3 import org.apache.zookeeper.*; 4 import org.apache.zookeeper.data.ACL; 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 8 import java.io.IOException; 9 import java.io.UnsupportedEncodingException; 10 import java.util.ArrayList; 11 12 /** 13 * Created by sherry on 16/8/27. 14 */ 15 public class TestZooKeeperClientApi { 16 17 private static Logger logger = LoggerFactory.getLogger(TestZooKeeperClientApi.class); 18 19 private static ZooKeeper zooKeeper; 20 21 public static void main(String[] args) throws IOException, InterruptedException { 22 createSession(); 23 } 24 25 /** 26 * 创建会话 27 */ 28 private static ZooKeeper createSession() throws IOException, InterruptedException { 29 //实例化的过程,同时也是与ZooKeeper建立连接的过程,参数说明:ip:port 超时时间 监听器(实现water接口,监听器用于接收通知) 30 zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() { 31 @Override 32 public void process(WatchedEvent watchedEvent) { 33 //TODO 与 ZooKeeper 的交互,一般都放在这里 34 if (watchedEvent.getState() == Event.KeeperState.SyncConnected){//已连接 35 logger.info("连接上了"); 36 37 try { 38 //参数说明:节点路径 数据的字节数组 权限 创建节点模式 39 String nodePath = createNode(zooKeeper,"/node_1","123".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 40 logger.info("创建节点:"+nodePath); 41 } catch (UnsupportedEncodingException|KeeperException|InterruptedException e) { 42 e.printStackTrace(); 43 } 44 45 } 46 } 47 }); 48 logger.info("查看状态:"+zooKeeper.getState());//查看状态:CONNECTING 49 50 //如果不停一段时间,那么,监听器还没收到监听,方法就已经退出了 51 Thread.sleep(5000); 52 53 return zooKeeper; 54 55 56 } 57 58 /** 59 * 创建ZooKeeper节点 60 * @param zooKeeper ZooKeeper连接 61 * @return 节点路径 62 */ 63 public static String createNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws UnsupportedEncodingException, KeeperException, InterruptedException { 64 //参数说明:节点路径 数据的字节数组 权限 创建节点模式 65 return zooKeeper.create(path,bytes, acls, createMode); 66 } 67 }
创建模式
PERSISTENT 持久节点
PERSISTENT_SEQUENTIAL 持久顺序节点
EPHEMERAL 临时节点
EPHEMERAL_SEQUENTIAL 临时顺序节点
以上代码,是属于同步创建
1 /** 2 * 异步创建节点 3 * @param zooKeeper 4 * @param path 5 * @param bytes 6 * @param acls 7 * @param createMode 8 * @throws KeeperException 9 * @throws InterruptedException 10 */ 11 public static void asCreateNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws KeeperException, InterruptedException { 12 13 //异步创建需要增加 AsyncCallback.StringCallback 接口的实现类 以及 一个上下文对象参数 14 zooKeeper.create(path, bytes, acls, createMode, new AsyncCallback.StringCallback() { 15 /** 16 * 17 * @param rc 节点创建结果返回码 0-节点创建成功 18 * @param path 节点真实路径 19 * @param ctx 异步调用上下文 就是 create方法本地调用的那个最后一个参数 20 * @param name 21 */ 22 @Override 23 public void processResult(int rc, String path, Object ctx, String name) { 24 StringBuilder stringBuilder = new StringBuilder(); 25 stringBuilder.append("\\nrc="+rc+"\\n" + 26 "path="+path+"\\n" + 27 "ctx="+ctx+"\\n" + 28 "name="+name+"\\n"); 29 logger.info(stringBuilder.toString()); 30 } 31 },"异步创建"); 32 }
- 获取子节点
1 /** 2 * 同步方式获取子节点 3 * @param zooKeeper 连接 4 * @param parentPath 父路径 5 * @return 6 * @throws KeeperException 7 * @throws InterruptedException 8 */ 9 public static List<String> getChildList(ZooKeeper zooKeeper,String parentPath) throws KeeperException, InterruptedException { 10 //参数说明: 父节点路径 是否需要关注子节点的变化 11 List<String> childs = zooKeeper.getChildren(parentPath,false); 12 return childs; 13 }
异步方式获取子节点且关注子节点的变化
1 /** 2 * 异步方式获取子节点 关注子节点变化 3 * @param zooKeeper 连接 4 * @param parentPath 父路径 5 */ 6 public static void asGetChildListAndWatch(ZooKeeper zooKeeper,String parentPath){ 7 zooKeeper.getChildren(parentPath, true, new AsyncCallback.Children2Callback() { 8 @Override 9 public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { 10 logger.info("变化后的子节点:"); 11 for (String name:children){ 12 logger.info("子节点:"+name); 13 } 14 } 15 },"关注子节点变化"); 16 }
目前为止可以发现这个规律,有回调函数的是异步方式调用,没有回调函数的是同步调用
问:同步调用和异步调用的使用场景是???
答:下面的操作依赖调用结果的时候,就需要调用同步方法
1 package org.zln.zk; 2 3 import org.apache.zookeeper.*; 4 import org.apache.zookeeper.data.ACL; 5 import org.apache.zookeeper.data.Stat; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 9 import java.io.IOException; 10 import java.io.UnsupportedEncodingException; 11 import java.util.ArrayList; 12 import java.util.List; 13 14 /** 15 * Created by sherry on 16/8/27. 16 */ 17 public class TestZooKeeperClientApi { 18 19 private static Logger logger = LoggerFactory.getLogger(TestZooKeeperClientApi.class); 20 21 private static ZooKeeper zooKeeper; 22 23 public static void main(String[] args) throws IOException, InterruptedException { 24 createSession(); 25 26 Thread.sleep(Integer.MAX_VALUE); 27 } 28 29 /** 30 * 创建会话 31 */ 32 private static ZooKeeper createSession() throws IOException, InterruptedException { 33 //实例化的过程,同时也是与ZooKeeper建立连接的过程,参数说明:ip:port 超时时间 监听器(实现water接口,监听器用于接收通知) 34 zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() { 35 @Override 36 public void process(WatchedEvent watchedEvent) { 37 //TODO 与 ZooKeeper 的交互,一般都放在这里 38 if (watchedEvent.getState() == Event.KeeperState.SyncConnected){//已连接 39 logger.info("连接上了"); 40 try { 41 //同步方式创建节点 42 //String nodePath = sysCreateNode(zooKeeper,"/node_1","123".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 43 //logger.info("创建节点:"+nodePath); 44 45 //异步方式创建节点 46 //asCreateNode(zooKeeper,"/node_2","234".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 47 48 49 //同步方式获取子节点 不关注子节点变化 50 // List<String> list = getChildListNoWatch(zooKeeper,"/"); 51 // for (String name:list){ 52 // logger.info("子节点:"+name); 53 // } 54 55 //异步方式获取节点 关注子节点变化 56 // asGetChildListAndWatch(zooKeeper,"/"); 57 58 //同步方式获取节点数据 sysGetNodeData 59 60 byte[] bytes = sysGetNodeDataNoWatch(zooKeeper,"/node_1"); 61 logger.info("获取节点数据"+new String(bytes,"UTF-8")); 62 63 deleteNode(zooKeeper,"/node_1",0); 64 } catch (KeeperException|InterruptedException|UnsupportedEncodingException e) { 65 e.printStackTrace(); 66 } 67 68 } 69 } 70 }); 71 logger.info("查看状态:"+zooKeeper.getState());//查看状态:CONNECTING 72 73 return zooKeeper; 74 75 76 } 77 78 /** 79 * 同步创建节点 80 * @param zooKeeper 连接 81 * @param path 节点路径 82 * @param bytes 字节数组数据 83 * @param acls 权限 84 * @param createMode 创建模式 85 * @return 86 * @throws UnsupportedEncodingException 87 * @throws KeeperException 88 * @throws InterruptedException 89 */ 90 public static String sysCreateNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws UnsupportedEncodingException, KeeperException, InterruptedException { 91 return zooKeeper.create(path,bytes, acls, createMode); 92 } 93 94 95 /** 96 * 异步创建节点 97 * @param zooKeeper 连接 98 * @param path 节点路径 99 * @param bytes 字节数组数据 100 * @param acls 权限 101 * @param createMode 创建模式 102 * @throws KeeperException 103 * @throws InterruptedException 104 */ 105 public static void asCreateNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws KeeperException, InterruptedException { 106 107 //异步创建需要增加 AsyncCallback.StringCallback 接口的实现类 以及 一个上下文对象参数 108 zooKeeper.create(path, bytes, acls, createMode, new AsyncCallback.StringCallback() { 109 /** 110 * 111 * @param rc 节点创建结果返回码 0-节点创建成功 112 * @param path 节点真实路径 113 * @param ctx 异步调用上下文 就是 create方法本地调用的那个最后一个参数 114 * @param name 115 */ 116 @Override 117 public void processResult(int rc, String path, Object ctx, String name) { 118 StringBuilder stringBuilder = new StringBuilder(); 119 stringBuilder.append("\\nrc="+rc+"\\n" + 120 "path="+path+"\\n" + 121 "ctx="+ctx+"\\n" + 122 "name="+name+"\\n"); 123 logger.info(stringBuilder.toString()); 124 } 125 },"异步创建"); 126 } 127 128 129 /** 130 * 同步方式获取子节点 不关注子节点变化 131 * @param zooKeeper 连接 132 * @param parentPath 父路径 133 * @return 134 * @throws KeeperException 135 * @throws InterruptedException 136 */ 137 public static List<String> sysGetChildListNoWatch(ZooKeeper zooKeeper,String parentPath) throws KeeperException, InterruptedException { 138 //参数说明: 父节点路径 是否需要关注子节点的变化 如果 true,则子节点发生变化后,会产生 NodeChildrenChanged 事件 139 List<String> childs = zooKeeper.getChildren(parentPath,false); 140 return childs; 141 } 142 143 /** 144 * 异步方式获取子节点 关注子节点变化 145 * @param zooKeeper 连接 146 * @param parentPath 父路径 147 */ 148 public static void asGetChildListAndWatch(ZooKeeper zooKeeper,String parentPath){ 149 zooKeeper.getChildren(parentPath, true, new AsyncCallback.Children2Callback() { 150 @Override 151 public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { 152 logger.info("变化后的子节点:"); 153 for (String name:children){ 154 logger.info("子节点:"+name); 155 } 156 } 157 },"关注子节点变化"); 158 } 159 160 /** 161 * 同步方式获取数据 162 * @param zooKeeper 163 * @param path 164 * @return 165 * @throws KeeperException 166 * @throws InterruptedException 167 */ 168 public static byte[] sysGetNodeDataNoWatch(ZooKeeper zooKeeper,String path) throws KeeperException, InterruptedException { 169 //路径 是否关注数据变化 状态 170 return zooKeeper.getData(path,false,new Stat()); 171 } 172 173 /** 174 * 删除节点 175 * @param zooKeeper 176 * @param nodePath 177 * @param version 178 * @throws KeeperException 179 * @throws InterruptedException 180 */ 181 public static void deleteNode(ZooKeeper zooKeeper,String nodePath,int version) throws KeeperException, InterruptedException { 182 zooKeeper.delete(nodePath,version); 183 } 184 185 186 }
除了ZooKeeper提供的Java API外,还有两种客户端,ZKClient和Curator两种客户端,都是对原生API的封装,使得操作更方便
《从PAXOS到ZOOKEEPER分布式一致性原理与实践》,可以参考这本书
以上是关于Java API操作ZooKeeper的主要内容,如果未能解决你的问题,请参考以下文章
使用ZooKeeper提供的Java API操作ZooKeeper
zookeeper学习-4Java API操作 - 服务端和客户端操作
zookeeper学习-5Java API操作 - Watcher监听机制