Curator实现zookeeper的节点监听
Posted 暴躁的程序猿啊
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Curator实现zookeeper的节点监听相关的知识,希望对你有一定的参考价值。
Curator实现zookeeper的节点监听
Curtor框架中一共有三个实现监听的方式
一种是NodeCache监听指定节点
一种是pathChildrenCache监听子节点
一种是TreeCache可以监控所有节点 相当于以上两种的合集
引入依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
创建一个测试类 连接好客户端
public class CuratorTest {
private CuratorFramework curatorFramework;
@Before
public void testCreate() {
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(2, 10);
curatorFramework = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.connectionTimeoutMs(60 * 1000)
.sessionTimeoutMs(15 * 10000)
.retryPolicy(retry).build();
curatorFramework.start();
}
@After
public void close() {
if (curatorFramework != null) {
curatorFramework.close();
}
}
}
Watch监听之NodeCache
监听一个指定节点
@Test
public void testUpdate() throws Exception {
//监听一个节点
NodeCache nodeCache = new NodeCache(curatorFramework,"/dongwuyuan");
//注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了!!!!");
}
});
//开启监听 参数 如果设置为true 则开启监听时加载缓存数据
nodeCache.start(true);
while (true){
}
}
set /dongwuyuan "laohu"
Watch监听之PathChildrenCache
监听子节点的变化
@Test
public void testPathChildrenCache() throws Exception {
// 参数 客户端,路径 ,缓存数据,是否压缩,线程池
PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework,"/dongwuyuan",true);
//绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点变化了");
System.out.println(pathChildrenCacheEvent);
//监听子节点的变更,并且拿到变更后的数据
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
//判断类型是否是update
if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
//拿到数据
byte[] data = pathChildrenCacheEvent.getData().getData();
System.out.println(data);
}
}
});
//开启监听
pathChildrenCache.start();
while (true){
}
}
watch监听之TreeCache
/**
* TreeCache:监听节点自己和所有子节点们
*/
@Test
public void testTreeCache(){
//1.创建监听器
TreeCache treeCache = new TreeCache(curatorFramework, "/dongwuyuan");
//2.注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("节点变化了");
System.out.println(event);
}
});
//开启监听
try {
treeCache.start();
while (true){
}
} catch (Exception e) {
e.printStackTrace();
}
}
[zk: localhost:2181(CONNECTED) 13] delete /dongwuyuan/node1
[zk: localhost:2181(CONNECTED) 15] set /dongwuyuan/node2 "shizi"
完整代码
public class CuratorTest {
private CuratorFramework curatorFramework;
@Before
public void testCreate() {
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(2, 10);
curatorFramework = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.connectionTimeoutMs(60 * 1000)
.sessionTimeoutMs(15 * 10000)
.retryPolicy(retry).build();
curatorFramework.start();
}
@Test
public void testUpdate() throws Exception {
//监听一个节点
NodeCache nodeCache = new NodeCache(curatorFramework,"/dongwuyuan");
//注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了!!!!");
}
});
//开启监听 参数 如果设置为true 则开启监听时加载缓存数据
nodeCache.start(true);
while (true){
}
}
@Test
public void testPathChildrenCache() throws Exception {
// 参数 客户端,路径 ,缓存数据,是否压缩,线程池
PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework,"/dongwuyuan",true);
//绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点变化了");
System.out.println(pathChildrenCacheEvent);
//监听子节点的变更,并且拿到变更后的数据
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
//判断类型是否是update
if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
//拿到数据
byte[] data = pathChildrenCacheEvent.getData().getData();
System.out.println(data);
}
}
});
//开启监听
pathChildrenCache.start();
while (true){
}
}
/**
* TreeCache:监听节点自己和所有子节点们
*/
@Test
public void testTreeCache(){
//1.创建监听器
TreeCache treeCache = new TreeCache(curatorFramework, "/dongwuyuan");
//2.注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("节点变化了");
System.out.println(event);
}
});
//开启监听
try {
treeCache.start();
while (true){
}
} catch (Exception e) {
e.printStackTrace();
}
}
@After
public void close() {
if (curatorFramework != null) {
curatorFramework.close();
}
}
}
以上是关于Curator实现zookeeper的节点监听的主要内容,如果未能解决你的问题,请参考以下文章
Zookeeper -- Zookeeper JavaAPI相关操作(Curator介绍Curator API 常用操作(节点的CRUD,Watch事件监听)分布式锁模拟12306售票案例)