配置存储不仅维护了一个树结构,还对各个节点添加了变更监听。
类图
DataTree内部维护两个通知管理器,分别监听节点数据变更和子节点变更。
public class DataTree { private final WatchManager dataWatches = new WatchManager(); private final WatchManager childWatches = new WatchManager(); public void removeCnxn(Watcher watcher) { dataWatches.removeWatcher(watcher); childWatches.removeWatcher(watcher); }public String createNode(String path, byte data[], List<ACL> acl, long ephemeralOwner, long zxid, long time) throws KeeperException.NoNodeException, KeeperException.NodeExistsException { ... String parentName = path.substring(0, lastSlash); ... dataWatches.triggerWatch(path, Event.EventType.NodeCreated); childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged); return path; } public void deleteNode(String path, long zxid) throws KeeperException.NoNodeException { int lastSlash = path.lastIndexOf(‘/‘); String parentName = path.substring(0, lastSlash); String childName = path.substring(lastSlash + 1); DataNode node = nodes.get(path); ... Set<Watcher> processed = dataWatches.triggerWatch(path, EventType.NodeDeleted); childWatches.triggerWatch(path, EventType.NodeDeleted, processed); childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, EventType.NodeChildrenChanged); } public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { ... DataNode n = nodes.get(path); ... dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; } public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException { DataNode n = nodes.get(path); ... dataWatches.addWatch(path, watcher); ... return n.data; } } }
WatchManager主要维护了路径和监听器的关联关系,当响应的路径发生变化时,调用监听器的监听方法。
public class WatchManager { ... private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>(); private final HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap<Watcher, HashSet<String>>(); public synchronized int size(){ return watchTable.size(); } public synchronized void addWatch(String path, Watcher watcher) { HashSet<Watcher> list = watchTable.get(path); if (list == null) { list = new HashSet<Watcher>(4); watchTable.put(path, list); } list.add(watcher); HashSet<String> paths = watch2Paths.get(watcher); if (paths == null) { paths = new HashSet<String>(); watch2Paths.put(watcher, paths); } paths.add(path); } public synchronized void removeWatcher(Watcher watcher) { HashSet<String> paths = watch2Paths.remove(watcher); if (paths == null) { return; } for (String p : paths) { HashSet<Watcher> list = watchTable.get(p); if (list != null) { list.remove(watcher); if (list.size() == 0) { watchTable.remove(p); } } } } ... public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type,KeeperState.SyncConnected, path); HashSet<Watcher> watchers; synchronized (this) { watchers = watchTable.remove(path); ...for (Watcher w : watchers) { HashSet<String> paths = watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } w.process(e); } return watchers; } ... }
Watcher定义了监听接口方法。
public interface Watcher { abstract public void process(WatchedEvent event); }