三天学会ZooKeeper第三天(全网最细)
Posted 活跃的咸鱼
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了三天学会ZooKeeper第三天(全网最细)相关的知识,希望对你有一定的参考价值。
本篇文章来自黑马程序员ZooKeeper教程
三天学会ZooKeeper第一天(全网最细)
三天学会ZooKeeper第二天(全网最细)
zookeeper 开源客户端curator介绍
curator简介
curator是Netflix公司开源的一个zookeeper客户端,后捐献给apache, curator框架在zookeeper原生API接口上进行了包装,解决了很多zooKeeper客户端非常底层的细节开发。提供zooKeeper各种应用场景(比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最好用,最流行的zookeeper的客户端。
原生zookeeperAPI的不足:
- 连接对象异步创建,需要开发人员自行编码等待
- 连接没有自动重连超时机制
- watcher一次注册生效一次
- 不支持递归创建树形节点
curator特点:
- 解决session会话超时重连
- watcher反复注册
- 简化开发api
- 遵循Fluent风格的API
- 提供了分布式锁服务、共享计数器、缓存机制等机制
Curator的组件
Maven依赖
我们在实际的应用时,最常用的是curator-recipes。
连接到ZooKeeper
依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
public class testConnection {
public static void main(String[] args) {
// session重连策略
/*
3秒后重连一次,只重连1次
RetryPolicy retryPolicy = new RetryOneTime(3000);
*/
/*
每3秒重连一次,重连3次
RetryPolicy retryPolicy = new RetryNTimes(3,3000);
*/
/*
每3秒重连一次,总等待时间超过10秒后停止重连
RetryPolicy retryPolicy=new RetryUntilElapsed(10000,3000);
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
// IP地址端口号
.connectString("192.168.1.108")
// 会话超时时间
.sessionTimeoutMs(5000)
// 重连机制
.retryPolicy(retryPolicy)
// 命名空间
.namespace("create")
// 构建连接对象
.build();
client.start();
System.out.println(client.isStarted());
client.close();
}
}
官网的打开连接方式
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
新增节点
public class testCreate {
private String ip="192.168.1.108:2181";
CuratorFramework client;
RetryPolicy retryPolicy= new RetryUntilElapsed(2000,3);
@Before
public void conn(){
client= CuratorFrameworkFactory.builder().connectString(ip).
sessionTimeoutMs(5000).
retryPolicy(retryPolicy)
.namespace("create").build();
client.start();
}
@Test
public void create()throws Exception{
client.create()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.READ_ACL_UNSAFE)
.forPath("/node","node1".getBytes());
System.out.println("end!!!");
}
@Test
public void create2() throws Exception{
// 自定义权限列表
// 权限列表
List<ACL> list = new ArrayList<ACL>();
// 授权模式和授权对象
Id id = new Id("ip", "192.168.1.102");
list.add(new ACL(ZooDefs.Perms.ALL, id));
client.create().withMode(CreateMode.PERSISTENT).withACL(list).
forPath("/node2", "node2".getBytes());
System.out.println("结束");
}
@Test
public void create3()throws Exception{
// 递归创建节点树
client.create()
// 递归节点的创建
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/node3/node31", "node31".getBytes());
System.out.println("结束");
}
@Test
public void create4() throws Exception {
// 异步方式创建节点
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
// 异步回调接口
.inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework
curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 节点的路径
System.out.println(curatorEvent.getPath());
// 时间类型
System.out.println(curatorEvent.getType());
}
})
.forPath("/node4","node4".getBytes());
Thread.sleep(5000);
System.out.println("结束");
}
@After
public void close(){
client.close();
}
}
更新节点
public class TestSet {
private String ip="192.168.1.102:2181";
CuratorFramework client;
RetryPolicy retryPolicy= new RetryUntilElapsed(2000,3);
@Before
public void conn(){
client= CuratorFrameworkFactory.builder().connectString(ip).
sessionTimeoutMs(5000).
retryPolicy(retryPolicy)
.namespace("set").build();
client.start();
}
@Test
public void set1() throws Exception {
// 更新节点
client.setData()
// arg1:节点的路径
// arg2:节点的数据
.forPath("/node1", "node11".getBytes());
System.out.println("结束");
}
@Test
public void set2() throws Exception {
client.setData()
// 指定版本号
.withVersion(2)
.forPath("/node1", "node1111".getBytes());
System.out.println("结束");
}
@Test
public void set3() throws Exception {
// 异步方式修改节点数据
client.setData()
.withVersion(-1).inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework curatorFramework,
CuratorEvent curatorEvent) throws Exception {
// 节点的路径
System.out.println(curatorEvent.getPath());
// 事件的类型
System.out.println(curatorEvent.getType());
}
}).forPath("/node1", "node1".getBytes());
Thread.sleep(5000);
System.out.println("结束");
}
@After
public void close(){
client.close();
}
}
删除节点
public class testDelete {
private String ip="192.168.1.102:2181";
CuratorFramework client;
RetryPolicy retryPolicy= new RetryUntilElapsed(2000,3);
@Before
public void conn(){
client= CuratorFrameworkFactory.builder().connectString(ip).
sessionTimeoutMs(5000).
retryPolicy(retryPolicy)
.namespace("create").build();
client.start();
}
@Test
public void delete1() throws Exception {
// 删除节点
client.delete()
// 节点的路径
.forPath("/node1");
System.out.println("结束");
}
@Test
public void delete2() throws Exception {
client.delete()
// 版本号
.withVersion(0)
.forPath("/node1");
System.out.println("结束");
}
@Test
public void delete3() throws Exception {
//删除包含字节点的节点
client.delete()
.deletingChildrenIfNeeded()
.withVersion(-1)
.forPath("/node1");
System.out.println("结束");
}
@Test
public void delete4() throws Exception {
// 异步方式删除节点
client.delete()
.deletingChildrenIfNeeded()
.withVersion(-1)
.inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework
curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 节点路径
System.out.println(curatorEvent.getPath());
// 事件类型
System.out.println(curatorEvent.getType());
}
})
.forPath("/node1");
Thread.sleep(5000);
System.out.println("结束");
}
@After
public void close(){
client.close();
}
}
查看节点
public class TestGet {
private String ip="192.168.1.102:2181";
CuratorFramework client;
RetryPolicy retryPolicy= new RetryUntilElapsed(2000,3);
@Before
public void conn(){
client= CuratorFrameworkFactory.builder().connectString(ip).
sessionTimeoutMs(5000).
retryPolicy(retryPolicy)
.namespace("create").build();
client.start();
}
@Test
public void get1() throws Exception {
// 读取节点数据
byte [] bys=client.getData()
// 节点的路径
.forPath("/node1");
System.out.println(new String(bys));
}
@Test
public void get2() throws Exception {
// 读取数据时读取节点的属性
Stat stat=new Stat();
byte [] bys=client.getData()
// 读取属性
.storingStatIn(stat)
.forPath("/node1");
System.out.println(new String(bys));
System.out.println(stat.getVersion());
}
@Test
public void get3() throws Exception {
// 异步方式读取节点的数据
client.getData()
.inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework
curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 节点的路径
System.out.println(curatorEvent.getPath());
// 事件类型
System.out.println(curatorEvent.getType());
// 数据
System.out.println(new
String(curatorEvent.getData()));
}
})
.forPath("/node1");
Thread.sleep(5000);
System.out.println("结束");
}
@After
public void close(){
client.close();
}
}
查看子节点
public class TestGetchlden {
private String ip="192.168.1.102:2181";
CuratorFramework client;
RetryPolicy retryPolicy= new RetryUntilElapsed(2000,3);
@Before
public void conn(){
client= CuratorFrameworkFactory.builder().connectString(ip).
sessionTimeoutMs(5000).
retryPolicy(retryPolicy)
.namespace("get").build();
client.start()三天学会ZooKeeper第二天(全网最细)