Apache ZooKeeper Watcher机制源码解释

Posted 麦克叔叔每晚10点说

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache ZooKeeper Watcher机制源码解释相关的知识,希望对你有一定的参考价值。

16

清单16 processRequest代码

public void processRequest(Request request) {

if (request.hdr != null) {

              TxnHeader hdr = request.hdr;

              Record txn = request.txn;

              rc = zks.processTxn(hdr, txn);

           }

17

清单17 ZooKeeperServer代码

public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {

ProcessTxnResult rc;

       int opCode = hdr.getType();

       long sessionId = hdr.getClientId();

       rc = getZKDatabase().processTxn(hdr, txn);

       if (opCode == OpCode.createSession) {

           if (txn instanceof CreateSessionTxn) {

               CreateSessionTxn cst = (CreateSessionTxn) txn;

               sessionTracker.addSession(sessionId, cst

                       .getTimeOut());

           } else {

               LOG.warn("*****>>>>> Got "

                       + txn.getClass() + " "

                       + txn.toString());

           }

       } else if (opCode == OpCode.closeSession) {

           sessionTracker.removeSession(sessionId);

       }

       return rc;

}

18

清单18 ZKDatabase代码

public ProcessTxnResult processTxn(TxnHeader header, Record txn)

   {

switch (header.getType()) {

case OpCode.setData:

                   SetDataTxn setDataTxn = (SetDataTxn) txn;

                   rc.path = setDataTxn.getPath();

                   rc.stat = setData(setDataTxn.getPath(), setDataTxn

                           .getData(), setDataTxn.getVersion(), header

                           .getZxid(), header.getTime());

                   break;

对于注册Watcher请求,FinalRequestProcessor的ProcessRequest方法会判断当前请求是否需要注册Watcher,如果为true,就会将当前的ServerCnxn对象和数据节点路径传入getData方法中去。ServerCnxn是一个ZooKeeper客户端和服务器之间的连接接口,代表了一个客户端和服务器的连接,我们后面讲到的process回调方法,实际上也是从这里回调的,所以可以把ServerCnxn看作是一个Watcher对象。数据节点的节点路径和ServerCnxn最终会被存储在WatchManager的watchTable和watch2Paths中。

19

清单19 判断是否注册Watcher代码

case OpCode.getData: {

               lastOp = "GETD";

               GetDataRequest getDataRequest = new GetDataRequest();

               ByteBufferInputStream.byteBuffer2Record(request.request,

                       getDataRequest);

               DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());

               if (n == null) {

                   throw new KeeperException.NoNodeException();

               }

               Long aclL;

               synchronized(n) {

                   aclL = n.acl;

               }

               PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),

                       ZooDefs.Perms.READ,

                       request.authInfo);

               Stat stat = new Stat();

               byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,

                       getDataRequest.getWatch() ? cnxn : null);

               rsp = new GetDataResponse(b, stat);

               break;

           }

如前所述,WatchManager负责Watcher事件的触发,它是一个统称,在服务端DataTree会托管两个WatchManager,分别是dataWatches和childWatches,分别对应数据变更Watcher和子节点变更Watcher。

20

清单20 WatchManger两个队列

private final HashMap<string, hashset

       new HashMap<string, hashset

   private final HashMap<watcher, hashset

       new HashMap<watcher, hashset

回到主题,如清单21到23所示,当发生Create、Delete、NodeChange(数据变更)这样的事件后,DataTree会调用相应方法去触发WatchManager的triggerWatch方法,该方法返回ZNODE的信息,自此进入到回调本地process的序列。

21

清单21 processTxn代码

public ProcessTxnResult processTxn(TxnHeader header, Record txn)

   {

       ProcessTxnResult rc = new ProcessTxnResult();

       try {

switch (header.getType()) {

case OpCode.setData:

                   SetDataTxn setDataTxn = (SetDataTxn) txn;

                   rc.path = setDataTxn.getPath();

                   rc.stat = setData(setDataTxn.getPath(), setDataTxn

                           .getData(), setDataTxn.getVersion(), header

                           .getZxid(), header.getTime());

                   break;

22

清单22 setData代码

public Stat setData(String path, byte data[], int version, long zxid,

           long time) throws KeeperException.NoNodeException {

       Stat s = new Stat();

       DataNodeV1 n = nodes.get(path);

       if (n == null) {

           throw new KeeperException.NoNodeException();

       }

       synchronized (n) {

           n.data = data;

           n.stat.setMtime(time);

           n.stat.setMzxid(zxid);

           n.stat.setVersion(version);

           n.copyStat(s);

       }

       dataWatches.triggerWatch(path, EventType.NodeDataChanged);

       return s;

   }

23

清单23 triggerWatch代码

public Set triggerWatch(String path, EventType type, Set supress) {

WatchedEvent e = new WatchedEvent(type,

               KeeperState.SyncConnected, path);

//将事件类型(EventType)、通知状态(WatchedEvent)、节点路径封装成一个WatchedEvent对象

       HashSet watchers;

       synchronized (this) {

//根据数据节点的节点路径从watchTable里面取出对应的Watcher。如果没有找到Watcher对象,说明没有任何客户端在该数据节点上注册过Watcher,直接退出。如果找打了Watcher就将其提取出来,同时会直接从watchTable和watch2Paths里删除Watcher,即Watcher是一次性的,触发一次就失效了。

           watchers = watchTable.remove(path);

for (Watcher w : watchers) {

               HashSet paths = watch2Paths.get(w);

           }

       }

       for (Watcher w : watchers) {

           if (supress != null && supress.contains(w)) {

               continue;

           }

//对于需要注册Watcher的请求,ZooKeeper会把请求对应的恶ServerCnxn作为一个Watcher存储,所以这里调用的process方法实质上是ServerCnxn的对应方法

           w.process(e);

       }

       return watchers;

}

从上面的代码我们可以总结出,如果想要处理一个Watcher,需要执行的步骤如下所示:


  1. 将事件类型(EventType)、通知状态(WatchedEvent)、节点路径封装成一个WatchedEvent对象。

  2. 根据数据节点的节点路径从watchTable里面取出对应的Watcher。如果没有找到Watcher对象,说明没有任何客户端在该数据节点上注册过Watcher,直接退出。如果找打了Watcher就将其提取出来,同时会直接从watchTable和watch2Paths里删除Watcher,即Watcher是一次性的,触发一次就失效了。

  3. 对于需要注册Watcher的请求,ZooKeeper会把请求对应的ServerCnxn作为一个Watcher存储,所以这里调用的process方法实质上是ServerCnxn的对应方法,如清单所示,在请求头标记“-1”表示当前是一个通知,将WatchedEvent包装成WatcherEvent用于网络传输序列化,向客户端发送通知,真正的回调方法在客户端,就是我们清单10里面定义的process()方法。

24

清单24 ServerCnxn类代码

synchronized public void process(WatchedEvent event) {

       ReplyHeader h = new ReplyHeader(-1, -1L, 0);

       if (LOG.isTraceEnabled()) {

           ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,

                                    "Deliver event " + event + " to 0x"

                                    + Long.toHexString(this.sessionId)

                                    + " through " + this);

       }

       // Convert WatchedEvent to a type that can be sent over the wire

       WatcherEvent e = event.getWrapper();

       sendResponse(h, e, "notification");

   }

如清单25所示,SendThread接收到服务端的通知事件后,会通过调用EventThread类的queueEvent方法将事件传给EventThread线程,queueEvent方法根据该通知事件,从ZKWatchManager中取出所有相关的Watcher,如清单26所示。

26

清单26 EventThread线程代码

class EventThread extends ZooKeeperThread {

public void queueEvent(WatchedEvent event) {

           if (event.getType() == EventType.None

                   && sessionState == event.getState()) {

               return;

           }

           sessionState = event.getState();

           // materialize the watchers based on the event

           WatcherSetEventPair pair = new WatcherSetEventPair(

                   watcher.materialize(event.getState(), event.getType(),

                           event.getPath()),

                           event);

           // queue the pair (watch set & event) for later processing

           waitingEvents.add(pair);

       }

客户端在识别出事件类型EventType之后,会从相应的Watcher存储中删除对应的Watcher,获取到相关的Watcher之后,会将其放入waitingEvents队列,该队列从字面上就能理解是一个待处理队列,线程的run方法会不断对该该队列进行处理,这就是一种异步处理思维的实现。

27

清单27 ZKWatchManager取出Watcher

public Set materialize(Watcher.Event.KeeperState state,

                                       Watcher.Event.EventType type,

                                       String clientPath)

       {

           Set result = new HashSet ();

case NodeCreated:

               synchronized (dataWatches) {

                   addTo(dataWatches.remove(clientPath), result);

               }

               synchronized (existWatches) {

                   addTo(existWatches.remove(clientPath), result);

               }

               break;

28

清单28 EventThread线程的run方法

public void run() {

          try {

             isRunning = true;

             while (true) {

                Object event = waitingEvents.take();

                if (event == eventOfDeath) {

                   wasKilled = true;

                } else {

                   processEvent(event);

                }

                if (wasKilled)

                   synchronized (waitingEvents) {

                      if (waitingEvents.isEmpty()) {

                         isRunning = false;

                         break;

                      }

                   }

             }

ZooKeeper Watcher特性总结

1.注册只能确保一次消费

无论是服务端还是客户端,一旦一个Watcher被触发,ZooKeeper都会将其从相应的存储中移除。因此,开发人员在Watcher的使用上要记住的一点是需要反复注册。这样的设计有效地减轻了服务端的压力。如果注册一个Watcher之后一直有效,那么针对那些更新非常频繁的节点,服务端会不断地向客户端发送事件通知,这无论对于网络还是服务端性能的影响都非常大。


2.客户端串行执行

客户端Watcher回调的过程是一个串行同步的过程,这为我们保证了顺序,同时,需要开发人员注意的一点是,千万不要因为一个Watcher的处理逻辑影响了整个客户端的Watcher回调。


3.轻量级设计

WatchedEvent是ZooKeeper整个Watcher通知机制的最小通知单元,这个数据结构中只包含三部分的内容:通知状态、事件类型和节点路径。也就是说,Watcher通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。例如针对NodeDataChanged事件,ZooKeeper的Watcher只会通知客户指定数据节点的数据内容发生了变更,而对于原始数据以及变更后的新数据都无法从这个事件中直接获取到,而是需要客户端主动重新去获取数据,这也是ZooKeeper的Watcher机制的一个非常重要的特性。另外,客户端向服务端注册Watcher的时候,并不会把客户端真实的Watcher对象传递到服务端,仅仅只是在客户端请求中使用boolean类型属性进行了标记,同时服务端也仅仅只是保存了当前连接的ServerCnxn对象。这样轻量级的Watcher机制设计,在网络开销和服务端内存开销上都是非常廉价的。


发个小广告!!!走过路过,不要错过

Apache ZooKeeper Watcher机制源码解释(4)
Apache ZooKeeper Watcher机制源码解释(4)

这里有你想买的书!

http://product.dangdang.com/23949549.html#ddclick_reco_reco_relate

麦克叔叔每晚十点说


以上是关于Apache ZooKeeper Watcher机制源码解释的主要内容,如果未能解决你的问题,请参考以下文章

java.lang.NoClassDefFoundError: org/apache/zookeeper/Watcher$Event$KeeperState

zookeeper(10)源码分析-事件监听Watcher

zookeeper--基于watcher原理实现带注册中心的RPC框架

Zookeeper 在Java中的操作

Zookeeper连接eclipse

Java API操作ZooKeeper