Zookeeper--06---Curator客户端的使⽤zk的watch机制
Posted 高高for 循环
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper--06---Curator客户端的使⽤zk的watch机制相关的知识,希望对你有一定的参考价值。
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
Zookeeper客户端(zkCli)的使⽤
Zookeeper–03–常用Shell命令
Curator客户端的使⽤
Curator介绍
- Curator是Netflix公司开源的⼀套zookeeper客户端框架,Curator是对Zookeeper⽀持最好 的客户端框架。
- Curator封装了⼤部分Zookeeper的功能,⽐如Leader选举、分布式锁等,减少了技术⼈员在使⽤Zookeeper时的底层细节开发⼯作。
1.引⼊Curator
<!--Curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<!--Zookeeper-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.14</version>
</dependency>
application.properties配置⽂件
curator.retryCount=5
curator.elapsedTimeMs=5000
curator.connectString=172.16.253.35:2181
curator.sessionTimeoutMs=60000
curator.connectionTimeoutMs=5000
注⼊配置Bean
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "curator")
public class WrapperZK
private int retryCount;
private int elapsedTimeMs;
private String connectString;
private int sessionTimeoutMs;
private int connectionTimeoutMs;
注⼊CuratorFramework
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CuratorConfig
@Autowired
WrapperZK wrapperZk;
@Bean(initMethod = "start")
public CuratorFramework curatorFramework()
return CuratorFrameworkFactory.newClient(
wrapperZk.getConnectString(),
wrapperZk.getSessionTimeoutMs(),
wrapperZk.getConnectionTimeoutMs(),
new RetryNTimes(wrapperZk.getRetryCount(), wrapperZk.getElapsedTimeMs()));
2.创建节点
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.zookeeper.CreateMode;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.logging.Logger;
@Slf4j
@SpringBootTest
class BootZkClientApplicationTests
@Autowired
CuratorFramework curatorFramework;
@Test
void createNode() throws Exception
//添加持久节点
String path = curatorFramework.create().forPath("/curator-node");
//添加临时序号节点
String path1 = curatorFramework.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator-node", "some-data".getBytes());
System.out.println(String.format("curator create node :%s successfully.",path));
System.in.read();
3.获得节点数据
@Test
public void testGetData() throws Exception
byte[] bytes = curatorFramework.getData().forPath("/curator-node");
System.out.println(new String(bytes));
4.修改节点数据
@Test
public void testSetData() throws Exception
curatorFramework.setData().forPath("/curator-node","changed!".getBytes());
byte[] bytes = curatorFramework.getData().forPath("/curator-node");
System.out.println(new String(bytes));
5.创建节点同时创建⽗节点
@Test
public void testCreateWithParent() throws Exception
String pathWithParent="/node-parent/sub-node-1";
String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);
System.out.println(String.format("curator create node :%s successfully.",path));
6.删除节点
@Test
public void testDelete() throws Exception
String pathWithParent="/node-parent";
curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
zk的watch机制
1.Watch机制介绍
- 我们可以把 Watch 理解成是注册在特定 Znode 上的触发器。
- 当这个 Znode 发⽣改变,也就是调⽤了 create , delete , setData ⽅法的时候,将会触发 Znode上注册的对应事件,请求 Watch 的客户端会接收到异步通知。
具体交互过程如下:
- 客户端调⽤ getData ⽅法, watch 参数是 true 。服务端接到请求,返回节点数据,并且在对应的哈希表⾥插⼊被 Watch 的 Znode 路径,以及 Watcher 列表。
- 当被 Watch 的 Znode 已删除,服务端会查找哈希表,找到该 Znode 对应的所有Watcher,异步通知客户端,并且删除哈希表中对应的 Key-Value。
客户端使⽤了NIO通信模式监听服务端的调⽤。
2.监听器原理
3.zkCli客户端使⽤watch
get -w /test ⼀次性监听节点
4.curator客户端使⽤watch
@Test
public void addNodeListener() throws Exception
NodeCache nodeCache = new NodeCache(curatorFramework, "/curator-node");
nodeCache.getListenable().addListener(new NodeCacheListener()
@Override
public void nodeChanged() throws Exception
log.info(" path nodeChanged: ","/curator-node");
printNodeData();
);
nodeCache.start();
System.in.read();
public void printNodeData() throws Exception
byte[] bytes = curatorFramework.getData().forPath("/curator-node");
log.info("data: ",new String(bytes));
以上是关于Zookeeper--06---Curator客户端的使⽤zk的watch机制的主要内容,如果未能解决你的问题,请参考以下文章