基于Curator的Zookeeper操作实战

Posted 外星喵

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Curator的Zookeeper操作实战相关的知识,希望对你有一定的参考价值。

前言

Zookeeper操作方式

这篇文章主要说的是利用java来操作zookeeper,就如操作mysql数据库一样,主要是实现增删改查功能,而实现这些功能的方式主要有以下三种:

  1. zookeeper官方提供的原生的api
  2. zkclient
  3. Apache Curator

简单说下三种方式的区别与各自的优劣:

  • zookeeper自带的客户端是官方提供的,比较底层、使用起来写代码麻烦,很多功能需要自己来实现、不够直接。
  • zkclient是另一个开源的ZooKeeper客户端。
  • Apache Curator是Apache的开源项目,封装了zookeeper自带的客户端,使用相对简便,易于使用。

Curator介绍

这篇主要讲解使用基于Curator操作Zookeeper的操作案例。

Apache Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API 简化了ZooKeeper的操作。通过查看官方文档,可以发现Curator主要解决了三类问题:

  • 封装ZooKeeper client与ZooKeeper server之间的连接处理
  • 提供了一套Fluent风格的操作API
  • 提供ZooKeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装

基于Curator使用Zookeeper

导入依赖

        <!-- ZooKeeper 之 Curator-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.2.0</version>
        </dependency>

Zookeeper客户端

创建客户端

使用CuratorFrameworkFactory的两个静态工厂方法来创建zookeeper客户端对象,主要需要设置以下参数:

  • connectString:zookeeper服务器地址及端口号,多个zookeeper服务器地址以“,”分隔。
  • sessionTimeoutMs:会话超时时间,单位毫秒,默认为60000ms。
  • connectionTimeoutMs:连接超时时间,单位毫秒,默认为15000ms。
  • namespace: 为了实现不同的Zookeeper业务之间的隔离,往往会为每个业务分配一个独立的命名空间,即指定一个Zookeeper根路径
  • retryPolicy:重试连接策略,有四种实现,分别为:
    • ExponentialBackoffRetry(重试指定的次数, 且每一次重试之间停顿的时间逐渐增加)
    • RetryNtimes(指定最大重试次数的重试策略)
    • RetryOneTimes(仅重试一次)、
    • RetryUntilElapsed(一直重试直到达到规定的时间)
    //Zookeeper客户端
    private CuratorFramework client; 

    @Before
    public void testConnect() {
        client = CuratorFrameworkFactory.builder()
                .connectString("192.168.124.18:2181")  //连接地址和端口号
                .sessionTimeoutMs(10000)     //会话超时时间
                .connectionTimeoutMs(1000)   // 连接超时时间
                .namespace("/test")      //名称空间
                .retryPolicy(new ExponentialBackoffRetry(1000, 10))  //重试策略
                .build();
        client.start();//开启连接
    }

关闭客户端

    @After
    public void testClose() {
        if (client != null) {
            client.close();
        }
    }

节点操作

创建节点

主要有以下四种:

  1. 创建普通节点(默认即是持久化节点)
  2. 创建多集节点
  3. 创建指定节点类型的节点:
    • PERSISTENT:持久化节点
    • PERSISTENT_SEQUENTIAL:持久化且带序列号节点
    • EPHEMERAL:临时节点
    • EPHEMERAL_SEQUENTIAL:临时且带序列号节点
  4. 创建带指定数据的节点
    @Test
    public void testCreateNode() throws Exception {
        //1.普通创建
        client.create().forPath("/test1");
        //2.创建多集节点
        client.create().creatingParentContainersIfNeeded().forPath("/test2/demo");
        //3.设置创建节点类型
        client.create().withMode(CreateMode.EPHEMERAL).forPath("/test3");
        //4.创建带指定数据的节点
        client.create().forPath("/test4", "This is test4".getBytes(StandardCharsets.UTF_8));
    }

更新节点

    @Test
    public void testUpdateNode() throws Exception {
        //更新一个节点的数据内容
        client.setData().forPath("/test4", "第一次更新".getBytes());
        //更新一个节点的数据内容,强制指定版本进行更新
        client.setData().withVersion(1).forPath("/test4", "第二次更新".getBytes());
    }

查询节点

    @Test
    public void testQueryNode() throws Exception {
        Stat stat1 = client.checkExists().forPath("/test1");
        System.out.println("路径节点/test1是否存在:" + (stat1 != null));

        byte[] bytes = client.getData().forPath("/test2");
        System.out.println("路径/test2的数据是:" + new String(bytes));

        Stat stat2 = new Stat();
        bytes = client.getData().storingStatIn(stat2).forPath("/test4");
        System.out.println("路径/test4的数据是:" + new String(bytes));
        System.out.println("路径/test4的数据的版本是:" + stat2.getVersion());
    }

删除节点

    @Test
    public void testDeleteNode() throws Exception {

        //只能删除叶子节点
        client.delete().forPath("/test1");
        //删除一个节点,并递归删除其所有子节点
        client.delete().deletingChildrenIfNeeded().forPath("/test2");
        //强制指定版本进行删除
        client.delete().withVersion(1).forPath("/test4");
        //注意:由于一些网络原因,上述的删除操作有可能失败,使用guaranteed()
        // 如果删除失败,会记录下来,只要会话有效,就会不断的重试,直到删除成功为止
        client.delete().guaranteed().forPath("/test3");
    }

异步接口

上面提到的创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。

CuratorEventType

事件类型对应CuratorFramework实例的方法
CREATEcreate()
DELETEdelete()
EXISTScheckExists()
GET_DATAgetData()
SET_DATAsetData()
CHILDRENgetChildren()
SYNCsync(String,Object)
GET_ACLgetACL()
SET_ACLsetACL()
WATCHEDWatcher(Watcher)
CLOSINGclose()

响应码(getResultCode())

响应码意义
0OK,即调用成功
-4ConnectionLoss,即客户端与服务端断开连接
-110NodeExists,即节点已经存在
-112SessionExpired,即会话过期

异步创建节点

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {      
      	 System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
      },executor)
      .forPath("path");

监听器

Curator提供了三种Watcher(Cache)来监听结点的变化:

  • Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。

  • Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。

  • Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。

Node Cache

    @Test
    public void testNodeCache() throws Exception {
        //最后一个参数表示是否进行压缩
        NodeCache cache = new NodeCache(client, "/super", false);
        cache.start(true);
        //只会监听节点的创建和修改,删除不会监听
        cache.getListenable().addListener(() -> {
            System.out.println("路径:" + cache.getCurrentData().getPath());
            System.out.println("数据:" + new String(cache.getCurrentData().getData()));
            System.out.println("状态:" + cache.getCurrentData().getStat());
        });

        client.create().forPath("/nodeCache", "1234".getBytes());
        Thread.sleep(1000);
        client.setData().forPath("/nodeCache", "5678".getBytes());
        Thread.sleep(1000);
        client.delete().forPath("/nodeCache");
        Thread.sleep(5000);
    }

Path Cache

    @Test
    public void testPathChildrenCache() throws Exception {
        //第三个参数表示是否接收节点数据内容
        PathChildrenCache childrenCache = new PathChildrenCache(client, "/super", true);
        /**
         * 如果不填写这个参数,则无法监听到子节点的数据更新
         如果参数为PathChildrenCache.StartMode.BUILD_INITIAL_CACHE,则会预先创建之前指定的/super节点
         如果参数为PathChildrenCache.StartMode.POST_INITIALIZED_EVENT,效果与BUILD_INITIAL_CACHE相同,只是不会预先创建/super节点
         参数为PathChildrenCache.StartMode.NORMAL时,与不填写参数是同样的效果,不会监听子节点的数据更新操作
         */
        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        childrenCache.getListenable().addListener((framework, event) -> {
            switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED,类型:" + event.getType() + ",路径:" + event.getData().getPath() + ",数据:" +
                            new String(event.getData().getData()) + ",状态:" + event.getData().getStat());
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED,类型:" + event.getType() + ",路径:" + event.getData().getPath() + ",数据:" +
                            new String(event.getData().getData()) + ",状态:" + event.getData().getStat());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED,类型:" + event.getType() + ",路径:" + event.getData().getPath() + ",数据:" +
                            new String(event.getData().getData()) + ",状态:" + event.getData().getStat());
                    break;
                default:
                    break;
            }
        });

        client.create().forPath("/pathChildrenCache", "123".getBytes());
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1", "c1内容".getBytes());
        //经测试,不会监听到本节点的数据变更,只会监听到指定节点下子节点数据的变更
        client.setData().forPath("/pathChildrenCache", "456".getBytes());
        client.setData().forPath("/pathChildrenCache/c1", "c1新内容".getBytes());
        client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");
        Thread.sleep(5000);
    }

Tree Cache

    @Test
    public void testTreeCache() throws Exception {
        TreeCache treeCache = new TreeCache(client, "/treeCache");
        treeCache.start();
        treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {
            switch (treeCacheEvent.getType()) {
                case NODE_ADDED:
                    System.out.println("NODE_ADDED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())
                            + ",状态:" + treeCacheEvent.getData().getStat());
                    break;
                case NODE_UPDATED:
                    System.out.println("NODE_UPDATED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())
                            + ",状态:" + treeCacheEvent.getData().getStat());
                    break;
                case NODE_REMOVED:
                    System.out.println("NODE_REMOVED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())
                            + ",状态:" + treeCacheEvent.getData().getStat());
                    break;
                default:
                    break;
            }
        });

        client.create().forPath("/treeCache", "123".getBytes());
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/treeCache/c1", "456".getBytes());
        client.setData().forPath("/treeCache", "789".getBytes());
        client.setData().forPath("/treeCache/c1", "910".getBytes());
        client.delete().forPath("/treeCache/c1");
        client.delete().forPath("/treeCache");
        Thread.sleep(5000);

    }

事务管理

碰到异常,事务会回滚

    @Test
    public void testTransaction() throws Exception{
        //定义几个基本操作
        CuratorOp createOp = client.transactionOp().create()
                .forPath("/curator/one_path","some data".getBytes());
        
        CuratorOp setDataOp = client.transactionOp().setData()
                .forPath("/curator","other data".getBytes());
        
        CuratorOp deleteOp = client.transactionOp().delete()
                .forPath("/curator");
        
        //事务执行结果
        List<CuratorTransactionResult> results = client.transaction()
                .forOperations(createOp,setDataOp,deleteOp);
        
        //遍历输出结果
        //因为节点“/curator”存在子节点,所以在删除的时候将会报错,事务回滚
        for(CuratorTransactionResult result : results){
            System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType());
        }
    }

Leader选举

在分布式计算中, leader elections是很重要的一个功能, 这个选举过程是这样子的: 指派一个进程作为组织者,将任务分发给各节点。 在任务开始前, 哪个节点都不知道谁是leader(领导者)或者coordinator(协调者). 当选举算法开始执行后, 每个节点最终会得到一个唯一的节点作为任务leader. 除此之外, 选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来。

在zookeeper集群中,leader负责写操作,然后通过Zab协议实现follower的同步,leader或者follower都可以处理读操作。

Curator 有两种leader选举的recipe,分别是

  • LeaderSelector:所有存活的客户端不间断的轮流做Leader
  • LeaderLatch:一旦选举出Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权
public class CuratorLeaderTest {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "192.168.1.100:2181";
    private static final String ZK_PATH = "/zktest";

    public static void main(String[] args) throws InterruptedException {
        LeaderSelectorListener listener = new LeaderSelectorListener() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                System.out.println(Thread.currentThread().getName() + " take leadership!");

                // takeLeadership() method should only return when leadership is being relinquished.
                Thread.sleep(5000L);

                System.out.println(Thread.currentThread().getName() + " relinquish leadership!");
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState state) {
            }
        };

        new Thread(() -> {
            registerListener(listener);
        }).start();

        new Thread(() -> {
            registerListener(listener);
        }).start();

        new Thread(() -> {
            registerListener(listener);
        }).start();

        Thread.sleep(Integer.MAX_VALUE);
    }

    private static void registerListener(LeaderSelectorListener listener) {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();

        // 2.Ensure path
        try {
            new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient());
        } catch (Exception e) {
            e.printStackTrace();
        }

        // 3.Register listener
        LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener);
        selector.autoRequeue();
        selector.start();
    }
}

分布式锁

分布式编程时,比如最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。

具体实现请参考我的上一篇文章:Zookeeper实现分布式锁

以上是关于基于Curator的Zookeeper操作实战的主要内容,如果未能解决你的问题,请参考以下文章

flink hadoop 从0~1分布式计算与大数据项目实战zookeeper内部原理流程简介以及java curator client操作集群注册,读取

zookeeper curator ( 实战一)

Zookeeper客户端zkClient和curator的操作

Zookeeper客户端zkClient和curator的操作

zookeeper开源客户端curator

分布式锁三大技术方案实战——基于zookeeper方式实现分布式锁