原理系列之——zookeeper的watch监控机制
Posted 烁华
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了原理系列之——zookeeper的watch监控机制相关的知识,希望对你有一定的参考价值。
在进入今天的正题之前,先来简单介绍下Zookeeper:
Zookeeper是一个分布式应用程序协调服务,保证数据的一致性,其提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
watch监控机制是zookeeper的关键技术之一,本文将通过zk的部分源码来简单了解下watch机制的实现原理。
watch监控机制的实现原理
当今时代,发布订阅场景到处可见,像微信中的公众号消息订阅,或者网购场景下库存消息的订阅通知等等,这些都是属于发布订阅的场景。
watch监控机制是zk的一个关键技术,zk通过它来实现发布订阅的功能,通过watch我们可以联想到设计模式中的观察者模式,二者确实有点类似,你可以将其看成是分布式场景下的观察者模式。
客户端watch的注册和回调
客户端watch注册实现过程:
发送一个带有watch事件的请求——>DataWatchRegistration保存watch事件——>将请求封装成Packet并放入发送队列(outgoingQueue)等待发送——>收到服务端响应后,调用SendThread中的readResponse——>readResponse最后调用finishPacket,由其中的watchRegistration.register()完成注册——>ZKWatchManager将该watch事件进行存储
//Zookeeper.java
/**
 * Fetches the data stored at {@code path} and, when a watcher is supplied, asks for a watch
 * to be registered alongside the read.
 *
 * NOTE(review): the braces of this snippet were lost during extraction, so the nesting of the
 * if/else statements below is inferred from statement order — confirm against the real source.
 *
 * @param path    node path; checked with PathUtils.validatePath before anything else happens
 * @param watcher when non-null it is wrapped in a DataWatchRegistration and the request's
 *                watch flag is set to true
 * @param stat    when non-null it receives a copy of the stat carried by the response
 * @return the data carried by the response
 * @throws KeeperException      built from the reply header's error code when that code is non-zero
 * @throws InterruptedException declared here; presumably raised while waiting for the reply
 */
public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException
PathUtils.validatePath(path);
ZooKeeper.WatchRegistration wcb = null;
if (watcher != null)
// register the watch: remember the watcher together with the client-side path
wcb = new ZooKeeper.DataWatchRegistration(watcher, path);
// the request is sent with the path returned by prependChroot, not the caller's path
String serverPath = this.prependChroot(path);
RequestHeader h = new RequestHeader();
// NOTE(review): 4 is presumably the getData opcode — confirm against the opcode constants
h.setType(4);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
// tells the receiver whether a watch is requested
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
// hands the request and the watch registration to the connection; the reply header comes back
ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb);
// any non-zero error code is turned into a KeeperException that carries the client path
if (r.getErr() != 0)
throw KeeperException.create(Code.get(r.getErr()), path);
else
if (stat != null)
DataTree.copyStat(response.getStat(), stat);
return response.getData();
/**
 * Queues a request as a packet and blocks the calling thread until that packet is marked finished.
 *
 * NOTE(review): getData calls submitRequest with four arguments; that call presumably resolves
 * to an overload not shown in this excerpt — confirm.
 *
 * @param h                   request header
 * @param request             request record
 * @param response            record handed to queuePacket as the packet's response
 * @param watchRegistration   watch registration carried by the packet (may be null)
 * @param watchDeregistration watch deregistration carried by the packet (may be null)
 * @return the ReplyHeader created here and handed to the packet
 * @throws InterruptedException declared here; presumably raised by packet.wait()
 */
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) throws InterruptedException
ReplyHeader r = new ReplyHeader();
// synchronous call: no callback, no client/server path and no context are passed along
ClientCnxn.Packet packet = this.queuePacket(h, r, request, response, (AsyncCallback)null, (String)null, (String)null, (Object)null, watchRegistration, watchDeregistration);
// wait on the packet's monitor; finishPacket sets finished and calls notifyAll on it
synchronized(packet)
while(!packet.finished)
packet.wait();
return r;
/**
 * Wraps a request in a Packet and either appends it to the outgoing queue or hands it to
 * conLossPacket, depending on the connection state.
 *
 * NOTE(review): braces were stripped from this snippet; the nesting described in the comments
 * below is inferred from statement order — confirm against the real source.
 *
 * @return the packet that was created, whichever branch was taken
 */
public ClientCnxn.Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration)
ClientCnxn.Packet packet = null;
// the watch registration travels inside the packet; finishPacket reads it back later
packet = new ClientCnxn.Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
// the state check and the enqueue happen while holding the lock on this.state
synchronized(this.state)
if (this.state.isAlive() && !this.closing)
// NOTE(review): -11 is presumably the closeSession opcode — confirm
if (h.getType() == -11)
this.closing = true;
this.outgoingQueue.add(packet);
else
// connection not alive, or already closing: the packet goes to conLossPacket instead of the queue
this.conLossPacket(packet);
// notify the client socket that a packet was added
this.sendThread.getClientCnxnSocket().packetAdded();
return packet;
class SendThread extends ZooKeeperThread
.....
/**
 * Decodes one incoming buffer and dispatches it according to the xid in its reply header:
 * -2 is logged as a ping response, -4 as an auth reply, -1 as a watch notification; when
 * tunnelAuthInProgress() is true the buffer is treated as a SASL token; anything else is
 * matched against the head of pendingQueue as the reply to an earlier request.
 *
 * NOTE(review): braces were stripped from this snippet, so where each branch ends (and whether
 * a branch returns early) cannot be seen here — confirm against the real source.
 *
 * @param incomingBuffer buffer holding a serialized reply header followed by its payload
 * @throws IOException when the pending queue is empty or the reply's xid does not match the
 *                     xid of the packet at the head of the queue
 */
void readResponse(ByteBuffer incomingBuffer) throws IOException
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
// xid -2: ping response; only logged, with the elapsed time since lastPingSentNs
if (replyHdr.getXid() == -2)
if (ClientCnxn.LOG.isDebugEnabled())
ClientCnxn.LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(ClientCnxn.this.sessionId) + " after " + (System.nanoTime() - this.lastPingSentNs) / 1000000L + "ms");
// xid -4: auth reply; on AUTHFAILED the state changes and an AuthFailed event is queued
else if (replyHdr.getXid() == -4)
if (replyHdr.getErr() == Code.AUTHFAILED.intValue())
ClientCnxn.this.state = States.AUTH_FAILED;
ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, KeeperState.AuthFailed, (String)null));
if (ClientCnxn.LOG.isDebugEnabled())
ClientCnxn.LOG.debug("Got auth sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
// xid -1: watch notification; this is the entry point of the client-side watch callback path
else if (replyHdr.getXid() == -1)
if (ClientCnxn.LOG.isDebugEnabled())
ClientCnxn.LOG.debug("Got notification sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// with a chroot configured, rewrite the event path so it is relative to the chroot
if (ClientCnxn.this.chrootPath != null)
String serverPath = event.getPath();
// the path equals the chroot itself: it becomes "/"
if (serverPath.compareTo(ClientCnxn.this.chrootPath) == 0)
event.setPath("/");
// the path is longer than the chroot: drop the first chrootPath.length() characters
else if (serverPath.length() > ClientCnxn.this.chrootPath.length())
event.setPath(serverPath.substring(ClientCnxn.this.chrootPath.length()));
else
ClientCnxn.LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + ClientCnxn.this.chrootPath);
// wrap the wire-level event and hand it to the event thread
WatchedEvent we = new WatchedEvent(event);
if (ClientCnxn.LOG.isDebugEnabled())
ClientCnxn.LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId));
ClientCnxn.this.eventThread.queueEvent(we);
// SASL negotiation in progress: the payload is read as a token and passed to the SASL client
else if (this.tunnelAuthInProgress())
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia, "token");
ClientCnxn.this.zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
// otherwise: reply to a previously sent request, matched against the head of pendingQueue
else
ClientCnxn.Packet packet;
synchronized(ClientCnxn.this.pendingQueue)
if (ClientCnxn.this.pendingQueue.size() == 0)
throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
packet = (ClientCnxn.Packet)ClientCnxn.this.pendingQueue.remove();
try
// the reply must carry the xid of the packet just removed; otherwise flag connection loss and fail
if (packet.requestHeader.getXid() != replyHdr.getXid())
packet.replyHeader.setErr(Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() + " with err " + replyHdr.getErr() + " expected Xid " + packet.requestHeader.getXid() + " for a packet with details: " + packet);
// copy the reply header fields into the header owned by the packet
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
// remember the zxid from this reply when it is positive
if (replyHdr.getZxid() > 0L)
ClientCnxn.this.lastZxid = replyHdr.getZxid();
// the response body is deserialized only when one is expected and the reply carries no error
if (packet.response != null && replyHdr.getErr() == 0)
packet.response.deserialize(bbia, "response");
if (ClientCnxn.LOG.isDebugEnabled())
ClientCnxn.LOG.debug("Reading reply sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId) + ", packet:: " + packet);
// finishPacket sits in the finally clause, so it also runs when the xid check throws
finally
ClientCnxn.this.finishPacket(packet);
/**
 * Completes a packet: applies its watch registration and deregistration using the reply's
 * error code, then either wakes the thread waiting on the packet or hands the packet to the
 * event thread.
 *
 * NOTE(review): braces were stripped from this snippet; the extent of the try block and of the
 * while loop is inferred from statement order — confirm against the real source.
 *
 * @param p the packet whose reply header has already been filled in
 */
private void finishPacket(ClientCnxn.Packet p)
int err = p.replyHeader.getErr();
// watch registration step; register receives the error code of the reply
// NOTE(review): presumably the watcher is stored only for suitable error codes — confirm in WatchRegistration
if (p.watchRegistration != null)
p.watchRegistration.register(err);
if (p.watchDeregistration != null)
Map materializedWatchers = null;
try
// unregister returns the removed watchers, keyed by event type
materializedWatchers = p.watchDeregistration.unregister(err);
Iterator i$ = materializedWatchers.entrySet().iterator();
while(i$.hasNext())
Entry<EventType, Set<Watcher>> entry = (Entry)i$.next();
Set<Watcher> watchers = (Set)entry.getValue();
// queue an event only for event types that had at least one watcher removed
if (watchers.size() > 0)
this.queueEvent(p.watchDeregistration.getClientPath(), err, watchers, (EventType)entry.getKey());
p.replyHeader.setErr(Code.OK.intValue());
// on failure, the exception's code replaces the error code in the reply header
catch (NoWatcherException var9)
LOG.error("Failed to find watcher!", var9);
p.replyHeader.setErr(var9.code().intValue());
catch (KeeperException var10)
LOG.error("Exception when removing watcher", var10);
p.replyHeader.setErr(var10.code().intValue());
// no callback: mark the packet finished and notify waiters (submitRequest waits on this monitor)
if (p.cb == null)
synchronized(p)
p.finished = true;
p.notifyAll();
// callback present: mark the packet finished and pass it to the event thread
else
p.finished = true;
this.eventThread.queuePacket(p);
客户端回调处理过程:
服务端的watch通知到达后,SendThread.readResponse()根据xid=-1识别出这是一条通知消息——>将其反序列化为WatcherEvent并封装成WatchedEvent——>调用eventThread.queueEvent()交给事件线程进行回调处理
class SendThread extends ZooKeeperThread
.....
void readResponse(ByteBuffer incomingBuffer) throws IOException
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
if (replyHdr.getXid() == -2)
....
else if (replyHdr.getXid() == -1)
if (ClientCnxn.LOG.isDebugEnabled())
ClientCnxn.LOG.debug("Got notification sessionid:0x" + Long.toHexString(ClientCnxn.this.sessionId));
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
if (ClientCnxn.this.chrootPath != null)
....
ZooKeeper Watch Java API浅析getData
ZooKeeper Watch Java API浅析getChildren
zookeeper系列之:独立模式部署zookeeper服务
SpringCloud Alibaba系列一文全面解析Zookeeper安装常用命令JavaAPI操作Watch事件监听分布式锁集群搭建核心理论