zookeeper java客户端之curator详解

Posted zhjh256

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper java客户端之curator详解相关的知识,希望对你有一定的参考价值。

  关于zookeeper的原理解析,可以参见zookeeper核心原理详解,本文所述大多数实践基于对zookeeper原理的首先理解。

  Curator是Netflix公司开源的一个Zookeeper客户端,目前是apache顶级项目。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量,相当于netty之于socket编程。提供了一套易用性和可读性更强的Fluent风格的客户端API框架。官网为http://curator.apache.org/

  除此之外,Curator中还提供了Zookeeper各种应用场景(Recipe,如共享锁服务、Master选举机制和分布式计算器等)的抽象封装。所以说啊,不管是做底层库还是应用,用户体验真的很重要。

关于zookeeper的java客户端

  Zookeeper的官方客户端提供了基本的操作,比如,创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等。但对于开发人员来说,Zookeeper提供的基本操纵还是有一些不足之处。典型的缺点为:

(1)Zookeeper的Watcher是一次性的,每次触发之后都需要重新进行注册;
(2)Session超时之后没有实现重连机制;
(3)异常处理繁琐,Zookeeper提供了很多异常,对于开发人员来说可能根本不知道该如何处理这些异常信息;
(4)只提供了简单的byte[]数组的接口,没有提供针对对象级别的序列化;
(5)创建节点时如果节点存在抛出异常,需要自行检查节点是否存在;
(6)删除节点无法实现级联删除;

  因此,产生了两款主流的三方zk客户端,ZkClient和Curator。第一个主流的三方zk客户端是ZkClient,由Datameer的工程师开发,对Zookeeper的原生API进行了包装,实现了超时重连、Watcher反复注册等功能。像dubbo等框架对其也进行了集成使用。

  虽然ZkClient对原生API进行了封装,但也有它自身的不足之处:

  • 几乎没有参考文档;
  • 异常处理简化(抛出RuntimeException);
  • 重试机制比较难用;
  • 没有提供各种使用场景的实现;

  注:除此之外,很多依赖zookeeper的中间件或大数据组件都配备了与之相适应的zookeeper客户端,例如hbase、hadoop、fabric8等。

  因此,除了早期集成外,目前新的框架和系统很少使用ZkClient,因此本文详细解析curator。如果读者对zkclient感兴趣,可以参考https://www.jianshu.com/p/d6de2d21d744去,其官网为https://github.com/sgroschupf/zkclient,已经基本不活跃了、更新极少且star不过千。

curator依赖添加

  Curator的Maven依赖如下:一般直接使用curator-recipes就行了,如果需要自己封装一些底层些的功能的话,例如增加连接管理重试机制等,则可以引入curator-framework包。client是低级api。

            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
<!--All of the recipes listed on the ZooKeeper recipes doc (except two phase commit).--> <version>${apache-curator.version}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId>
<!-- High-level API that greatly simplifies using ZooKeeper. --> <version>${apache-curator.version}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId>
<!-- Low-level API --> <version>${apache-curator.version}</version> </dependency>

  最新版本可以从https://mvnrepository.com/artifact/org.apache.curator/curator-client查阅,不过需要注意的是,curator和zookeeper本身的依赖(尤其是zookeeper 3.4和3.5不兼容,导致的客户端也是不一样)对应关系。目前绝大多数使用2.x的版本。

典型的zk场景

Client操作

  利用Curator提供的客户端API,可以完全实现在zkCli.sh原生客户端的各种功能。值得注意的是,Curator采用流式风格API。准确的说是类似JPA化。由于针对zk/redis等的操作都相当简单,因此这种模式在这种场景下是比较合适的。如下:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;

/**
 * Curator framework‘s client test.
 * Output:
 *  $ create /zktest hello 
 *  $ ls / 
 *  [zktest, zookeeper]
 *  $ get /zktest 
 *  hello
 *  $ set /zktest world 
 *  $ get /zktest 
 *  world
 *  $ delete /zktest 
 *  $ ls / 
 *  [zookeeper]
 */
public class CuratorClientTest {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "10.20.30.17:2181";
    private static final String ZK_PATH = "/zktest";

    public static void main(String[] args) throws Exception {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        // 2.Client API test
        // 2.1 Create node
        String data1 = "hello";
        print("create", ZK_PATH, data1);
        client.create().
                creatingParentsIfNeeded().
                forPath(ZK_PATH, data1.getBytes());

        // 2.2 Get node and data
        print("ls", "/");
        print(client.getChildren().forPath("/"));
        print("get", ZK_PATH);
        print(client.getData().forPath(ZK_PATH));

        // 2.3 Modify data
        String data2 = "world";
        print("set", ZK_PATH, data2);
        client.setData().forPath(ZK_PATH, data2.getBytes());
        print("get", ZK_PATH);
        print(client.getData().forPath(ZK_PATH));

        // 2.4 Remove node
        print("delete", ZK_PATH);
        client.delete().forPath(ZK_PATH);
        print("ls", "/");
        print(client.getChildren().forPath("/"));
    }

    private static void print(String... cmds) {
        StringBuilder text = new StringBuilder("$ ");
        for (String cmd : cmds) {
            text.append(cmd).append(" ");
        }
        System.out.println(text.toString());
    }

    private static void print(Object result) {
        System.out.println(
                result instanceof byte[]
                    ? new String((byte[]) result)
                        : result);
    }

}

  详细的CuratorFramework功能及使用说明可参见https://curator.apache.org/curator-framework/index.html

监听器

  Curator提供了三种Watcher(Cache)来监听结点的变化:

  • Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。
  • Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。
  • Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.RetryNTimes;

/**
 * Curator framework watch test.
 */
public class CuratorWatcherTest {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "192.168.1.100:2181";
    private static final String ZK_PATH = "/zktest";

    public static void main(String[] args) throws Exception {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        // 2.Register watcher
        PathChildrenCache watcher = new PathChildrenCache(
                client,
                ZK_PATH,
                true    // if cache data
        );
        watcher.getListenable().addListener((client1, event) -> {
            ChildData data = event.getData();
            if (data == null) {
                System.out.println("No data in event[" + event + "]");
            } else {
                System.out.println("Receive event: "
                        + "type=[" + event.getType() + "]"
                        + ", path=[" + data.getPath() + "]"
                        + ", data=[" + new String(data.getData()) + "]"
                        + ", stat=[" + data.getStat() + "]");
            }
        });
        watcher.start(StartMode.BUILD_INITIAL_CACHE);
        System.out.println("Register zk watcher successfully!");

        Thread.sleep(Integer.MAX_VALUE);
    }

}

输出如下:

Java: zk client start successfully!
Java: Register zk watcher successfully!

zkCli: [zk: localhost:2181(CONNECTED) 11] create /zktest/hello mydata
Java: Receive event: type=[CHILD_ADDED], path=[/zktest/hello], data=[mydata], stat=[121,121,1434001221097,1434001221097,0,0,0,0,6,0,121]

zkCli: [zk: localhost:2181(CONNECTED) 12] set /zktest/hello otherdata
Java: Receive event: type=[CHILD_UPDATED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]

zkCli: [zk: localhost:2181(CONNECTED) 13] delete /zktest/hello
Java: Receive event: type=[CHILD_REMOVED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]

 下列两个系列称为Recipe(专题,关于这个recipe应该如何翻译,LZ做了研究,直译是菜谱,肯定不对,也有叫做攻略的,貌似也不正确,所以叫专题可能确实更合适),完整的curator recipe实现可参见https://curator.apache.org/curator-recipes/index.html。

分布式协调

  一般我们称分布式锁的时候,指的是短时的分布式锁,因此一般采用redis实现,而zk下的称之为分布式协调更合理,因为它通常时间更长。比如分布式编程时,比如最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。例如,现在一台应用正在rebuild缓存内容,要临时锁住某个区域暂时不让访问;又比如调度程序每次只想一个任务被一台应用执行等等。大多数的分布式协调采用临时节点+watch机制实现。除了直接采用原始的监听器自己实现外,curator实现了分布式的IPM(进程间锁)。Curator的机制为:使用我们提供的lock路径的结点作为全局锁,这个结点的数据类似这种格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次获得锁时会生成这种串,释放锁时清空数据。由于内部采用zookeeper的临时顺序节点特性,一旦客户端失去连接后,则就会自动清除该节点,redis则只能等待超时。

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;

import java.util.concurrent.TimeUnit;

/**
 * Curator framework‘s distributed lock test.
 */
public class CuratorDistrLockTest {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "192.168.1.100:2181";
    private static final String ZK_LOCK_PATH = "/zktest";

    public static void main(String[] args) throws InterruptedException {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();
        System.out.println("zk client start successfully!");

        Thread t1 = new Thread(() -> {
            doWithLock(client);
        }, "t1");
        Thread t2 = new Thread(() -> {
            doWithLock(client);
        }, "t2");

        t1.start();
        t2.start();
    }

    private static void doWithLock(CuratorFramework client) {
        InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
        try {
            if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
                System.out.println(Thread.currentThread().getName() + " hold lock");
                Thread.sleep(5000L);
                System.out.println(Thread.currentThread().getName() + " release lock");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

}

   当然实际中会更加复杂,比如只是某些接口需要全局单点,但是服务的粒度又没有拆分到独立的微服务。另外,客户端宕机后锁是否自动释放也是要考虑的,否则其他节点就无法接管。InterProcessMutex的实现分析可以参考:https://www.jianshu.com/p/5fa6a1464076

Leader选举

  在分布式系统中,不少系统也采用和zk本身一样的leader/follower架构,因此存在leader选举的问题,例如es/kafka(注:在一般分布式系统中,并不会使用到该特性)。curator就包含了对应的解决方法。Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()方法,说明它是当前的Leader。注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.EnsurePath;

/**
 * Curator framework‘s leader election test.
 * Output:
 *  LeaderSelector-2 take leadership!
 *  LeaderSelector-2 relinquish leadership!
 *  LeaderSelector-1 take leadership!
 *  LeaderSelector-1 relinquish leadership!
 *  LeaderSelector-0 take leadership!
 *  LeaderSelector-0 relinquish leadership! 
 *      ...
 */
public class CuratorLeaderTest {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "192.168.1.100:2181";
    private static final String ZK_PATH = "/zktest";

    public static void main(String[] args) throws InterruptedException {
        LeaderSelectorListener listener = new LeaderSelectorListener() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                System.out.println(Thread.currentThread().getName() + " take leadership!");

                // takeLeadership() method should only return when leadership is being relinquished.
                Thread.sleep(5000L);

                System.out.println(Thread.currentThread().getName() + " relinquish leadership!");
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState state) {
            }
        };

        new Thread(() -> {
            registerListener(listener);
        }).start();

        new Thread(() -> {
            registerListener(listener);
        }).start();

        new Thread(() -> {
            registerListener(listener);
        }).start();

        Thread.sleep(Integer.MAX_VALUE);
    }

    private static void registerListener(LeaderSelectorListener listener) {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                ZK_ADDRESS,
                new RetryNTimes(10, 5000)
        );
        client.start();

        // 2.Ensure path
        try {
            new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient());
        } catch (Exception e) {
            e.printStackTrace();
        }

        // 3.Register listener
        LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener);
        selector.autoRequeue();
        selector.start();
    }

}

 

以上是关于zookeeper java客户端之curator详解的主要内容,如果未能解决你的问题,请参考以下文章

Zookeeper开源客户端Curator之基本功能讲解

ZooKeeper系列—— Java 客户端 Apache Curator

ZooKeeper 之Apache Curator 客户端使用

Zookeeper之Curator客户端对节点的一些监控事件的api使用

Java操作zookeeper

curator-api及zookeeper-java-api的使用总结