ZooKeeper Source Code: Configuration Watches

Posted by zhangwanhua


   The configuration store does more than maintain a tree structure: it also attaches change watches to the individual nodes.

Class diagram

[Figure: class diagram]

  DataTree keeps two watch managers internally. One watches for changes to a node's data, the other for changes to a node's children.

public class DataTree {
    // One manager for data watches, one for child-list watches.
    private final WatchManager dataWatches = new WatchManager();
    private final WatchManager childWatches = new WatchManager();

    // Drops every watch that the given watcher registered, in both managers.
    public void removeCnxn(Watcher watcher) {
        dataWatches.removeWatcher(watcher);
        childWatches.removeWatcher(watcher);
    }

    public String createNode(String path, byte data[], List<ACL> acl,
            long ephemeralOwner, long zxid, long time)
            throws KeeperException.NoNodeException,
            KeeperException.NodeExistsException {
       ...
        String parentName = path.substring(0, lastSlash);
       ...
        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
                Event.EventType.NodeChildrenChanged);
        return path;
    } 

    public void deleteNode(String path, long zxid)
            throws KeeperException.NoNodeException {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        DataNode node = nodes.get(path);
        ...
        Set<Watcher> processed = dataWatches.triggerWatch(path,
                EventType.NodeDeleted);
        childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
                EventType.NodeChildrenChanged);
    }

    public Stat setData(String path, byte data[], int version, long zxid,
            long time) throws KeeperException.NoNodeException {
        ...
        DataNode n = nodes.get(path);
        ...
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }
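
    public byte[] getData(String path, Stat stat, Watcher watcher)
            throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path);
        ... // the elided code opens the block that is closed below
            // Reading with a watcher registers a data watch on this path.
            dataWatches.addWatch(path, watcher);
        ...
            return n.data;
        }
    }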
}
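
To make the excerpt above easier to follow, here is a minimal sketch that lists which watch manager each mutation triggers, on which path, and with which event type. The class `TriggerPlanSketch` and its methods are hypothetical names made up for this illustration and are not part of ZooKeeper. Only the parent-path rule and the trigger order are taken from the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not ZooKeeper code: describes the triggers fired by
// createNode, deleteNode and setData as plain strings.
final class TriggerPlanSketch {

    // Same rule as in the excerpt: the parent of "/app" is "/", not "".
    static String parentOf(String path) {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        return parentName.equals("") ? "/" : parentName;
    }

    // createNode: data watch on the new path, child watch on the parent.
    static List<String> onCreate(String path) {
        List<String> plan = new ArrayList<>();
        plan.add("dataWatches " + path + " NodeCreated");
        plan.add("childWatches " + parentOf(path) + " NodeChildrenChanged");
        return plan;
    }

    // deleteNode: both managers are triggered on the deleted path, then the
    // child watch on the parent.
    static List<String> onDelete(String path) {
        List<String> plan = new ArrayList<>();
        plan.add("dataWatches " + path + " NodeDeleted");
        plan.add("childWatches " + path + " NodeDeleted");
        plan.add("childWatches " + parentOf(path) + " NodeChildrenChanged");
        return plan;
    }

    // setData: only the data watch on the path itself.
    static List<String> onSetData(String path) {
        List<String> plan = new ArrayList<>();
        plan.add("dataWatches " + path + " NodeDataChanged");
        return plan;
    }
}
```

For example, `TriggerPlanSketch.onCreate("/app")` describes a NodeCreated trigger on `/app` followed by a NodeChildrenChanged trigger on `/`. In deleteNode, the set returned by the first trigger is passed to the second one, so a watcher registered in both managers for the deleted path is notified of NodeDeleted only once.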

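   WatchManager mainly maintains the association between paths and watchers. When a path changes, it calls the callback method of each watcher registered on that path.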

public class WatchManager {
 ...
    private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>();
    private final HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap<Watcher, HashSet<String>>();
    public synchronized int size(){
        return watchTable.size();
    }
    public synchronized void addWatch(String path, Watcher watcher) {
        HashSet<Watcher> list = watchTable.get(path);
        if (list == null) {
            list = new HashSet<Watcher>(4);
            watchTable.put(path, list);
        }
        list.add(watcher);
        HashSet<String> paths = watch2Paths.get(watcher);
        if (paths == null) {
            paths = new HashSet<String>();
            watch2Paths.put(watcher, paths);
        }
        paths.add(path);
    }
    public synchronized void removeWatcher(Watcher watcher) {
        HashSet<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return;
        }
        for (String p : paths) {
            HashSet<Watcher> list = watchTable.get(p);
            if (list != null) {
                list.remove(watcher);
                if (list.size() == 0) {
                    watchTable.remove(p);
                }
            }
        }
    }
...
    public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            // The entry is removed here, so a watch fires at most once.
            watchers = watchTable.remove(path);
            ...
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            w.process(e);
        }
        return watchers;
    }
...
}
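
The two maps and the remove-on-trigger step are the core of this class, so a small self-contained model may help. The sketch below is a simplified stand-in, not the real WatchManager: `MiniWatchManager`, `Listener`, and the string event types are all hypothetical names, and returning an empty set when no watch exists is a choice made for this sketch only (the corresponding branch is elided in the excerpt).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of a two-way path/watcher index with one-shot triggers.
final class MiniWatchManager {

    // Hypothetical callback type standing in for Watcher.
    interface Listener {
        void process(String eventType, String path);
    }

    private final Map<String, Set<Listener>> watchTable = new HashMap<>();
    private final Map<Listener, Set<String>> watch2Paths = new HashMap<>();

    // Register the listener in both directions: path -> listeners and
    // listener -> paths.
    synchronized void addWatch(String path, Listener listener) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(listener);
        watch2Paths.computeIfAbsent(listener, k -> new HashSet<>()).add(path);
    }

    Set<Listener> triggerWatch(String path, String eventType, Set<Listener> suppress) {
        Set<Listener> watchers;
        synchronized (this) {
            // Removing the entry is what makes each watch fire only once.
            watchers = watchTable.remove(path);
            if (watchers == null) {
                return Collections.emptySet(); // sketch-only choice
            }
            for (Listener l : watchers) {
                Set<String> paths = watch2Paths.get(l);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        // Callbacks run after the lock has been released.
        for (Listener l : watchers) {
            if (suppress != null && suppress.contains(l)) {
                continue; // already notified elsewhere, skip it
            }
            l.process(eventType, path);
        }
        return watchers;
    }
}
```

Triggering the same path twice in a row calls the listener only the first time, because the first trigger removes the registration. A caller that wants further notifications has to call `addWatch` again.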

   Watcher defines the callback method of the watch interface.

public interface Watcher {
    abstract public void process(WatchedEvent event);
}
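
As a small illustration of implementing this interface, the sketch below uses stand-in types so that it compiles without the ZooKeeper jar. `WatcherSketch`, the simplified `WatchedEvent` with two string fields, and `RecordingWatcher` are assumptions made for this example. The real WatchedEvent carries an event type, a KeeperState, and a path, as the constructor call in WatchManager shows.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types for illustration only; not the ZooKeeper classes.
final class WatcherSketch {

    // Simplified event: just a type name and a path.
    static final class WatchedEvent {
        final String type;
        final String path;

        WatchedEvent(String type, String path) {
            this.type = type;
            this.path = path;
        }
    }

    // Same shape as the interface above: a single callback method.
    interface Watcher {
        void process(WatchedEvent event);
    }

    // A watcher that simply records every event it receives.
    static final class RecordingWatcher implements Watcher {
        final List<String> log = new ArrayList<>();

        @Override
        public void process(WatchedEvent event) {
            log.add(event.type + " " + event.path);
        }
    }
}
```

Because the stand-in interface has a single method, it can also be written as a lambda, for example `WatcherSketch.Watcher w = e -> System.out.println(e.path);`.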

 
