Ⅵ:zookeeper的Watcher事件监听机制
Posted IT挖掘机y
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Ⅵ:zookeeper的Watcher事件监听机制相关的知识,希望对你有一定的参考价值。
2021最新zookeeper系列
❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️
Ⅰ:zookeeper的单机安装 - 详细教程:https://blog.csdn.net/Kevinnsm/article/details/116134397?spm=1001.2014.3001.5501
Ⅱ:zookeeper的相关shell命令:https://blog.csdn.net/Kevinnsm/article/details/116137602?spm=1001.2014.3001.5501
Ⅲ:zookeeper之查看节点的状态信息:https://blog.csdn.net/Kevinnsm/article/details/116143218?spm=1001.2014.3001.5501
Ⅳ:zookeeper的acl权限控制:https://blog.csdn.net/Kevinnsm/article/details/116167394?spm=1001.2014.3001.5501
Ⅴ:zookeeper的相关Java Api:https://blog.csdn.net/Kevinnsm/article/details/116462557?spm=1001.2014.3001.5501
Ⅵ:zookeeper的Watcher事件监听机制:https://blog.csdn.net/Kevinnsm/article/details/116501842?spm=1001.2014.3001.5501
Ⅶ:教你一招利用zookeeper作为服务的配置中心:https://blog.csdn.net/Kevinnsm/article/details/116542974?spm=1001.2014.3001.5501
❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️
文章目录
xshell7连接云服务器演示结果,如果未知请看第一章
前置:–》把握住Watcher流程《–
1、连接zookeeper服务器
2、连接时必须使当前线程等待(等待其他线程创建连接zookeeper服务成功,使用计数器实现)
3、执行回调函数process
4、释放当前线程
1、watcher的连接状态判断
package com.zookeeper.watcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
* @author:抱着鱼睡觉的喵喵
* @date:2021/5/7
* @description:
*/
public class WatcherConnection implements Watcher {
//计数器,使当前线程等待其他线程完成
static CountDownLatch countDownLatch = new CountDownLatch(1);
static ZooKeeper zooKeeper;
public static void main(String[] args) {
try {
//连接zookeeper服务
zooKeeper = new ZooKeeper("8.140.37.103:2181", 5000, new WatcherConnection());
//使当前线程等待其他线程完成(其他线程也就是连接zookeeper服务的线程)
countDownLatch.await();
Thread.sleep(1000);
zooKeeper.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
//回调函数,进性状态的判断
@Override
public void process(WatchedEvent watchedEvent) {
try {
if (watchedEvent.getType() == Event.EventType.None) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功!");
countDownLatch.countDown();
} else if (watchedEvent.getState() == Event.KeeperState.Disconnected) {
System.out.println("断开连接");
} else if (watchedEvent.getState() == Event.KeeperState.Expired) {
System.out.println("超时了");
} else if (watchedEvent.getState() == Event.KeeperState.AuthFailed) {
System.out.println("认证失败!");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
2、watcher机制下的exists
Ⅰ、连接对象的监听器
public class WatcherExistsTest {
private String IP = "8.140.37.103:2181";
private ZooKeeper zookeeper;
@Before
public void connection() throws IOException, InterruptedException {
//计数器对象,使当前线程等待其他线程的完成
final CountDownLatch downLatch = new CountDownLatch(1);
zookeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//判断是否连接成功
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
//使CountDownLatch减到0(初始为1),其他线程可以继续执行(该处应该是主线程可以继续执行了)
downLatch.countDown();
System.out.println("连接成功!");
}
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
}
});
//主线程进入等待态
downLatch.await();
}
@Test
public void watcherExists() throws KeeperException, InterruptedException {
//第一个参数是节点路径
//第二个参数为Boolean类型,true代表监听path下的节点,false表示不进行监听
zookeeper.exists("/exists", true);
Thread.sleep(10000);
}
@After
public void close() {
try {
zookeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
此时在zookeeper客户端创建/exists节点
IDEA控制台就会出现NodeCreated在这里插入代码片
当然还有删除节点的NodeDeleted等,不再演示
Ⅱ、自定义watcher
public class WatcherExistsTest {
private String IP = "8.140.37.103:2181";
private ZooKeeper zookeeper;
@Before
public void connection() throws IOException, InterruptedException {
//计数器对象,使当前线程等待其他线程的完成
final CountDownLatch downLatch = new CountDownLatch(1);
zookeeper = new ZooKeeper(IP, 6000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
//使CountDownLatch减到0(初始为1),其他线程可以继续执行(该处应该是主线程可以继续执行了)
downLatch.countDown();
}
}
});
//主线程进入等待态
downLatch.await();
}
@Test
public void watcherExists2() throws KeeperException, InterruptedException {
zookeeper.exists("/exists2", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("自定义watcher!");
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
}
});
Thread.sleep(10000);
System.out.println("--------------");
}
@After
public void close() {
try {
zookeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
执行@Test注解方法-》客户端创建/exists2节点-》IDEA控制台查看结果
当我修改/exists2节点的数据时,控制台出现了NodeDataChanged
Ⅲ、watcher的多次监听
本质上只能进性一次注册,一次监听;当然可以利用循环调用进行生命周期内的多次监听
@Test
public void watcherExists2() throws KeeperException, InterruptedException {
zookeeper.exists("/exists2", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
System.out.println("自定义watcher!");
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
zookeeper.exists("/exists2", this);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread.sleep(10000);
System.out.println("--------------");
}
Ⅳ、多个watcher同时监听一个节点
一般来说这种多个监听对象才比较符合发布-订阅模式,当节点中的数据发生变化时,会通知所有的监听对象。
@Test
public void watcherExists3() throws KeeperException, InterruptedException {
System.out.println("============================");
zookeeper.exists("/exists3", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听对象1");
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getPath());
}
});
zookeeper.exists("/exists3", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听对象2");
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getPath());
}
});
zookeeper.exists("/exists3", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听对象3");
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getPath());
}
});
Thread.sleep(10000);
System.out.println("==========================");
}
3、watcher机制下的getData
getData(String path, boolean b, Stat stat)连接对象的监听器
getData(String path, watcher watcher, Stat stat) 自定义的监听器
Ⅰ、连接对象的监听器
public class WatcherGetDataTest {
static CountDownLatch countDownLatch = new CountDownLatch(1);
static ZooKeeper zooKeeper;
final String IP = "8.140.37.103:2181";
@Before
public void before() throws IOException, InterruptedException {
zooKeeper = new ZooKeeper(IP, 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
System.out.println("=================");
countDownLatch.countDown();
}
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
}
});
countDownLatch.await();
}
@Test
public void test() throws KeeperException, InterruptedException {
zooKeeper.getData("/data",true, null);
Thread.sleep(10000);
System.out.println("=======================");
}
@After
public void after() throws InterruptedException {
zooKeeper.close();
}
}
启动测试-》修改data节点的数据-》查看idea控制台结果
Ⅱ、自定义watcher监听器
@Test
public void test2() throws KeeperException, InterruptedException {
System.out.println("========================");
zooKeeper.getData("/data", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getPath());
}
}, null);
Thread.sleep(10000);
System.out.println("============================");
}
Ⅲ、多次watcher监听
@Test
public void test3() throws KeeperException, InterruptedException {
System.out.println("=========================");
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getPath());
zooKeeper.getData("/data", this, null);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
zooKeeper.getData("/data", watcher, null);
Thread.sleep(5000);
System.out.println("=======================");
}
Ⅳ、多个watcher
以上是关于Ⅵ:zookeeper的Watcher事件监听机制的主要内容,如果未能解决你的问题,请参考以下文章