zookeeper源码之客户端
Posted zwh1988
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper源码之客户端相关的知识,希望对你有一定的参考价值。
ZooKeeper客户端可以对指定节点设置指定Watcher,当服务器上该节点发生变化时,客户端会收到服务器的通知,然后客户端可以执行相应Watcher的代码。
默认ZooKeeper内置了一个watcher,用于打印收到的服务器的通知。
源码ZooKeeperMain.connectToZK与ZooKeeperMain.MyWatcher:
1 protected void connectToZK(String newHost) throws InterruptedException, IOException { 2 if (zk != null && zk.getState().isAlive()) { 3 zk.close(); 4 } 5 host = newHost; 6 zk = new ZooKeeper(host, 7 Integer.parseInt(cl.getOption("timeout")), 8 new MyWatcher()); 9 } 10 11 private class MyWatcher implements Watcher { 12 public void process(WatchedEvent event) { 13 if (getPrintWatches()) { 14 ZooKeeperMain.printMessage("WATCHER::"); 15 ZooKeeperMain.printMessage(event.toString()); 16 } 17 } 18 }
在获取子节点、获取数据、获取状态时可以设置Watcher,该Watcher会被存储到Packet包中,当Packet包收到响应时注册该Watcher,当收到服务器notification时,执行Watcher代码。
源码ZooKeeper.getChildren:
1 public List<String> getChildren(final String path, Watcher watcher) 2 throws KeeperException, InterruptedException 3 { 4 //将watcher封装成childwatcher 5 WatchRegistration wcb = null; 6 if (watcher != null) { 7 wcb = new ChildWatchRegistration(watcher, clientPath); 8 } 9 10 final String serverPath = prependChroot(path); 11 RequestHeader h = new RequestHeader(); 12 h.setType(ZooDefs.OpCode.getChildren); 13 GetChildrenRequest request = new GetChildrenRequest(); 14 request.setPath(serverPath); 15 request.setWatch(watcher != null); 16 GetChildrenResponse response = new GetChildrenResponse(); 17 ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); 18 if (r.getErr() != 0) { 19 throw KeeperException.create(KeeperException.Code.get(r.getErr()), 20 clientPath); 21 } 22 return response.getChildren(); 23 }
源码ClientCnxn.submitRequest:
1 public ReplyHeader submitRequest(RequestHeader h, Record request, 2 Record response, WatchRegistration watchRegistration) 3 throws InterruptedException { 4 ReplyHeader r = new ReplyHeader(); 5 //watchRegistration被封装到Packet中 6 Packet packet = queuePacket(h, r, request, response, null, null, null, 7 null, watchRegistration); 8 synchronized (packet) { 9 while (!packet.finished) { 10 packet.wait(); 11 } 12 } 13 return r; 14 }
源码ClientCnxn.queuePacket:
1 Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, 2 Record response, AsyncCallback cb, String clientPath, 3 String serverPath, Object ctx, WatchRegistration watchRegistration) 4 { 5 Packet packet = null; 6 synchronized (outgoingQueue) { 7 if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) { 8 h.setXid(getXid()); 9 } 10 //watchRegistration被封装到Packet中 11 packet = new Packet(h, r, request, response, null, 12 watchRegistration); 13 packet.cb = cb; 14 packet.ctx = ctx; 15 packet.clientPath = clientPath; 16 packet.serverPath = serverPath; 17 if (!zooKeeper.state.isAlive() || closing) { 18 conLossPacket(packet); 19 } else { 20 // If the client is asking to close the session then 21 // mark as closing 22 if (h.getType() == OpCode.closeSession) { 23 closing = true; 24 } 25 outgoingQueue.add(packet); 26 } 27 } 28 29 sendThread.wakeup(); 30 return packet; 31 }
当Packet包收到响应时注册该Watcher,源码ClientCnxn.finishPacket:
1 private void finishPacket(Packet p) { 2 if (p.watchRegistration != null) { 3 p.watchRegistration.register(p.replyHeader.getErr()); 4 } 5 6 if (p.cb == null) { 7 synchronized (p) { 8 p.finished = true; 9 p.notifyAll(); 10 } 11 } else { 12 p.finished = true; 13 eventThread.queuePacket(p); 14 } 15 }
当收到服务器notification时,执行Watcher代码,源码ClientCnxn.EventThread:
1 private void processEvent(Object event) { 2 if (event instanceof WatcherSetEventPair) { 3 //执行watcher 4 WatcherSetEventPair pair = (WatcherSetEventPair) event; 5 for (Watcher watcher : pair.watchers) { 6 try { 7 watcher.process(pair.event); 8 } catch (Throwable t) { 9 LOG.error("Error while calling watcher ", t); 10 } 11 } 12 } 13 }
在删除节点、创建节点、获取子节点、设置数据、获取数据、获取权限、设置权限等异步操作时,可以设置CallBack回调函数,该回调对象会被存储到Packet包中,当Packet包收到响应时执行CallBack代码。
源码ZooKeeper.getChildren:
1 public void getChildren(final String path, Watcher watcher, 2 ChildrenCallback cb, Object ctx) 3 { 4 final String clientPath = path; 5 WatchRegistration wcb = null; 6 if (watcher != null) { 7 wcb = new ChildWatchRegistration(watcher, clientPath); 8 } 9 10 final String serverPath = prependChroot(clientPath); 11 12 RequestHeader h = new RequestHeader(); 13 h.setType(ZooDefs.OpCode.getChildren); 14 GetChildrenRequest request = new GetChildrenRequest(); 15 request.setPath(serverPath); 16 request.setWatch(watcher != null); 17 GetChildrenResponse response = new GetChildrenResponse(); 18 cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, 19 clientPath, serverPath, ctx, wcb); 20 }
源码ClientCnxn.queuePacket:
1 Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, 2 Record response, AsyncCallback cb, String clientPath, 3 String serverPath, Object ctx, WatchRegistration watchRegistration) 4 { 5 Packet packet = null; 6 synchronized (outgoingQueue) { 7 if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) { 8 h.setXid(getXid()); 9 } 10 packet = new Packet(h, r, request, response, null, 11 watchRegistration); 12 packet.cb = cb; 13 packet.ctx = ctx; 14 packet.clientPath = clientPath; 15 packet.serverPath = serverPath; 16 if (!zooKeeper.state.isAlive() || closing) { 17 conLossPacket(packet); 18 } else { 19 // If the client is asking to close the session then 20 // mark as closing 21 if (h.getType() == OpCode.closeSession) { 22 closing = true; 23 } 24 outgoingQueue.add(packet); 25 } 26 } 27 28 sendThread.wakeup(); 29 return packet; 30 }
源码ClientCnxn.EventThread:
1 private void processEvent(Object event) { 2 try { 3 Packet p = (Packet) event; 4 int rc = 0; 5 String clientPath = p.clientPath; 6 if (p.replyHeader.getErr() != 0) { 7 rc = p.replyHeader.getErr(); 8 } 9 if (p.response instanceof ExistsResponse 10 || p.response instanceof SetDataResponse 11 || p.response instanceof SetACLResponse) { 12 StatCallback cb = (StatCallback) p.cb; 13 if (rc == 0) { 14 if (p.response instanceof ExistsResponse) { 15 cb.processResult(rc, clientPath, p.ctx, 16 ((ExistsResponse) p.response) 17 .getStat()); 18 } else if (p.response instanceof SetDataResponse) { 19 cb.processResult(rc, clientPath, p.ctx, 20 ((SetDataResponse) p.response) 21 .getStat()); 22 } else if (p.response instanceof SetACLResponse) { 23 cb.processResult(rc, clientPath, p.ctx, 24 ((SetACLResponse) p.response) 25 .getStat()); 26 } 27 } else { 28 cb.processResult(rc, clientPath, p.ctx, null); 29 } 30 } else if (p.response instanceof GetDataResponse) { 31 DataCallback cb = (DataCallback) p.cb; 32 GetDataResponse rsp = (GetDataResponse) p.response; 33 if (rc == 0) { 34 cb.processResult(rc, clientPath, p.ctx, rsp 35 .getData(), rsp.getStat()); 36 } else { 37 cb.processResult(rc, clientPath, p.ctx, null, 38 null); 39 } 40 } else if (p.response instanceof GetACLResponse) { 41 ACLCallback cb = (ACLCallback) p.cb; 42 GetACLResponse rsp = (GetACLResponse) p.response; 43 if (rc == 0) { 44 cb.processResult(rc, clientPath, p.ctx, rsp 45 .getAcl(), rsp.getStat()); 46 } else { 47 cb.processResult(rc, clientPath, p.ctx, null, 48 null); 49 } 50 } else if (p.response instanceof GetChildrenResponse) { 51 ChildrenCallback cb = (ChildrenCallback) p.cb; 52 GetChildrenResponse rsp = (GetChildrenResponse) p.response; 53 if (rc == 0) { 54 cb.processResult(rc, clientPath, p.ctx, rsp 55 .getChildren()); 56 } else { 57 cb.processResult(rc, clientPath, p.ctx, null); 58 } 59 } else if (p.response instanceof GetChildren2Response) { 60 Children2Callback cb = (Children2Callback) p.cb; 61 
GetChildren2Response rsp = (GetChildren2Response) p.response; 62 if (rc == 0) { 63 cb.processResult(rc, clientPath, p.ctx, rsp 64 .getChildren(), rsp.getStat()); 65 } else { 66 cb.processResult(rc, clientPath, p.ctx, null, null); 67 } 68 } else if (p.response instanceof CreateResponse) { 69 StringCallback cb = (StringCallback) p.cb; 70 CreateResponse rsp = (CreateResponse) p.response; 71 if (rc == 0) { 72 cb.processResult(rc, clientPath, p.ctx, 73 (chrootPath == null 74 ? rsp.getPath() 75 : rsp.getPath() 76 .substring(chrootPath.length()))); 77 } else { 78 cb.processResult(rc, clientPath, p.ctx, null); 79 } 80 } else if (p.cb instanceof VoidCallback) { 81 VoidCallback cb = (VoidCallback) p.cb; 82 cb.processResult(rc, clientPath, p.ctx); 83 } 84 } catch (Throwable t) { 85 LOG.error("Caught unexpected throwable", t); 86 } 87 } 88 }
以上是关于zookeeper源码之客户端的主要内容,如果未能解决你的问题,请参考以下文章