zookeeper客户端
Posted zhangwanhua
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper客户端相关的知识,希望对你有一定的参考价值。
zookeeper客户端主要负责与用户进行交互,将命令发送到服务器,接收服务器的响应,反馈给用户。主要分为以下三层:
用户命令处理层
用户命令处理层的功能是读取用户输入的命令,解析用户命令和输入参数,根据命令和参数,进行一些校验,然后执行节点操作。
源码实例(ZooKeeperMain):
1 public class ZooKeeperMain { 2 // 命令解析器。用于解析命令 3 protected MyCommandOptions cl = new MyCommandOptions(); 4 5 // 主函数 6 public static void main(String args[]) throws KeeperException, IOException, InterruptedException { 7 // 运行客户端 8 ZooKeeperMain main = new ZooKeeperMain(args); 9 main.run(); 10 } 11 12 public ZooKeeperMain(String args[]) throws IOException, InterruptedException { 13 // 解析启动参数 14 cl.parseOptions(args); 15 // 获取server参数,连接服务器 16 connectToZK(cl.getOption("server")); 17 18 } 19 20 // 连接服务器 21 protected void connectToZK(String newHost) throws InterruptedException, IOException { 22 host = newHost; 23 zk = new ZooKeeper(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher()); 24 } 25 26 void run() throws KeeperException, IOException, InterruptedException { 27 // 循环读取命令, 28 BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); 29 String line; 30 while ((line = br.readLine()) != null) { 31 // 执行命令 32 executeLine(line); 33 } 34 } 35 36 public void executeLine(String line) throws InterruptedException, IOException, KeeperException { 37 if (!line.equals("")) { 38 // 解析命令 39 cl.parseCommand(line); 40 // 执行命令 41 processZKCmd(cl); 42 } 43 } 44 45 protected boolean processZKCmd(MyCommandOptions co) throws KeeperException, IOException, InterruptedException { 46 // 读取命令和参数 47 Stat stat = new Stat(); 48 String[] args = co.getArgArray(); 49 String cmd = co.getCommand(); 50 boolean watch = args.length > 2; 51 String path = null; 52 List<ACL> acl = Ids.OPEN_ACL_UNSAFE; 53 // 执行不同的命令,主要是进行一些校验,然后调用zookeeper方法 54 if (cmd.equals("quit")) { 55 zk.close(); 56 System.exit(0); 57 } else if (cmd.equals("redo") && args.length >= 2) { 58 Integer i = Integer.decode(args[1]); 59 if (commandCount <= i) { 60 return false; 61 } 62 cl.parseCommand(history.get(i)); 63 history.put(commandCount, history.get(i)); 64 processCmd(cl); 65 } else if (cmd.equals("history")) { 66 for (int i = commandCount - 10; i <= commandCount; ++i) { 67 if (i < 0) 68 continue; 69 
System.out.println(i + " - " + history.get(i)); 70 } 71 } else if (cmd.equals("printwatches")) { 72 if (args.length == 1) { 73 System.out.println("printwatches is " + (printWatches ? "on" : "off")); 74 } else { 75 printWatches = args[1].equals("on"); 76 } 77 } else if (cmd.equals("connect")) { 78 if (args.length >= 2) { 79 connectToZK(args[1]); 80 } else { 81 connectToZK(host); 82 } 83 } 84 if (cmd.equals("create") && args.length >= 3) { 85 int first = 0; 86 CreateMode flags = CreateMode.PERSISTENT; 87 if ((args[1].equals("-e") && args[2].equals("-s")) || (args[1]).equals("-s") && (args[2].equals("-e"))) { 88 first += 2; 89 flags = CreateMode.EPHEMERAL_SEQUENTIAL; 90 } else if (args[1].equals("-e")) { 91 first++; 92 flags = CreateMode.EPHEMERAL; 93 } else if (args[1].equals("-s")) { 94 first++; 95 flags = CreateMode.PERSISTENT_SEQUENTIAL; 96 } 97 if (args.length == first + 4) { 98 acl = parseACLs(args[first + 3]); 99 } 100 path = args[first + 1]; 101 String newPath = zk.create(path, args[first + 2].getBytes(), acl, flags); 102 } else if (cmd.equals("delete") && args.length >= 2) { 103 path = args[1]; 104 zk.delete(path, watch ? Integer.parseInt(args[2]) : -1); 105 } else if (cmd.equals("set") && args.length >= 3) { 106 path = args[1]; 107 stat = zk.setData(path, args[2].getBytes(), args.length > 3 ? Integer.parseInt(args[3]) : -1); 108 printStat(stat); 109 } else if (cmd.equals("aget") && args.length >= 2) { 110 path = args[1]; 111 zk.getData(path, watch, dataCallback, path); 112 } else if (cmd.equals("get") && args.length >= 2) { 113 path = args[1]; 114 byte data[] = zk.getData(path, watch, stat); 115 data = (data == null) ? 
"null".getBytes() : data; 116 System.out.println(new String(data)); 117 printStat(stat); 118 } else if (cmd.equals("ls") && args.length >= 2) { 119 path = args[1]; 120 List<String> children = zk.getChildren(path, watch); 121 System.out.println(children); 122 } else if (cmd.equals("ls2") && args.length >= 2) { 123 path = args[1]; 124 List<String> children = zk.getChildren(path, watch, stat); 125 System.out.println(children); 126 printStat(stat); 127 } else if (cmd.equals("getAcl") && args.length >= 2) { 128 path = args[1]; 129 acl = zk.getACL(path, stat); 130 for (ACL a : acl) { 131 System.out.println(a.getId() + ": " + getPermString(a.getPerms())); 132 } 133 } else if (cmd.equals("setAcl") && args.length >= 3) { 134 path = args[1]; 135 stat = zk.setACL(path, parseACLs(args[2]), args.length > 4 ? Integer.parseInt(args[3]) : -1); 136 printStat(stat); 137 } else if (cmd.equals("stat") && args.length >= 2) { 138 path = args[1]; 139 stat = zk.exists(path, watch); 140 printStat(stat); 141 } else if (cmd.equals("listquota") && args.length >= 2) { 142 path = args[1]; 143 String absolutePath = Quotas.quotaZookeeper + path + "/" + Quotas.limitNode; 144 byte[] data = null; 145 try { 146 data = zk.getData(absolutePath, false, stat); 147 StatsTrack st = new StatsTrack(new String(data)); 148 data = zk.getData(Quotas.quotaZookeeper + path + "/" + Quotas.statNode, false, stat); 149 System.out.println("Output stat for " + path + " " + new StatsTrack(new String(data)).toString()); 150 } catch (KeeperException.NoNodeException ne) { 151 System.err.println("quota for " + path + " does not exist."); 152 } 153 } else if (cmd.equals("setquota") && args.length >= 4) { 154 String option = args[1]; 155 String val = args[2]; 156 path = args[3]; 157 System.err.println("Comment: the parts are " + "option " + option + " val " + val + " path " + path); 158 if ("-b".equals(option)) { 159 // we are setting the bytes quota 160 createQuota(zk, path, Long.parseLong(val), -1); 161 } else if 
("-n".equals(option)) { 162 // we are setting the num quota 163 createQuota(zk, path, -1L, Integer.parseInt(val)); 164 } else { 165 usage(); 166 } 167 168 } else if (cmd.equals("delquota") && args.length >= 2) { 169 // if neither option -n or -b is specified, we delete 170 // the quota node for thsi node. 171 if (args.length == 3) { 172 // this time we have an option 173 String option = args[1]; 174 path = args[2]; 175 if ("-b".equals(option)) { 176 delQuota(zk, path, true, false); 177 } else if ("-n".equals(option)) { 178 delQuota(zk, path, false, true); 179 } 180 } else if (args.length == 2) { 181 path = args[1]; 182 // we dont have an option specified. 183 // just delete whole quota node 184 delQuota(zk, path, true, true); 185 } else if (cmd.equals("help")) { 186 usage(); 187 } 188 } else if (cmd.equals("close")) { 189 zk.close(); 190 } else if (cmd.equals("addauth") && args.length >= 2) { 191 byte[] b = null; 192 if (args.length >= 3) 193 b = args[2].getBytes(); 194 195 zk.addAuthInfo(args[1], b); 196 } else { 197 usage(); 198 } 199 return watch; 200 } 201 }
除了基础的节点操作外,用户命令层还提供了节点配额的控制。节点配额的控制通过在/zookeeper/quota对应的目录下记录当前节点的数据大小和配额限制大小实现。
源码实例(ZooKeeperMain.createQuota):
1 public static boolean createQuota(ZooKeeper zk, String path, 2 long bytes, int numNodes) 3 throws KeeperException, IOException, InterruptedException 4 { 5 //判断指定路径是否存在 6 Stat initStat = zk.exists(path, false); 7 if (initStat == null) { 8 throw new IllegalArgumentException(path + " does not exist."); 9 } 10 String quotaPath = Quotas.quotaZookeeper; 11 String realPath = Quotas.quotaZookeeper + path; 12 try { 13 //判断在子节点中是否有限量设置 14 List<String> children = zk.getChildren(realPath, false); 15 for (String child: children) { 16 if (!child.startsWith("zookeeper_")) { 17 throw new IllegalArgumentException(path + " has child " + 18 child + " which has a quota"); 19 } 20 } 21 } catch(KeeperException.NoNodeException ne) { 22 // this is fine 23 } 24 //判断夫节点中是否有限量设置 25 checkIfParentQuota(zk, path); 26 //如果当前节点限量设置为空,逐级创建节点数据 27 if (zk.exists(quotaPath, false) == null) { 28 try { 29 zk.create(Quotas.procZookeeper, null, Ids.OPEN_ACL_UNSAFE, 30 CreateMode.PERSISTENT); 31 zk.create(Quotas.quotaZookeeper, null, Ids.OPEN_ACL_UNSAFE, 32 CreateMode.PERSISTENT); 33 } catch(KeeperException.NodeExistsException ne) { 34 // do nothing 35 } 36 } 37 String[] splits = path.split("/"); 38 StringBuilder sb = new StringBuilder(); 39 sb.append(quotaPath); 40 for (int i=1; i<splits.length; i++) { 41 sb.append("/" + splits[i]); 42 quotaPath = sb.toString(); 43 try { 44 zk.create(quotaPath, null, Ids.OPEN_ACL_UNSAFE , 45 CreateMode.PERSISTENT); 46 } catch(KeeperException.NodeExistsException ne) { 47 //do nothing 48 } 49 } 50 //创建限量设置节点 51 String statPath = quotaPath + "/" + Quotas.statNode; 52 quotaPath = quotaPath + "/" + Quotas.limitNode; 53 StatsTrack strack = new StatsTrack(null); 54 strack.setBytes(bytes); 55 strack.setCount(numNodes); 56 try { 57 zk.create(quotaPath, strack.toString().getBytes(), 58 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 59 StatsTrack stats = new StatsTrack(null); 60 stats.setBytes(0L); 61 stats.setCount(0); 62 zk.create(statPath, stats.toString().getBytes(), 63 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 64 } catch(KeeperException.NodeExistsException ne) { 65 byte[] data = zk.getData(quotaPath, false , new Stat()); 66 StatsTrack strackC = new StatsTrack(new String(data)); 67 if (bytes != -1L) { 68 strackC.setBytes(bytes); 69 } 70 if (numNodes != -1) { 71 strackC.setCount(numNodes); 72 } 73 zk.setData(quotaPath, strackC.toString().getBytes(), -1); 74 } 75 return true; 76 }
节点处理层
节点处理层主要是提供节点操作功能,将节点操作参数封装成数据对象,然后通过网络层发送数据对象,并返回结果。网络层提供了同步和异步两种网络请求方式。
创建节点(ZooKeeper):
public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx) { final String clientPath = path; //解析client相对路径到全路径 final String serverPath = prependChroot(clientPath); //设置请求头 RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.create); //设置创建节点请求体 CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); ReplyHeader r = new ReplyHeader(); request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); request.setAcl(acl); //通过网络层发送请求 cnxn.queuePacket(h, r, request, response, cb, clientPath, serverPath, ctx, null); }
删除节点(ZooKeeper):
1 public void delete(final String path, int version) 2 throws InterruptedException, KeeperException 3 { 4 final String clientPath = path; 5 //解析client相对路径到全路径 6 final String serverPath = prependChroot(clientPath); 7 //设置请求头 8 RequestHeader h = new RequestHeader(); 9 h.setType(ZooDefs.OpCode.delete); 10 //设置删除节点请求体 11 DeleteRequest request = new DeleteRequest(); 12 request.setPath(serverPath); 13 request.setVersion(version); 14 cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath, 15 serverPath, ctx, null); 16 }
其他方法(ZooKeeper):
1 public void exists(final String path, Watcher watcher, 2 StatCallback cb, Object ctx) 3 { 4 final String clientPath = path; 5 PathUtils.validatePath(clientPath); 6 7 // the watch contains the un-chroot path 8 WatchRegistration wcb = null; 9 if (watcher != null) { 10 wcb = new ExistsWatchRegistration(watcher, clientPath); 11 } 12 13 final String serverPath = prependChroot(clientPath); 14 15 RequestHeader h = new RequestHeader(); 16 h.setType(ZooDefs.OpCode.exists); 17 ExistsRequest request = new ExistsRequest(); 18 request.setPath(serverPath); 19 request.setWatch(watcher != null); 20 SetDataResponse response = new SetDataResponse(); 21 cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, 22 clientPath, serverPath, ctx, wcb); 23 } 24 public void getData(final String path, Watcher watcher, 25 DataCallback cb, Object ctx) 26 { 27 final String clientPath = path; 28 PathUtils.validatePath(clientPath); 29 30 // the watch contains the un-chroot path 31 WatchRegistration wcb = null; 32 if (watcher != null) { 33 wcb = new DataWatchRegistration(watcher, clientPath); 34 } 35 36 final String serverPath = prependChroot(clientPath); 37 38 RequestHeader h = new RequestHeader(); 39 h.setType(ZooDefs.OpCode.getData); 40 GetDataRequest request = new GetDataRequest(); 41 request.setPath(serverPath); 42 request.setWatch(watcher != null); 43 GetDataResponse response = new GetDataResponse(); 44 cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, 45 clientPath, serverPath, ctx, wcb); 46 } 47 public void setData(final String path, byte data[], int version, 48 StatCallback cb, Object ctx) 49 { 50 final String clientPath = path; 51 PathUtils.validatePath(clientPath); 52 53 final String serverPath = prependChroot(clientPath); 54 55 RequestHeader h = new RequestHeader(); 56 h.setType(ZooDefs.OpCode.setData); 57 SetDataRequest request = new SetDataRequest(); 58 request.setPath(serverPath); 59 request.setData(data); 60 request.setVersion(version); 61 SetDataResponse response 
= new SetDataResponse(); 62 cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, 63 clientPath, serverPath, ctx, null); 64 } 65 66 public void getACL(final String path, Stat stat, ACLCallback cb, 67 Object ctx) 68 { 69 final String clientPath = path; 70 PathUtils.validatePath(clientPath); 71 72 final String serverPath = prependChroot(clientPath); 73 74 RequestHeader h = new RequestHeader(); 75 h.setType(ZooDefs.OpCode.getACL); 76 GetACLRequest request = new GetACLRequest(); 77 request.setPath(serverPath); 78 GetACLResponse response = new GetACLResponse(); 79 cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, 80 clientPath, serverPath, ctx, null); 81 } 82 public void setACL(final String path, List<ACL> acl, int version, 83 StatCallback cb, Object ctx) 84 { 85 final String clientPath = path; 86 PathUtils.validatePath(clientPath); 87 88 final String serverPath = prependChroot(clientPath); 89 90 RequestHeader h = new RequestHeader(); 91 h.setType(ZooDefs.OpCode.setACL); 92 SetACLRequest request = new SetACLRequest(); 93 request.setPath(serverPath); 94 request.setAcl(acl); 95 request.setVersion(version); 96 SetACLResponse response = new SetACLResponse(); 97 cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, 98 clientPath, serverPath, ctx, null); 99 } 100 public void getChildren(final String path, Watcher watcher, 101 Children2Callback cb, Object ctx) 102 { 103 final String clientPath = path; 104 final String serverPath = prependChroot(clientPath); 105 106 WatchRegistration wcb = null; 107 if (watcher != null) { 108 wcb = new ChildWatchRegistration(watcher, clientPath); 109 } 110 111 RequestHeader h = new RequestHeader(); 112 h.setType(ZooDefs.OpCode.getChildren2); 113 GetChildren2Request request = new GetChildren2Request(); 114 request.setPath(serverPath); 115 request.setWatch(watcher != null); 116 GetChildren2Response response = new GetChildren2Response(); 117 cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, 118 clientPath, 
serverPath, ctx, wcb); 119 } 120 public void sync(final String path, VoidCallback cb, Object ctx){ 121 final String clientPath = path; 122 PathUtils.validatePath(clientPath); 123 124 final String serverPath = prependChroot(clientPath); 125 126 RequestHeader h = new RequestHeader(); 127 h.setType(ZooDefs.OpCode.sync); 128 SyncRequest request = new SyncRequest(); 129 SyncResponse response = new SyncResponse(); 130 request.setPath(serverPath); 131 cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, 132 clientPath, serverPath, ctx, null); 133 }
网络请求层
网络请求层最为复杂,主要实现nio异步网络请求以及结果回调,watcher管理。
提供了同步和异步两种通信方式。同步通信其实也是通过异步通信实现,首先会使用异步通信发送请求,然后判断返回结果是否ready,如果没有则通过wait进入阻塞状态。当异步通信返回请求时,会设置返回结果状态,并且唤醒阻塞的线程。
同步请求(ClientCnxn.submitRequest):
1 public ReplyHeader submitRequest(RequestHeader h, Record request, 2 Record response, WatchRegistration watchRegistration) 3 throws InterruptedException { 4 //异步发送请求包 5 ReplyHeader r = new ReplyHeader(); 6 Packet packet = queuePacket(h, r, request, response, null, null, null, 7 null, watchRegistration); 8 //如果请求包没有返回数据,则线上等待 9 synchronized (packet) { 10 while (!packet.finished) { 11 packet.wait(); 12 } 13 } 14 return r; 15 }
异步请求的参数会被封装成一个Packet对象放入outgoingQueue队列中。会有一个发送线程从outgoingQueue队列中取出一个可发送的Packet对象,并发送序列化信息,然后把该Packet放入到pendingQueue队列中,当接收到服务端响应,反序列化出结果数据,然后在pendingQueue中找到对应的Packet,设置结果,最后对于有回调和watcher的命令封装成事件放入事件队列中,会有另一个事件线程,从事件队列中读取事件消息,执行回调和watcher逻辑。
异步请求(ClientCnxn.queuePacket):
1 Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, 2 Record response, AsyncCallback cb, String clientPath, 3 String serverPath, Object ctx, WatchRegistration watchRegistration) 4 { 5 6 Packet packet = null; 7 synchronized (outgoingQueue) { 8 //设置一个全局唯一的id,作为数据包的id 9 if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) { 10 h.setXid(getXid()); 11 } 12 //将请求头,请求体,返回结果,watcher等封装成数据包。 13 packet = new Packet(h, r, request, response, null, 14 watchRegistration); 15 packet.cb = cb; 16 packet.ctx = ctx; 17 packet.clientPath = clientPath; 18 packet.serverPath = serverPath; 19 //将数据包添加到outgoing队列中。 20 outgoingQueue.add(packet); 21 } 22 sendThread.wakeup(); 23 return packet; 24 }
发送线程执行流程如下:
1.启动线程,建立服务器连接。(状态为Connecting)
2.建立连接后,进行初始化,主要是向服务器发送默认watcher命令、auth命令、connect命令。(状态为Connected)
3. 从outgoing队列中读取数据包,发送到服务端。
4.接收服务端响应,处理返回结果:connect命令记录sessionid、sessionpwd、timeout等;如果是其他命令,则在pendingQueue中找到对应的Packet,设置结果。
5.对于有回调和watcher的命令封装成事件放入事件队列中。
发送线程主流程(ClientCnxn.SendThread.run):
1 class SendThread extends Thread { 2 SelectionKey sockKey; 3 private final Selector selector = Selector.open(); 4 public void run() { 5 while (zooKeeper.state.isAlive()) { 6 //建立连接 7 startConnect(); 8 //获取注册通道 9 selector.select(1000); 10 Set<SelectionKey> selected; 11 synchronized (this) { 12 selected = selector.selectedKeys(); 13 } 14 for (SelectionKey k : selected) { 15 SocketChannel sc = ((SocketChannel) k.channel()); 16 if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) { 17 //建立连接 18 if (sc.finishConnect()) { 19 primeConnection(k); 20 } 21 //读写数据 22 } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { 23 doIO(); 24 } 25 } 26 } 27 try { 28 selector.close(); 29 } catch (IOException e) { 30 LOG.warn("Ignoring exception during selector close", e); 31 } 32 } 33 34 //通过nio建立连接 35 private void startConnect() throws IOException { 36 //从服务器列表中获取一台服务器地址 37 InetSocketAddress addr = serverAddrs.get(nextAddrToTry); 38 nextAddrToTry++; 39 if (nextAddrToTry == serverAddrs.size()) { 40 nextAddrToTry = 0; 41 } 42 //通过nio注册 43 SocketChannel sock; 44 sock = SocketChannel.open(); 45 sock.configureBlocking(false); 46 sock.socket().setSoLinger(false, -1); 47 sock.socket().setTcpNoDelay(true); 48 try { 49 sockKey = sock.register(selector, SelectionKey.OP_CONNECT); 50 } catch (IOException e) { 51 sock.close(); 52 throw e; 53 } 54 //初始化缓存 55 lenBuffer.clear(); 56 incomingBuffer = lenBuffer; 57 } 58 }
建立连接,进行初始化(ClientCnxn.SendThread.primeConnection):
1 private void primeConnection(SelectionKey k) throws IOException { 2 ConnectRequest conReq = new ConnectRequest(0, lastZxid, 3 sessionTimeout, sessionId, sessionPasswd); 4 //序列化连接命令 5 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 6 BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); 7 boa.writeInt(-1, "len"); 8 conReq.serialize(boa, "connect"); 9 baos.close(); 10 ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); 11 bb.putInt(bb.capacity() - 4); 12 bb.rewind(); 13 synchronized (outgoingQueue) { 14 //发送设置监听器请求,将请求封装成数据包,放入outgoing队列中 15 if (!disableAutoWatchReset) { 16 List<String> dataWatches = zooKeeper.getDataWatches(); 17 List<String> existWatches = zooKeeper.getExistWatches(); 18 List<String> childWatches = zooKeeper.getChildWatches(); 19 if (!dataWatches.isEmpty() 20 || !existWatches.isEmpty() || !childWatches.isEmpty()) { 21 SetWatches sw = new SetWatches(lastZxid, 22 prependChroot(dataWatches), 23 prependChroot(existWatches), 24 prependChroot(childWatches)); 25 RequestHeader h = new RequestHeader(); 26 h.setType(ZooDefs.OpCode.setWatches); 27 h.setXid(-8); 28 Packet packet = new Packet(h, new ReplyHeader(), sw, null, null, 29 null); 30 outgoingQueue.addFirst(packet); 31 } 32 } 33 //发送认证信息 34 for (AuthData id : authInfo) { 35 outgoingQueue.addFirst(new Packet(new RequestHeader(-4, 36 OpCode.auth), null, new AuthPacket(0, id.scheme, 37 id.data), null, null, null)); 38 } 39 //发送连接命令请求 40 outgoingQueue.addFirst((new Packet(null, null, null, null, bb, 41 null))); 42 } 43 //注册通道 44 synchronized (this) { 45 k.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); 46 } 47 }
处理读写主流程,主要是nio操作(ClientCnxn.SendThread.doIO):
1 boolean doIO() throws InterruptedException, IOException { 2 boolean packetReceived = false; 3 //获取socketchannel 4 SocketChannel sock = (SocketChannel) sockKey.channel(); 5 //如果可读 6 if (sockKey.isReadable()) { 7 //读取数据到缓存中 8 int rc = sock.read(incomingBuffer); 9 //直到缓存读满 10 if (!incomingBuffer.hasRemaining()) { 11 //重置缓存 12 incomingBuffer.flip(); 13 //如果读取的是长度信息,读取长度信息,并且分配相应缓存 14 if (incomingBuffer == lenBuffer) { 15 int len = incomingBuffer.getInt(); 16 incomingBuffer = ByteBuffer.allocate(len); 17 } else if (!initialized) { 18 //如果是connect命令的返回值,获取session,timeout等相关信息 19 readConnectResult(); 20 enableRead(); 21 lenBuffer.clear(); 22 //重置缓存 23 incomingBuffer = lenBuffer; 24 initialized = true; 25 } else { 26 //读取数据内容 27 readResponse(); 28 //重置缓存 29 lenBuffer.clear(); 30 incomingBuffer = lenBuffer; 31 packetReceived = true; 32 } 33 } 34 } 35 //如果是写 36 if (sockKey.isWritable()) { 37 synchronized (outgoingQueue) { 38 if (!outgoingQueue.isEmpty()) { 39 //从outgoingQueue队列中拿数据包写入通道 40 ByteBuffer pbb = outgoingQueue.getFirst().bb; 41 sock.write(pbb); 42 if (!pbb.hasRemaining()) { 43 sentCount++; 44 Packet p = outgoingQueue.removeFirst(); 45 if (p.header != null 46 && p.header.getType() != OpCode.ping 47 && p.header.getType() != OpCode.auth) { 48 pendingQueue.add(p); 49 } 50 } 51 } 52 } 53 } 54 if (outgoingQueue.isEmpty()) { 55 disableWrite(); 56 } else { 57 enableWrite(); 58 } 59 return packetReceived; 60 }
处理返回结果,主要处理connect返回结果和其他请求返回结果。
connect命令主要返回sessionId、sessionPasswd、timeout(ClientCnxn.SendThread.readConnectResult):
1 //读取connect命令的结果 2 void readConnectResult() throws IOException { 3 //反序列化connect命令结果 4 ByteBufferInputStream bbis = new ByteBufferInputStream( 5 incomingBuffer); 6 BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); 7 ConnectResponse conRsp = new ConnectResponse(); 8 conRsp.deserialize(bbia, "connect"); 9 //获取timeout,session等信息 10 readTimeout = negotiatedSessionTimeout * 2 / 3; 11 connectTimeout = negotiatedSessionTimeout / serverAddrs.size(); 12 sessionId = conRsp.getSessionId(); 13 sessionPasswd = conRsp.getPasswd(); 14 zooKeeper.state = States.CONNECTED; 15 //向消息队列放入连接成功消息 16 eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, 17 Watcher.Event.KeeperState.SyncConnected, null)); 18 }
其他返回结果,xid=-2为ping命令的返回结果;xid=-4为auth命令;xid=-1为服务器发送的notification;其他命令返回结果。
1 void readResponse() throws IOException { 2 //对返回数据进行反序列化 3 ByteBufferInputStream bbis = new ByteBufferInputStream( 4 incomingBuffer); 5 BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); 6 ReplyHeader replyHdr = new ReplyHeader(); 7 replyHdr.deserialize(bbia, "header"); 8 //根据返回头信息,封装想要的事件,放入事件队列中,交给eventthread处理 9 //向消息队列放入验证失败消息 10 if (replyHdr.getXid() == -4) { 11 // -4 is the xid for AuthPacket 12 if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { 13 zooKeeper.state = States.AUTH_FAILED; 14 eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 15 Watcher.Event.KeeperState.AuthFailed, null) ); 16 } 17 return; 18 } 19 // 20 if (replyHdr.getXid() == -1) { 21 WatcherEvent event = new WatcherEvent(); 22 event.deserialize(bbia, "response"); 23 if (chrootPath != null) { 24 String serverPath = event.getPath(); 25 if(serverPath.compareTo(chrootPath)==0) 26 event.setPath("/"); 27 else 28 event.setPath(serverPath.substring(chrootPath.length())); 29 } 30 WatchedEvent we = new WatchedEvent(event); 31 eventThread.queueEvent( we ); 32 return; 33 } 34 //反序列化返回结果 35 Packet packet = null; 36 synchronized (pendingQueue) { 37 packet = pendingQueue.remove(); 38 } 39 try { 40 packet.replyHeader.setXid(replyHdr.getXid()); 41 packet.replyHeader.setErr(replyHdr.getErr()); 42 packet.replyHeader.setZxid(replyHdr.getZxid()); 43 if (replyHdr.getZxid() > 0) { 44 lastZxid = replyHdr.getZxid(); 45 } 46 if (packet.response != null && replyHdr.getErr() == 0) { 47 packet.response.deserialize(bbia, "response"); 48 } 49 } finally { 50 finishPacket(packet); 51 } 52 }
事件线程主要是处理回调函数(ClientCnxn.EventThread.run):
1 public void run() { 2 try { 3 isRunning = true; 4 while (true) { 5 //从队列中获取事件 6 Object event = waitingEvents.take(); 7 if (event == eventOfDeath) { 8 wasKilled = true; 9 } else { 10 //处理事件 11 processEvent(event); 12 } 13 if (wasKilled) 14 synchronized (waitingEvents) { 15 if (waitingEvents.isEmpty()) { 16 isRunning = false; 17 break; 18 } 19 } 20 } 21 } catch (InterruptedException e) { 22 LOG.error("Event thread exiting due to interruption", e); 23 } 24 LOG.info("EventThread shut down"); 25 }
处理回调函数和watcher
1 private void processEvent(Object event) { 2 try { 3 //处理watcher 4 if (event instanceof WatcherSetEventPair) { 5 WatcherSetEventPair pair = (WatcherSetEventPair) event; 6 for (Watcher watcher : pair.watchers) { 7 try { 8 watcher.process(pair.event); 9 } catch (Throwable t) { 10 LOG.error("Error while calling watcher ", t); 11 } 12 } 13 } else { 14 // 15 Packet p = (Packet) event; 16 int rc = 0; 17 String clientPath = p.clientPath; 18 if (p.replyHeader.getErr() != 0) { 19 rc = p.replyHeader.getErr(); 20 } 21 if (p.response instanceof ExistsResponse 22 || p.response instanceof SetDataResponse 23 || p.response instanceof SetACLResponse) { 24 StatCallback cb = (StatCallback) p.cb; 25 if (rc == 0) { 26 if (p.response instanceof ExistsResponse) { 27 cb.processResult(rc, clientPath, p.ctx, 28 ((ExistsResponse) p.response) 29 .getStat()); 30 } else if (p.response instanceof SetDataResponse) { 31 cb.processResult(rc, clientPath, p.ctx, 32 ((SetDataResponse) p.response) 33 .getStat()); 34 } else if (p.response instanceof SetACLResponse) { 35 cb.processResult(rc, clientPath, p.ctx, 36 ((SetACLResponse) p.response) 37 .getStat()); 38 } 39 } else { 40 cb.processResult(rc, clientPath, p.ctx, null); 41 } 42 } else if (p.response instanceof GetDataResponse) { 43 DataCallback cb = (DataCallback) p.cb; 44 GetDataResponse rsp = (GetDataResponse) p.response; 45 if (rc == 0) { 46 cb.processResult(rc, clientPath, p.ctx, rsp 47 .getData(), rsp.getStat()); 48 } else { 49 cb.processResult(rc, clientPath, p.ctx, null, 50 null); 51 } 52 } else if (p.response instanceof GetACLResponse) { 53 ACLCallback cb = (ACLCallback) p.cb; 54 GetACLResponse rsp = (GetACLResponse) p.response; 55 if (rc == 0) { 56 cb.processResult(rc, clientPath, p.ctx, rsp 57 .getAcl(), rsp.getStat()); 58 } else { 59 cb.processResult(rc, clientPath, p.ctx, null, 60 null); 61 } 62 } else if (p.response instanceof GetChildrenResponse) { 63 ChildrenCallback cb = (ChildrenCallback) p.cb; 64 GetChildrenResponse rsp = 
(GetChildrenResponse) p.response; 65 if (rc == 0) { 66 cb.processResult(rc, clientPath, p.ctx, rsp 67 .getChildren()); 68 } else { 69 cb.processResult(rc, clientPath, p.ctx, null); 70 } 71 } else if (p.response instanceof GetChildren2Response) { 72 Children2Callback cb = (Children2Callback) p.cb; 73 GetChildren2Response rsp = (GetChildren2Response) p.response; 74 if (rc == 0) { 75 cb.processResult(rc, clientPath, p.ctx, rsp 76 .getChildren(), rsp.getStat()); 77 } else { 78 cb.processResult(rc, clientPath, p.ctx, null, null); 79 } 80 } else if (p.response instanceof CreateResponse) { 81 StringCallback cb = (StringCallback) p.cb; 82 CreateResponse rsp = (CreateResponse) p.response; 83 if (rc == 0) { 84 cb.processResult(rc, clientPath, p.ctx, 85 (chrootPath == null 86 ? rsp.getPath() 87 : rsp.getPath() 88 .substring(chrootPath.length()))); 89 } else { 90 cb.processResult(rc, clientPath, p.ctx, null); 91 } 92 } else if (p.cb instanceof VoidCallback) { 93 VoidCallback cb = (VoidCallback) p.cb; 94 cb.processResult(rc, clientPath, p.ctx); 95 } 96 } 97 } catch (Throwable t) { 98 LOG.error("Caught unexpected throwable", t); 99 } 100 }
以上是关于zookeeper客户端的主要内容,如果未能解决你的问题,请参考以下文章
Zookeeper3.5.7版本——客户端 API 操作(代码示例)