Java API操作ZooKeeper

Posted KoKo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java API操作ZooKeeper相关的知识,希望对你有一定的参考价值。

  • 创建会话
 1 package org.zln.zk;
 2 
 3 import org.apache.zookeeper.WatchedEvent;
 4 import org.apache.zookeeper.Watcher;
 5 import org.apache.zookeeper.ZooKeeper;
 6 
 7 import java.io.IOException;
 8 
 9 /**
10  * Created by sherry on 16/8/27.
11  */
12 public class TestZooKeeperClientApi {
13 
14     private static ZooKeeper zooKeeper;
15 
16     public static void main(String[] args) throws IOException, InterruptedException {
17         createSession();
18     }
19 
20     /**
21      * 创建会话
22      */
23     private static ZooKeeper createSession() throws IOException, InterruptedException {
24         //实例化的过程,同时也是与ZooKeeper建立连接的过程,参数说明:ip:port 超时时间  监听器(实现water接口,监听器用于接收通知)
25         zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() {
26             @Override
27             public void process(WatchedEvent watchedEvent) {
28                 System.out.println("收到事件:"+watchedEvent);//收到事件:WatchedEvent state:SyncConnected type:None path:null
29 
30 
31                 //TODO
32             }
33         });
34         System.out.println("查看状态:"+zooKeeper.getState());//查看状态:CONNECTING
35 
36         //如果不停一段时间,那么,监听器还没收到监听,方法就已经退出了
37         Thread.sleep(5000);
38 
39         return zooKeeper;
40 
41 
42     }
43 }
创建会话

 

  • 创建节点
 1 package org.zln.zk;
 2 
 3 import org.apache.zookeeper.*;
 4 import org.apache.zookeeper.data.ACL;
 5 import org.slf4j.Logger;
 6 import org.slf4j.LoggerFactory;
 7 
 8 import java.io.IOException;
 9 import java.io.UnsupportedEncodingException;
10 import java.util.ArrayList;
11 
12 /**
13  * Created by sherry on 16/8/27.
14  */
15 public class TestZooKeeperClientApi {
16 
17     private static Logger logger = LoggerFactory.getLogger(TestZooKeeperClientApi.class);
18 
19     private static ZooKeeper zooKeeper;
20 
21     public static void main(String[] args) throws IOException, InterruptedException {
22         createSession();
23     }
24 
25     /**
26      * 创建会话
27      */
28     private static ZooKeeper createSession() throws IOException, InterruptedException {
29         //实例化的过程,同时也是与ZooKeeper建立连接的过程,参数说明:ip:port 超时时间  监听器(实现water接口,监听器用于接收通知)
30         zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() {
31             @Override
32             public void process(WatchedEvent watchedEvent) {
33                 //TODO 与 ZooKeeper 的交互,一般都放在这里
34                 if (watchedEvent.getState() == Event.KeeperState.SyncConnected){//已连接
35                     logger.info("连接上了");
36 
37                     try {
38                         //参数说明:节点路径  数据的字节数组 权限 创建节点模式
39                         String nodePath = createNode(zooKeeper,"/node_1","123".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
40                         logger.info("创建节点:"+nodePath);
41                     } catch (UnsupportedEncodingException|KeeperException|InterruptedException e) {
42                         e.printStackTrace();
43                     }
44 
45                 }
46             }
47         });
48         logger.info("查看状态:"+zooKeeper.getState());//查看状态:CONNECTING
49 
50         //如果不停一段时间,那么,监听器还没收到监听,方法就已经退出了
51         Thread.sleep(5000);
52 
53         return zooKeeper;
54 
55 
56     }
57 
58     /**
59      * 创建ZooKeeper节点
60      * @param zooKeeper ZooKeeper连接
61      * @return 节点路径
62      */
63     public static String createNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws UnsupportedEncodingException, KeeperException, InterruptedException {
64         //参数说明:节点路径  数据的字节数组 权限 创建节点模式
65         return zooKeeper.create(path,bytes, acls, createMode);
66     }
67 }
创建节点
创建模式

PERSISTENT          持久节点
PERSISTENT_SEQUENTIAL  
持久顺序节点
EPHEMERAL          临时节点
EPHEMERAL_SEQUENTIAL   临时顺序节点

 

以上代码,是属于同步创建

 

 1 /**
 2      * 异步创建节点
 3      * @param zooKeeper
 4      * @param path
 5      * @param bytes
 6      * @param acls
 7      * @param createMode
 8      * @throws KeeperException
 9      * @throws InterruptedException
10      */
11     public static void asCreateNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws KeeperException, InterruptedException {
12 
13         //异步创建需要增加 AsyncCallback.StringCallback 接口的实现类 以及 一个上下文对象参数
14         zooKeeper.create(path, bytes, acls, createMode, new AsyncCallback.StringCallback() {
15             /**
16              *
17              * @param rc  节点创建结果返回码  0-节点创建成功
18              * @param path 节点真实路径
19              * @param ctx  异步调用上下文  就是 create方法本地调用的那个最后一个参数
20              * @param name
21              */
22             @Override
23             public void processResult(int rc, String path, Object ctx, String name) {
24                 StringBuilder stringBuilder = new StringBuilder();
25                 stringBuilder.append("\\nrc="+rc+"\\n" +
26                         "path="+path+"\\n" +
27                         "ctx="+ctx+"\\n" +
28                         "name="+name+"\\n");
29                 logger.info(stringBuilder.toString());
30             }
31         },"异步创建");
32     }
异步方式创建节点

 

  • 获取子节点
 1     /**
 2      * 同步方式获取子节点
 3      * @param zooKeeper     连接
 4      * @param parentPath    父路径
 5      * @return
 6      * @throws KeeperException
 7      * @throws InterruptedException
 8      */
 9     public static List<String> getChildList(ZooKeeper zooKeeper,String parentPath) throws KeeperException, InterruptedException {
10         //参数说明:  父节点路径  是否需要关注子节点的变化
11         List<String> childs = zooKeeper.getChildren(parentPath,false);
12         return childs;
13     }
同步方式获取子节点且不关注子节点的变化
异步方式获取子节点且关注子节点的变化
 1     /**
 2      * 异步方式获取子节点 关注子节点变化
 3      * @param zooKeeper     连接
 4      * @param parentPath    父路径
 5      */
 6     public static void asGetChildListAndWatch(ZooKeeper zooKeeper,String parentPath){
 7         zooKeeper.getChildren(parentPath, true, new AsyncCallback.Children2Callback() {
 8             @Override
 9             public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
10                 logger.info("变化后的子节点:");
11                 for (String name:children){
12                     logger.info("子节点:"+name);
13                 }
14             }
15         },"关注子节点变化");
16     }
AsyncCallback.Children2Callback接口实现异步关注

目前为止可以发现这个规律,有回调函数的是异步方式调用,没有回调函数的是同步调用

问:同步调用和异步调用的使用场景是???

答:下面的操作依赖调用结果的时候,就需要调用同步方法

 

 

 

  1 package org.zln.zk;
  2 
  3 import org.apache.zookeeper.*;
  4 import org.apache.zookeeper.data.ACL;
  5 import org.apache.zookeeper.data.Stat;
  6 import org.slf4j.Logger;
  7 import org.slf4j.LoggerFactory;
  8 
  9 import java.io.IOException;
 10 import java.io.UnsupportedEncodingException;
 11 import java.util.ArrayList;
 12 import java.util.List;
 13 
 14 /**
 15  * Created by sherry on 16/8/27.
 16  */
 17 public class TestZooKeeperClientApi {
 18 
 19     private static Logger logger = LoggerFactory.getLogger(TestZooKeeperClientApi.class);
 20 
 21     private static ZooKeeper zooKeeper;
 22 
 23     public static void main(String[] args) throws IOException, InterruptedException {
 24         createSession();
 25 
 26         Thread.sleep(Integer.MAX_VALUE);
 27     }
 28 
 29     /**
 30      * 创建会话
 31      */
 32     private static ZooKeeper createSession() throws IOException, InterruptedException {
 33         //实例化的过程,同时也是与ZooKeeper建立连接的过程,参数说明:ip:port 超时时间  监听器(实现water接口,监听器用于接收通知)
 34         zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() {
 35             @Override
 36             public void process(WatchedEvent watchedEvent) {
 37                 //TODO 与 ZooKeeper 的交互,一般都放在这里
 38                 if (watchedEvent.getState() == Event.KeeperState.SyncConnected){//已连接
 39                     logger.info("连接上了");
 40                     try {
 41                     //同步方式创建节点
 42                     //String nodePath = sysCreateNode(zooKeeper,"/node_1","123".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 43                     //logger.info("创建节点:"+nodePath);
 44 
 45                     //异步方式创建节点
 46                     //asCreateNode(zooKeeper,"/node_2","234".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 47 
 48 
 49                     //同步方式获取子节点  不关注子节点变化
 50 //                  List<String> list = getChildListNoWatch(zooKeeper,"/");
 51 //                  for (String name:list){
 52 //                      logger.info("子节点:"+name);
 53 //                  }
 54 
 55                     //异步方式获取节点 关注子节点变化
 56 //                    asGetChildListAndWatch(zooKeeper,"/");
 57 
 58                     //同步方式获取节点数据 sysGetNodeData
 59 
 60                         byte[] bytes = sysGetNodeDataNoWatch(zooKeeper,"/node_1");
 61                         logger.info("获取节点数据"+new String(bytes,"UTF-8"));
 62 
 63                         deleteNode(zooKeeper,"/node_1",0);
 64                     } catch (KeeperException|InterruptedException|UnsupportedEncodingException e) {
 65                         e.printStackTrace();
 66                     }
 67 
 68                 }
 69             }
 70         });
 71         logger.info("查看状态:"+zooKeeper.getState());//查看状态:CONNECTING
 72 
 73         return zooKeeper;
 74 
 75 
 76     }
 77 
 78     /**
 79      * 同步创建节点
 80      * @param zooKeeper  连接
 81      * @param path       节点路径
 82      * @param bytes      字节数组数据
 83      * @param acls       权限
 84      * @param createMode 创建模式
 85      * @return
 86      * @throws UnsupportedEncodingException
 87      * @throws KeeperException
 88      * @throws InterruptedException
 89      */
 90     public static String sysCreateNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws UnsupportedEncodingException, KeeperException, InterruptedException {
 91         return zooKeeper.create(path,bytes, acls, createMode);
 92     }
 93 
 94 
 95     /**
 96      * 异步创建节点
 97      * @param zooKeeper  连接
 98      * @param path       节点路径
 99      * @param bytes      字节数组数据
100      * @param acls       权限
101      * @param createMode 创建模式
102      * @throws KeeperException
103      * @throws InterruptedException
104      */
105     public static void asCreateNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws KeeperException, InterruptedException {
106 
107         //异步创建需要增加 AsyncCallback.StringCallback 接口的实现类 以及 一个上下文对象参数
108         zooKeeper.create(path, bytes, acls, createMode, new AsyncCallback.StringCallback() {
109             /**
110              *
111              * @param rc  节点创建结果返回码  0-节点创建成功
112              * @param path 节点真实路径
113              * @param ctx  异步调用上下文  就是 create方法本地调用的那个最后一个参数
114              * @param name
115              */
116             @Override
117             public void processResult(int rc, String path, Object ctx, String name) {
118                 StringBuilder stringBuilder = new StringBuilder();
119                 stringBuilder.append("\\nrc="+rc+"\\n" +
120                         "path="+path+"\\n" +
121                         "ctx="+ctx+"\\n" +
122                         "name="+name+"\\n");
123                 logger.info(stringBuilder.toString());
124             }
125         },"异步创建");
126     }
127 
128 
129     /**
130      * 同步方式获取子节点  不关注子节点变化
131      * @param zooKeeper     连接
132      * @param parentPath    父路径
133      * @return
134      * @throws KeeperException
135      * @throws InterruptedException
136      */
137     public static List<String> sysGetChildListNoWatch(ZooKeeper zooKeeper,String parentPath) throws KeeperException, InterruptedException {
138         //参数说明:  父节点路径  是否需要关注子节点的变化  如果 true,则子节点发生变化后,会产生  NodeChildrenChanged 事件
139         List<String> childs = zooKeeper.getChildren(parentPath,false);
140         return childs;
141     }
142 
143     /**
144      * 异步方式获取子节点 关注子节点变化
145      * @param zooKeeper     连接
146      * @param parentPath    父路径
147      */
148     public static void asGetChildListAndWatch(ZooKeeper zooKeeper,String parentPath){
149         zooKeeper.getChildren(parentPath, true, new AsyncCallback.Children2Callback() {
150             @Override
151             public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
152                 logger.info("变化后的子节点:");
153                 for (String name:children){
154                     logger.info("子节点:"+name);
155                 }
156             }
157         },"关注子节点变化");
158     }
159 
160     /**
161      * 同步方式获取数据
162      * @param zooKeeper
163      * @param path
164      * @return
165      * @throws KeeperException
166      * @throws InterruptedException
167      */
168     public static byte[] sysGetNodeDataNoWatch(ZooKeeper zooKeeper,String path) throws KeeperException, InterruptedException {
169         //路径 是否关注数据变化 状态
170         return zooKeeper.getData(path,false,new Stat());
171     }
172 
173     /**
174      * 删除节点
175      * @param zooKeeper
176      * @param nodePath
177      * @param version
178      * @throws KeeperException
179      * @throws InterruptedException
180      */
181     public static void deleteNode(ZooKeeper zooKeeper,String nodePath,int version) throws KeeperException, InterruptedException {
182         zooKeeper.delete(nodePath,version);
183     }
184 
185 
186 }
客户端代码汇总

 

 

除了ZooKeeper提供的Java API外,还有两种客户端,ZKClient和Curator两种客户端,都是对原生API的封装,使得操作更方便

《从PAXOS到ZOOKEEPER分布式一致性原理与实践》,可以参考这本书

以上是关于Java API操作ZooKeeper的主要内容,如果未能解决你的问题,请参考以下文章

Java API操作ZooKeeper

使用ZooKeeper提供的Java API操作ZooKeeper

zookeeper学习-4Java API操作 - 服务端和客户端操作

zookeeper学习-5Java API操作 - Watcher监听机制

大数据讲课笔记6.6 ZooKeeper的Java API操作

大数据讲课笔记6.6 ZooKeeper的Java API操作