zookeeper学习-5Java API操作 - Watcher监听机制

Posted weixin_45135482

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper学习-5Java API操作 - Watcher监听机制相关的知识,希望对你有一定的参考价值。

Watch事件监听

概念

ZooKeeper允许用户在指定节点上注册一些Watcher, 并且在一些特定事件触发的时候, ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是ZooKeeper实现分布式协调服务的重要特性。

ZooKeeper中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象, 当一个对象自身状态变化时,会通知所有订阅者。

ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员自己反复注册Watcher,比较繁琐。

Curator引入了Cache来实现对ZooKeeper服务端事件的监听。

ZooKeeper提供了三种Watcher:

  • NodeCache:只是监听某一个特定的节点
  • PathChildrenCache: 监控一个ZNode的子节点
  • TreeCache: 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

总结:Watcher实际就是一种监听机制。当A节点发生变更的时候,就会被watcher自动监听到,通知关注了A节点变更的B节点,B节点就可以做出相应的操作。

代码实现

1. NodeCache

    /**
     * 演示 NodeCache:给指定一个节点注册监听器
     */
    @Test
    public void testNodeCache() throws Exception 
        // 1. 创建NodeCache
        NodeCache nodeCache = new NodeCache(client, "/app7");

        // 2. 注册监听
        nodeCache.getListenable().addListener(new NodeCacheListener() 
            @Override
            public void nodeChanged() throws Exception 
                System.out.println("/app7节点发生变更");
            
        );

        // 3. 开启监听,如果设置为true,则开启监听时,加载缓冲数据
        nodeCache.start(true);

        // 让程序不要结束,一直开启监听
        while(true)
    

对app7节点进行 修改数据、创建子节点、删除子节点、删除节点操作 

只有修改数据、删除节点事件被监听到

 

如果需要获取变更后节点的数据和信息,可在监听方法中通过nodeCache对象获取。

查看修改后的节点状态信息也是可以通过 nodeCache.getCurrentData().getStat() 获取。

        // 2. 注册监听
        nodeCache.getListenable().addListener(new NodeCacheListener() 
            @Override
            public void nodeChanged() throws Exception 
                System.out.println("/app7节点发生变更");

                // 获取修改节点后的数据
                byte[] dataBytes = nodeCache.getCurrentData().getData();
                System.out.println("修改后的数据:" + new String(dataBytes));
            
        );

在客户端修改节点数据。 

程序监听成功。

2. PathChildrenCache

    /**
     * 演示 PathChildrenCache:监听指定节点下的所有子节点,无法监听指定节点自身
     */
    @Test
    public void testPathChildrenCache() throws Exception 
        // 1. 创建PathChildrenCache
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app6", true);

        // 2. 注册监听
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() 
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception 
                System.out.println("/app6的子节点发生变更");
                System.out.println(pathChildrenCacheEvent);
            
        );

        // 3. 开启监听,如果设置为true,则开启监听时,加载缓冲数据
        pathChildrenCache.start(true);

        // 让程序不要结束,一直开启监听
        while(true)
    

在 app6 节点下,添加,修改,删除 p5 节点

 

这些操作都被成功监听,可以通过 event 对象可以看到对哪个子节点做了什么变更。

3. TreeCache

    /**
     * 演示 TreeCache:监听节点及其子节点 相当于 NodeCache 和 PathChildren 的结合体
     */
    @Test
    public void testTreeCache() throws Exception 
        // 1. 创建TreeCache
        TreeCache treeCache = new TreeCache(client, "/app6");

        // 2. 注册监听
        treeCache.getListenable().addListener(new TreeCacheListener() 
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception 
                System.out.println("/app6节点或其子节点发生了变更");
                System.out.println(treeCacheEvent);
            
        );

        // 3. 开启监听,如果设置为true,则开启监听时,加载缓冲数据
        treeCache.start();

        // 让程序不要结束,一直开启监听
        while(true)

在 app6 节点,创建,修改,删除子节点 p6

修改 app6 节点的数据

在程序中,自身节点和子节点的变更,均能被监听到。

变更后的数据,也是可以通过 event 对象来获取。

过时类替代

事实上,以上演示的NodeCache、PathChildren、TreeCache已经过时,已经被CuratorCache代替了。

以下代码是以PathChildrenCache为例:

第一步的 XxxCache对象的创建方式,需要通过CuratorCache创建出来。

第二步注册监听,需要通过CuratorCacheListener.builder().forXxxxCache(),来选择监听类型。

    @Test
    public void testPathChildrenCache2() throws Exception 
        // 1. 创建PathChildrenCache
        CuratorCache pathChildrenCache = CuratorCache.builder(client, "/app8").build();

        // 2. 注册监听
        pathChildrenCache.listenable().addListener(
                CuratorCacheListener.builder().forPathChildrenCache("/app8", client, new PathChildrenCacheListener() 
                    @Override
                    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception 
                        System.out.println("/app8的子节点发生了变更");
                        System.out.println(event);
                    
                ).build()
        );

        // 3. 开启监听
        pathChildrenCache.start();

        while (true)
    

以上是关于zookeeper学习-5Java API操作 - Watcher监听机制的主要内容,如果未能解决你的问题,请参考以下文章

七天玩转Redis | Day5Java操作Redis

Zookeeper学习 客户端和原生API

zookeeper学习-4Java API操作 - 服务端和客户端操作

zookeeper学习02-Java API操作

大数据讲课笔记6.6 ZooKeeper的Java API操作

大数据讲课笔记6.6 ZooKeeper的Java API操作