Zookeeper源码阅读 Server端Watcher

Posted gongcomeon

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper源码阅读 Server端Watcher相关的知识,希望对你有一定的参考价值。

前言

前面一篇主要介绍了Watcher接口相关的接口和实体类,但是主要是zk客户端相关的代码,如前一篇开头所说,client需要把watcher注册到server端,这一篇分析下server端的watcher。

主要分析Watchmanager类。

Watchmanager

技术分享图片

这是WatchManager的类图介绍。来看看代码:

/**
 * This class manages watches. It allows watches to be associated with a string
 * and removes watchers and their watches in addition to managing triggers.
 */
//如注释所言,这个类主要负责管理watcher
public class WatchManager {
    private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);

    //路径->watcher的映射
    private final HashMap<String, HashSet<Watcher>> watchTable =
        new HashMap<String, HashSet<Watcher>>();

    //watcher->路径的映射
    private final HashMap<Watcher, HashSet<String>> watch2Paths =
        new HashMap<Watcher, HashSet<String>>();

size

public synchronized int size(){
    int result = 0;
    for(Set<Watcher> watches : watchTable.values()) {//遍历路径->watcher的映射
        result += watches.size();//把所有的watch个数加起来,但这里是不是会有重复???
    }
    return result;
}

addWatch

//为某个path注册watcher
public synchronized void addWatch(String path, Watcher watcher) {
    HashSet<Watcher> list = watchTable.get(path);//获得路径对应的watcher的set
    if (list == null) {//之前没有watcher
        // don't waste memory if there are few watches on a node
        // rehash when the 4th entry is added, doubling size thereafter
        // seems like a good compromise
        list = new HashSet<Watcher>(4);//这里有优化,只建立为4的set,可能是考虑到实际使用中同一个znode不会有过多的watcher,节省了memory
        watchTable.put(path, list);//更新watchtable
    }
    list.add(watcher);//添加watcher进入set

    HashSet<String> paths = watch2Paths.get(watcher);//在watcher->路径中查找对应的路径
    if (paths == null) {
        // cnxns typically have many watches, so use default cap here
        paths = new HashSet<String>();//同理,同一个watcher可能被加到多个znode上
        watch2Paths.put(watcher, paths);
    }
    paths.add(path);//加入set
}

其实这个方法总的来说就是两大步,第一是更新路径->watcher的映射,第二步是更新watcher->路径的映射,很好理解。

removeWatcher

//与上面方法相反,remove对应的watcher
public synchronized void removeWatcher(Watcher watcher) {
    HashSet<String> paths = watch2Paths.remove(watcher);//从watcher->路径的映射中把整个watcher和它对应的所有path删掉
    if (paths == null) {//paths是否为空
        return;
    }
    for (String p : paths) {//不为空的话就取出来一个一个在路径->watcher的映射里扫描
        HashSet<Watcher> list = watchTable.get(p);//取出watcher的set
        if (list != null) {
            list.remove(watcher);//remove对应的watcher
            if (list.size() == 0) {//如果之前只有一个watcher,那么相应的path就没有watcher了,应该删掉
                watchTable.remove(p);
            }
        }
    }
}

这里其实也是两大步,第一是更新watcher->路径的映射,第二步更新路径->watcher的映射,只是第二步的时候需要遍历所有path。

triggerWatch

//根据事件类型和路径触发watcher,supress是指定的应该被过滤的watcher集合
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type,
            KeeperState.SyncConnected, path);//新建watchedEvent对象,这时一定是连接状态的
    HashSet<Watcher> watchers;
    synchronized (this) {
        watchers = watchTable.remove(path);//把对应路径所有的watcher删除并返回
        if (watchers == null || watchers.isEmpty()) {//watcher为空直接打log
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,
                        ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        for (Watcher w : watchers) {//watcher不为空
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);//把所有的路径删掉
            }
        }
    }
    for (Watcher w : watchers) {//遍历前面获得的所有watcher
        if (supress != null && supress.contains(w)) {//如果watcher在supress的set中跳过
            continue;
        }
        w.process(e);//不在set中就触发
    }
    return watchers;
}

这里有两点需要特别说一下:

  1. 为啥这里需要一个过滤的操作呢,可以通过下面datatree中deletenode里的代码可以了解:

    Set<Watcher> processed = dataWatches.triggerWatch(path,
            EventType.NodeDeleted);//1
    childWatches.triggerWatch(path, EventType.NodeDeleted, processed);//2
    childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
            EventType.NodeChildrenChanged);

可以看到,每个节点对应的watch会存到datawatches里,且如果一个节点是另一个节点的子节点,那么在server获取getchildren指令的时候会把children相关的的watch加入到datatree的childwatches里去。这时如果节点本身已经触发过了那么childwatches里的节点的watches便不用被触发了(因为节点都要被delete了,不存在子节点)。

  1. 最后的process方法并不是客户端的watcher,而是ServerCnxn的process,默认实现是NioserverCnxn。
@Override
synchronized public void process(WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                 "Deliver event " + event + " to 0x"
                                 + Long.toHexString(this.sessionId)
                                 + " through " + this);
    }

    // Convert WatchedEvent to a type that can be sent over the wire
    WatcherEvent e = event.getWrapper();//包装watcherevent

    sendResponse(h, e, "notification");//发送回复
}

DumpWatches

/**
 * String representation of watches. Warning, may be large!
 * @param byPath iff true output watches by paths, otw output
 * watches by connection
 * @return string representation of watches
 */
//把watch写到磁盘中
public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
    if (byPath) {
        for (Entry<String, HashSet<Watcher>> e : watchTable.entrySet()) {
            pwriter.println(e.getKey());//利用PrintWriter去写
            for (Watcher w : e.getValue()) {
                pwriter.print("	0x");
                pwriter.print(Long.toHexString(((ServerCnxn)w).getSessionId()));
                pwriter.print("
");
            }
        }
    } else {
        for (Entry<Watcher, HashSet<String>> e : watch2Paths.entrySet()) {
            pwriter.print("0x");
            pwriter.println(Long.toHexString(((ServerCnxn)e.getKey()).getSessionId()));
            for (String path : e.getValue()) {
                pwriter.print("	");
                pwriter.println(path);
            }
        }
    }
}

总结

  1. zk的cnxn的实现由NIO和Netty两种方式,最近工作也用了些Netty相关的,抽空好好学习总结下。

参考

https://www.cnblogs.com/leesf456/p/6288709.html

https://www.jianshu.com/p/9cf98fab15ac

以上是关于Zookeeper源码阅读 Server端Watcher的主要内容,如果未能解决你的问题,请参考以下文章

Zookeeper源码阅读(十五) Zookeeper集群之server启动

Zookeeper源码阅读 Seesion

ZooKeeper单机服务端的启动源码阅读

ZooKeeper源码阅读心得分享+源码基本结构+源码环境搭建

如何远程调试zookeeper集群

ZooKeeper 源码阅读版本选择