Ⅵ:zookeeper的Watcher事件监听机制

Posted IT挖掘机y

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Ⅵ:zookeeper的Watcher事件监听机制相关的知识,希望对你有一定的参考价值。

2021最新zookeeper系列

❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️

Ⅰ:zookeeper的单机安装 - 详细教程:https://blog.csdn.net/Kevinnsm/article/details/116134397?spm=1001.2014.3001.5501

Ⅱ:zookeeper的相关shell命令:https://blog.csdn.net/Kevinnsm/article/details/116137602?spm=1001.2014.3001.5501

Ⅲ:zookeeper之查看节点的状态信息:https://blog.csdn.net/Kevinnsm/article/details/116143218?spm=1001.2014.3001.5501

Ⅳ:zookeeper的acl权限控制:https://blog.csdn.net/Kevinnsm/article/details/116167394?spm=1001.2014.3001.5501

Ⅴ:zookeeper的相关Java Api:https://blog.csdn.net/Kevinnsm/article/details/116462557?spm=1001.2014.3001.5501

Ⅵ:zookeeper的Watcher事件监听机制:https://blog.csdn.net/Kevinnsm/article/details/116501842?spm=1001.2014.3001.5501

Ⅶ:教你一招利用zookeeper作为服务的配置中心:https://blog.csdn.net/Kevinnsm/article/details/116542974?spm=1001.2014.3001.5501

❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️


在这里插入图片描述

xshell7连接云服务器演示结果,如果未知请看第一章

前置:–》把握住Watcher流程《–

1、连接zookeeper服务器
2、连接时必须使当前线程等待(等待其他线程创建连接zookeeper服务成功,使用计数器实现)
3、执行回调函数process
4、释放当前线程

1、watcher的连接状态判断

package com.zookeeper.watcher;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @author:抱着鱼睡觉的喵喵
 * @date:2021/5/7
 * @description:
 */
public class WatcherConnection implements Watcher {
//计数器,使当前线程等待其他线程完成
    static CountDownLatch countDownLatch = new CountDownLatch(1);
    static ZooKeeper zooKeeper;
    public static void main(String[] args) {
        try {
        //连接zookeeper服务
            zooKeeper = new ZooKeeper("8.140.37.103:2181", 5000, new WatcherConnection());
            //使当前线程等待其他线程完成(其他线程也就是连接zookeeper服务的线程)
            countDownLatch.await();
            Thread.sleep(1000);
            zooKeeper.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
    //回调函数,进性状态的判断
    @Override
    public void process(WatchedEvent watchedEvent) {
        try {
            if (watchedEvent.getType() == Event.EventType.None) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("连接成功!");
                    countDownLatch.countDown();
                } else if (watchedEvent.getState() == Event.KeeperState.Disconnected) {
                    System.out.println("断开连接");
                } else if (watchedEvent.getState() == Event.KeeperState.Expired) {
                    System.out.println("超时了");
                } else if (watchedEvent.getState() == Event.KeeperState.AuthFailed) {
                    System.out.println("认证失败!");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

2、watcher机制下的exists

Ⅰ、连接对象的监听器

public class WatcherExistsTest {
    private String IP = "8.140.37.103:2181";
    private ZooKeeper zookeeper;
    @Before
    public void connection() throws IOException, InterruptedException {
        //计数器对象,使当前线程等待其他线程的完成
        final CountDownLatch downLatch = new CountDownLatch(1);
        zookeeper = new ZooKeeper(IP, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //判断是否连接成功
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    //使CountDownLatch减到0(初始为1),其他线程可以继续执行(该处应该是主线程可以继续执行了)
                    downLatch.countDown();
                    System.out.println("连接成功!");
                }
                System.out.println(watchedEvent.getPath());
                System.out.println(watchedEvent.getType());
            }
        });
        //主线程进入等待态
        downLatch.await();
    }
	 @Test
    public void watcherExists() throws KeeperException, InterruptedException {
        //第一个参数是节点路径
        //第二个参数为Boolean类型,true代表监听path下的节点,false表示不进行监听
        zookeeper.exists("/exists", true);
        Thread.sleep(10000);
    }
    @After
    public void close() {
        try {
            zookeeper.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

此时在zookeeper客户端创建/exists节点
在这里插入图片描述
IDEA控制台就会出现NodeCreated在这里插入代码片

当然还有删除节点的NodeDeleted等,不再演示
在这里插入图片描述

Ⅱ、自定义watcher

public class WatcherExistsTest {
    private String IP = "8.140.37.103:2181";
    private ZooKeeper zookeeper;
    @Before
    public void connection() throws IOException, InterruptedException {
        //计数器对象,使当前线程等待其他线程的完成
        final CountDownLatch downLatch = new CountDownLatch(1);
        zookeeper = new ZooKeeper(IP, 6000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    //使CountDownLatch减到0(初始为1),其他线程可以继续执行(该处应该是主线程可以继续执行了)
                    downLatch.countDown();
                }
            }
        });
        //主线程进入等待态
        downLatch.await();
    }

    @Test
    public void watcherExists2() throws KeeperException, InterruptedException {
        zookeeper.exists("/exists2", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("自定义watcher!");
                System.out.println(watchedEvent.getPath());
                System.out.println(watchedEvent.getType());
            }
        });
        Thread.sleep(10000);
        System.out.println("--------------");
    }
    @After
    public void close() {
        try {
            zookeeper.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

执行@Test注解方法-》客户端创建/exists2节点-》IDEA控制台查看结果
在这里插入图片描述
在这里插入图片描述

当我修改/exists2节点的数据时,控制台出现了NodeDataChanged
在这里插入图片描述
在这里插入图片描述

Ⅲ、watcher的多次监听

本质上只能进性一次注册,一次监听;当然可以利用循环调用进行生命周期内的多次监听

    @Test
    public void watcherExists2() throws KeeperException, InterruptedException {

        zookeeper.exists("/exists2", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    System.out.println("自定义watcher!");
                    System.out.println(watchedEvent.getPath());
                    System.out.println(watchedEvent.getType());
                    zookeeper.exists("/exists2", this);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread.sleep(10000);
        System.out.println("--------------");
    }

在这里插入图片描述
在这里插入图片描述

Ⅳ、多个watcher同时监听一个节点

一般来说这种多个监听对象才比较符合发布-订阅模式,当节点中的数据发生变化时,会通知所有的监听对象。

@Test
    public void watcherExists3() throws KeeperException, InterruptedException {
        System.out.println("============================");
        zookeeper.exists("/exists3", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("监听对象1");
                System.out.println(watchedEvent.getType());
                System.out.println(watchedEvent.getPath());
            }
        });
        zookeeper.exists("/exists3", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("监听对象2");
                System.out.println(watchedEvent.getType());
                System.out.println(watchedEvent.getPath());
            }
        });
        zookeeper.exists("/exists3", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("监听对象3");
                System.out.println(watchedEvent.getType());
                System.out.println(watchedEvent.getPath());
            }
        });
        Thread.sleep(10000);
        System.out.println("==========================");
    }

在这里插入图片描述
在这里插入图片描述


3、watcher机制下的getData

getData(String path, boolean b, Stat stat)连接对象的监听器
getData(String path, watcher watcher, Stat stat) 自定义的监听器

Ⅰ、连接对象的监听器

public class WatcherGetDataTest {
    static CountDownLatch countDownLatch = new CountDownLatch(1);
    static ZooKeeper zooKeeper;
    final String IP = "8.140.37.103:2181";
    @Before
    public void before() throws IOException, InterruptedException {
        zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("=================");
                    countDownLatch.countDown();
                }
                System.out.println(watchedEvent.getPath());
                System.out.println(watchedEvent.getType());
            }
        });
        countDownLatch.await();
    }

    @Test
    public void test() throws KeeperException, InterruptedException {
        zooKeeper.getData("/data",true, null);
        Thread.sleep(10000);
        System.out.println("=======================");
    }
    @After
    public void after() throws InterruptedException {
        zooKeeper.close();
    }

}

启动测试-》修改data节点的数据-》查看idea控制台结果
在这里插入图片描述
在这里插入图片描述

Ⅱ、自定义watcher监听器

    @Test
    public void test2() throws KeeperException, InterruptedException {
        System.out.println("========================");
        zooKeeper.getData("/data", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println(watchedEvent.getType());
                System.out.println(watchedEvent.getPath());
            }
        }, null);
        Thread.sleep(10000);
        System.out.println("============================");
    }

在这里插入图片描述
在这里插入图片描述

Ⅲ、多次watcher监听

    @Test
    public void test3() throws KeeperException, InterruptedException {
        System.out.println("=========================");
        Watcher watcher = new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

                try {
                    System.out.println(watchedEvent.getType());
                    System.out.println(watchedEvent.getPath());
                    zooKeeper.getData("/data", this, null);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        zooKeeper.getData("/data", watcher, null);
        Thread.sleep(5000);
        System.out.println("=======================");
    }

在这里插入图片描述
在这里插入图片描述

Ⅳ、多个watcher

以上是关于Ⅵ:zookeeper的Watcher事件监听机制的主要内容,如果未能解决你的问题,请参考以下文章

Zookeeper Watcher(事件监听器)?

zookeeper源码分析-事件监听Watcher

Watcher监听

ZooKeeper Watcher 机制

Watcher 实现机制之client注冊

zookeeper:核心原理(Watcher事件和状态)