Zookeeper -- Zookeeper JavaAPI相关操作(Curator介绍Curator API 常用操作(节点的CRUD,Watch事件监听)分布式锁模拟12306售票案例)
Posted CodeJiao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper -- Zookeeper JavaAPI相关操作(Curator介绍Curator API 常用操作(节点的CRUD,Watch事件监听)分布式锁模拟12306售票案例)相关的知识,希望对你有一定的参考价值。
文章目录
- 1. Curator介绍
- 2. Curator API 常用操作
1. Zookeeper – 初识Zookeeper、Zookeeper的安装和配置、Zookeeper命令操作(Zookeeper数据模型 Zookeeper服务端 / 客户端常用命令)
2. Zookeeper – Zookeeper JavaAPI相关操作(Curator介绍、Curator API 常用操作(节点的CRUD,Watch事件监听)、分布式锁、模拟12306售票案例)
3. Zookeeper – Zookeeper 集群搭建 集群角色说明
1. Curator介绍
- Curator 是 Apache ZooKeeper 的Java客户端库。
- 常见的ZooKeeper Java API :
- 原生Java API
- ZkClient
- Curator
- Curator 项目的目标是简化 ZooKeeper 客户端的使用。
- Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目。
- 官网:http://curator.apache.org/
2. Curator API 常用操作
我们先创建一个名为curator-zk
的普通maven
工程。
然后导入相关的依赖:pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tian</groupId>
<artifactId>curator-zk</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<!--curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<!--日志-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
</project>
配置日志信息打印:log4j.properties
log4j.rootLogger=off,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%dyyyy-MM-dd HH/:mm/:ss]%-5p %c(line/:%L) %x-%m%n
新建一个Curator
测试类:
package com.tian;
public class CuratorTest
下面的这些方法都是写在这个测试类里面的。
2.1 建立连接
2.1.1 连接方式1
@Test
public void testConnect1()
CuratorFrameworkFactory.newClient("192.168.135.130:2181", 60 * 1000, 15 * 1000, new ExponentialBackoffRetry(300, 10)).start();
参数说明:
运行结果:
2.1.2 连接方式2(链式编程)
@Test
public void testConnect2()
CuratorFrameworkFactory
.builder()
.connectString("192.168.135.130:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(new ExponentialBackoffRetry(300, 10))
.build()
.start();
运行结果:
2.2 添加节点
在此之前,我们先做一点点改造:
package com.tian;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class CuratorTest
private CuratorFramework client;
@Test
public void testConnect1()
CuratorFrameworkFactory.newClient("192.168.135.130:2181", 60 * 1000, 15 * 1000, new ExponentialBackoffRetry(300, 10)).start();
@Before
public void testConnect2()
client = CuratorFrameworkFactory
.builder()
.connectString("192.168.135.130:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(new ExponentialBackoffRetry(300, 10))
.build();
client.start();
@After
public void closeConnection()
client.close();
2.2.1 基本创建
@Test
public void basicCreate() throws Exception
// 1. 基本创建
// 如果创建节点没有指定数据, 则默认将当前客户端的ip作为数据存储
client.create().forPath("/app1");
运行结果:
2.2.1 创建节点:带有数据
@Test
public void createWithData() throws Exception
// 2. 创建节点:带有数据
client.create().forPath("/app2", "CodeJiao".getBytes(StandardCharsets.UTF_8));
运行结果:
2.2.3 创建节点:设置节点类型
@Test
public void createWithType() throws Exception
// 3. 设置类型, 默认类型, 持久化
// 现在来创建一个临时节点 /app3
// CreateMode是一个枚举类型
client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
运行结果:
但是这里却看不见/app3
节点,因为临时节点只在当前会话有效。Java方法执行完毕后,当前会话就结束了。
2.2.4 创建多级节点
@Test
public void createMultiLevelNode() throws Exception
// 3. 创建多级节点 creatingParentsIfNeeded() 表示如果没有父节点 则自动创建父节点.
client.create().creatingParentsIfNeeded().forPath("/app4/p1");
运行结果:
2.2.5 小结
2.3 删除节点
2.3.1 删除单个节点(无子节点)
@Test
public void testDelEmptyNode() throws Exception
client.delete().forPath("/app1");
2.3.2 删除带有子节点的节点(任何情况都可以删除:包括有子结点也可以删除成功)
@Test
public void testDelNode() throws Exception
client.delete().deletingChildrenIfNeeded().forPath("/app1");
2.3.3 必须成功的删除
@Test
public void testDelNode() throws Exception
client.delete().guaranteed().forPath("/app1");
2.3.4 回调(删完之后需要做的事情)
@Test
public void testWithCallback() throws Exception
client.delete().guaranteed().inBackground(new BackgroundCallback()
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception
System.out.println("/app1节点被删除了");
System.out.println(curatorFramework.toString());
System.out.println(curatorEvent);
).forPath("/app2");
运行结果:
2.4 修改节点
2.4.1 修改数据
@Test
public void modifyData() throws Exception
client.setData().forPath("/app1", "数据被修改了".getBytes(StandardCharsets.UTF_8));
运行结果:
运行结果:
2.4.2 根据版本去修改数据(保证不被其他线程或操作干扰)
@Test
public void modifyDataByVersion() throws Exception
// 查询版本信息
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
int version = stat.getVersion();
System.out.println(version);
// 根据版本修改数据:如果版本匹配, 则会报错
client.setData().withVersion(version).forPath("/app1", "根据版本修改数据".getBytes(StandardCharsets.UTF_8));
2.5 查询节点
2.5.1 查询数据
@Test
public void queryData() throws Exception
// 1. 查询数据
byte[] bytes = client.getData().forPath("/app1");
System.out.println(new String(bytes));
运行结果:
2.5.2 查询子节点
@Test
public void queryChildNode() throws Exception
// 2. 查询子结点
List<String> paths = client.getChildren().forPath("/");
for (String path : paths)
System.out.println(path);
运行结果:
2.5.3 查询节点状态信息
@Test
public void queryNodeStatusInfo() throws Exception
Stat stat = new Stat();
System.out.println(stat);
// 3.查询节点状态信息
byte[] bytes = client.getData().storingStatIn(stat).forPath("/app1");
System.out.println(stat);
运行结果:
2.6 Watch事件监听
- ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
- ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
- ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便 需要开发人员自己反复注册Watcher,比较繁琐。
- Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。 ZooKeeper提供了三种Watcher:
- NodeCache : 只是监听某一个特定的节点
- PathChildrenCache : 监控一个ZNode的子节点.
- TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合
2.6.1 NodeCache (监听某一个特定的节点 )
@Test
public void testNodeCache() throws Exception
// 1. 创建NodeCache对象
NodeCache nodeCache = new NodeCache(client, "/app1");
// 2. 注册监听
nodeCache.getListenable().addListener(new NodeCacheListener()
@Override
public void nodeChanged() throws Exception
System.out.println("节点信息发生变化");
System.out.println("更新后的数据为: " + new String(nodeCache.getCurrentData().getData()));
);
// 3. 开启监听。true代表一次性把缓存的节点数据加载过来.
nodeCache.start(true);
while (true)
// 避免一直空转
Thread.sleep(500);
运行结果:
2.6.2 PathChildrenCache (监听某一节点的子节点的变化 但是不可以监听自己)
@Test
public void testPathChildCache() throws Exception
// true 代表加载缓存数据
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app1", true);
// 绑定监听器
pathChildrenCache.getListenable(以上是关于Zookeeper -- Zookeeper JavaAPI相关操作(Curator介绍Curator API 常用操作(节点的CRUD,Watch事件监听)分布式锁模拟12306售票案例)的主要内容,如果未能解决你的问题,请参考以下文章
Zookeeper -- 初识ZookeeperZookeeper的安装和配置Zookeeper命令操作(Zookeeper数据模型 Zookeeper服务端 / 客户端常用命令)