ZooKeeperCurator API介绍及基本使用

Posted 辰纪忻

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZooKeeperCurator API介绍及基本使用相关的知识,希望对你有一定的参考价值。

目录

介绍

使用

1.建立连接

 2.节点创建

3.查询节点

4.修改节点

5.删除节点

Watch事件监听

1.概述

2.三种Watcher的实现

1. NodeCache

2.PathChildrenCache

 3.TreeCache


介绍

Curator是Apache ZooKeeper的Java客户端库,常见的ZooKeeper Java API有原生的Java API、ZkClient、Curator。Curator框架在zookeeper原生API接口上进行了包装,解决了很多ZooKeeper客户端非常底层的细节开发。提供ZooKeeper各种应用场景如:分布式锁服务、集群领导选举、分布式队列等的抽象封装,是最好用、最流行的zookeeper的客户端。

官网:Apache Curator –

Curator提供的常见操作有建立连接、节点创建、节点查询、节点删除、节点修改、Watch事件监听、分布式锁的实现等。

使用

1.建立连接

首先在IDEA里新建一个空的maven项目,引入相关依赖和插件

<dependencies>
        <!-- junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.1</version>
            <scope>test</scope>
        </dependency>
        <!-- curator recipes -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.2.0</version>
        </dependency>
        <!-- curator framework -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.2.0</version>
        </dependency>

        <!-- 日志相关 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

 连接有两种建立方法如下,个人比较推荐使用第二种

//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
//方法一
/*
            connectString  连接字符串--> ip:端口号,ip:端口号...
            sessionTimeoutMs  会话超时时间(ms) 默认值为60*1000
            connectionTimeoutMs  连接超时时间(ms) 默认值为45*1000
            retryPolicy  重试策略
*/
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.5.156:2181", 60 * 1000, 45 * 1000, retryPolicy);
client.start(); //开启连接
//方法二
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.5.156:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(45 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("jason")  //加上namespace可以使每次创建或修改的节点都是在/jason/下进行的
                .build();
client.start();

 2.节点创建

已建立连接的基础上就可以开始对节点进行一些操作了,此处的client是建立连接时的CuratorFramework对象,通过调用create().forPath()创建节点,创建节点默认存储的值是当前客户端的ip地址,若要自定义值需要再加一个值参数(byte数组类型)。与此同时,节点默认也是持久化的,若有需要则可以通过withMode来指定存储类型。

ps:注意forPath异常的抛出

@Test
public void createTest1() throws Exception {
        //默认节点的值是当前客户端的ip地址,自定义时要注意值的类型为byte数组
        client.create().forPath("/app2","hello".getBytes());

}
@Test
public void createTest2() throws Exception {
        //创建节点时默认为持久化节点
        //若要设置其他类型需要使用到withMode方法
        client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
}

3.查询节点

    /*
        测试查询
        查询数据:get
        查询子节点:ls
        查询子节点状态信息:ls -s
     */
    @Test
    public void getTest() throws Exception {
        //查询节点数据:get
        byte[] data1 = client.getData().forPath("/app1");
        //查询子节点:ls
        List<String> nodes = client.getChildren().forPath("/app1");
        //查询子节点状态信息:ls -s
        Stat status = new Stat();
        client.getData.storingStatIn(status).forPath("/app1");

    }

4.修改节点

普通修改节点数据

    @Test
    public void setTest1() throws Exception {
        client.setData().forPath("/app1","Jason".getBytes());
    }

 通过版本号修改节点数据

首先要查询当前节点的状态信息(参考3.查询节点),然后通过getVersion获取版本号

    @Test
    public void setTest2() throws Exception {
        Stat status = new Stat();
        client.getData().storingStatIn(status).forPath("/app1");
        int version = status.getVersion();
        client.setData().withVersion(version).forPath("/app1","这是修改的数据".getBytes());
    }

5.删除节点

前两行的实现对应了delete和deleteall

由于连接可能出现超时或者网络的波动,通过guaranteed可以保证删除的成功性(尽量加上这个)

要想删除实现回调,则需要调用inBackground,通过匿名内部类或Lambda表达式实现BackgroundCallback接口,重写processResult,执行一些回调任务。

@Test
    public void deleteTest() throws Exception {
        //删除空节点
        client.delete().forPath("/app1");
        //删除非空节点(包含子节点)
        client.delete().deletingChildrenIfNeeded().forPath("/app1");
        //若连接超时或出现异常,以上两种方式可能出现删除失败的情况,于是可以通过以下方式保证删除的成功
        client.delete().guaranteed().forPath("/app1");
        //回调
        client.delete().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                System.out.println(curatorEvent);
            }
        }).forPath("/app1");
    }

Watch事件监听

1.概述

ZooKeeper允许用户在指定节点上注册一些Watcher,并在一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制是实现分布式协调服务的重要特性。

ZooKeeper中引入了Watcher机制来实现了发布/订阅功能,可以让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。

Curator引入了Cache来实现对ZooKeeper服务端事件的监听。

ZooKeeper提供了三种Watcher:

1.NodeCache:只是监听某一个特定节点

2.PathChildrenCache:监控一个ZNode的子节点 

3.TreeCache:可以监控整个树上的所有节点,类似于NodeCache和PathChildrenCache的组合

2.三种Watcher的实现

1. NodeCache

@Test
    public void testNodeCache() throws Exception {
        //1.创建NodeCache对象
        NodeCache nodeCache = new NodeCache(client,"/app1");
        //2.注册监听 在重写的方法中编写触发事件后要执行的任务
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("节点变化了~");
                //获取当前修改后的值
                byte[] data = nodeCache.getCurrentData().getData();
                System.out.println(new String(data));
            }
        });
        //3.开启监听
        nodeCache.start(true);
        while(true){
            //保证当前线程一直在跑,因为要测试相关操作
        }
    }

2.PathChildrenCache

@Test
    public void testPathChildrenCache() throws Exception {
        //1.创建PathChildrenCache对象 cacheData表示是否缓存变化的事件
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app1",true);
        //2.注册监听 在重写的方法中编写触发事件后要执行的任务
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                System.out.println("子节点发生变化了~");
                //这里打印一下event
                System.out.println(event);
                //监听子节点的数据变更,并拿到变更后的数据
                //获取子节点的事件类型
                PathChildrenCacheEvent.Type type = event.getType();
                //若事件为update,那么打印当前变化的数据
                if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                    byte[] data = event.getData().getData();
                    System.out.println(new String(data));
                }

            }
        });
        //3.开启监听
        pathChildrenCache.start();
        while (true){
            //保证当前线程一直在跑,因为要测试相关操作
        }
    }

 3.TreeCache

@Test
    public void testTreeCache() throws Exception {
        //1.创建TreeCache对象
        TreeCache treeCache = new TreeCache(client,"/app1");
        //2.注册监听
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) throws Exception {
                System.out.println("节点或子节点变化了~");
                //看一下事件
                System.out.println(event);
                //具体可以参照前两个
            }
        });
        //3.开启监听
        treeCache.start();
        while (true){
            //保证当前线程一直在跑,因为要测试相关操作
        }
    }

以上是关于ZooKeeperCurator API介绍及基本使用的主要内容,如果未能解决你的问题,请参考以下文章

蓝牙API介绍及基本功能实现

ASP.NET Aries 3.0发布(附带通用API设计及基本教程介绍)

javascript:javascript基本介绍及基本语法

Glide介绍及基本使用方法

KONG GATEWAY 基本介绍及安装

Netty框架之概述及基本组件介绍