ZooKeeperCurator API介绍及基本使用
Posted 辰纪忻
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZooKeeperCurator API介绍及基本使用相关的知识,希望对你有一定的参考价值。
目录
介绍
Curator是Apache ZooKeeper的Java客户端库,常见的ZooKeeper Java API有原生的Java API、ZkClient、Curator。Curator框架在zookeeper原生API接口上进行了包装,解决了很多ZooKeeper客户端非常底层的细节开发。提供ZooKeeper各种应用场景如:分布式锁服务、集群领导选举、分布式队列等的抽象封装,是最好用、最流行的zookeeper的客户端。
Curator提供的常见操作有建立连接、节点创建、节点查询、节点删除、节点修改、Watch事件监听、分布式锁的实现等。
使用
1.建立连接
首先在IDEA里新建一个空的maven项目,引入相关依赖和插件
<dependencies>
<!-- junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<!-- curator recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
<!-- curator framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<!-- 日志相关 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
连接有两种建立方法如下,个人比较推荐使用第二种
//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
//方法一
/*
connectString 连接字符串--> ip:端口号,ip:端口号...
sessionTimeoutMs 会话超时时间(ms) 默认值为60*1000
connectionTimeoutMs 连接超时时间(ms) 默认值为45*1000
retryPolicy 重试策略
*/
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.5.156:2181", 60 * 1000, 45 * 1000, retryPolicy);
client.start(); //开启连接
//方法二
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.5.156:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(45 * 1000)
.retryPolicy(retryPolicy)
.namespace("jason") //加上namespace可以使每次创建或修改的节点都是在/jason/下进行的
.build();
client.start();
2.节点创建
在已建立连接的基础上就可以开始对节点进行一些操作了,此处的client是建立连接时的CuratorFramework对象,通过调用create().forPath()创建节点,创建节点默认存储的值是当前客户端的ip地址,若要自定义值需要再加一个值参数(byte数组类型)。与此同时,节点默认也是持久化的,若有需要则可以通过withMode来指定存储类型。
ps:注意forPath异常的抛出
@Test
public void createTest1() throws Exception {
//默认节点的值是当前客户端的ip地址,自定义时要注意值的类型为byte数组
client.create().forPath("/app2","hello".getBytes());
}
@Test
public void createTest2() throws Exception {
//创建节点时默认为持久化节点
//若要设置其他类型需要使用到withMode方法
client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
}
3.查询节点
/*
测试查询
查询数据:get
查询子节点:ls
查询子节点状态信息:ls -s
*/
@Test
public void getTest() throws Exception {
//查询节点数据:get
byte[] data1 = client.getData().forPath("/app1");
//查询子节点:ls
List<String> nodes = client.getChildren().forPath("/app1");
//查询子节点状态信息:ls -s
Stat status = new Stat();
client.getData.storingStatIn(status).forPath("/app1");
}
4.修改节点
普通修改节点数据
@Test
public void setTest1() throws Exception {
client.setData().forPath("/app1","Jason".getBytes());
}
通过版本号修改节点数据
首先要查询当前节点的状态信息(参考3.查询节点),然后通过getVersion获取版本号
@Test
public void setTest2() throws Exception {
Stat status = new Stat();
client.getData().storingStatIn(status).forPath("/app1");
int version = status.getVersion();
client.setData().withVersion(version).forPath("/app1","这是修改的数据".getBytes());
}
5.删除节点
前两行的实现对应了delete和deleteall
由于连接可能出现超时或者网络的波动,通过guaranteed可以保证删除的成功性(尽量加上这个)
要想删除实现回调,则需要调用inBackground,通过匿名内部类或Lambda表达式实现BackgroundCallback接口,重写processResult,执行一些回调任务。
@Test
public void deleteTest() throws Exception {
//删除空节点
client.delete().forPath("/app1");
//删除非空节点(包含子节点)
client.delete().deletingChildrenIfNeeded().forPath("/app1");
//若连接超时或出现异常,以上两种方式可能出现删除失败的情况,于是可以通过以下方式保证删除的成功
client.delete().guaranteed().forPath("/app1");
//回调
client.delete().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println(curatorEvent);
}
}).forPath("/app1");
}
Watch事件监听
1.概述
ZooKeeper允许用户在指定节点上注册一些Watcher,并在一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制是实现分布式协调服务的重要特性。
ZooKeeper中引入了Watcher机制来实现了发布/订阅功能,可以让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
Curator引入了Cache来实现对ZooKeeper服务端事件的监听。
ZooKeeper提供了三种Watcher:
1.NodeCache:只是监听某一个特定节点
2.PathChildrenCache:监控一个ZNode的子节点
3.TreeCache:可以监控整个树上的所有节点,类似于NodeCache和PathChildrenCache的组合
2.三种Watcher的实现
1. NodeCache
@Test
public void testNodeCache() throws Exception {
//1.创建NodeCache对象
NodeCache nodeCache = new NodeCache(client,"/app1");
//2.注册监听 在重写的方法中编写触发事件后要执行的任务
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了~");
//获取当前修改后的值
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
//3.开启监听
nodeCache.start(true);
while(true){
//保证当前线程一直在跑,因为要测试相关操作
}
}
2.PathChildrenCache
@Test
public void testPathChildrenCache() throws Exception {
//1.创建PathChildrenCache对象 cacheData表示是否缓存变化的事件
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app1",true);
//2.注册监听 在重写的方法中编写触发事件后要执行的任务
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
System.out.println("子节点发生变化了~");
//这里打印一下event
System.out.println(event);
//监听子节点的数据变更,并拿到变更后的数据
//获取子节点的事件类型
PathChildrenCacheEvent.Type type = event.getType();
//若事件为update,那么打印当前变化的数据
if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
byte[] data = event.getData().getData();
System.out.println(new String(data));
}
}
});
//3.开启监听
pathChildrenCache.start();
while (true){
//保证当前线程一直在跑,因为要测试相关操作
}
}
3.TreeCache
@Test
public void testTreeCache() throws Exception {
//1.创建TreeCache对象
TreeCache treeCache = new TreeCache(client,"/app1");
//2.注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception {
System.out.println("节点或子节点变化了~");
//看一下事件
System.out.println(event);
//具体可以参照前两个
}
});
//3.开启监听
treeCache.start();
while (true){
//保证当前线程一直在跑,因为要测试相关操作
}
}
以上是关于ZooKeeperCurator API介绍及基本使用的主要内容,如果未能解决你的问题,请参考以下文章
ASP.NET Aries 3.0发布(附带通用API设计及基本教程介绍)