java中操作zookeeper,curator操作zookeeper

Posted z街角的风铃y

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java中操作zookeeper,curator操作zookeeper相关的知识,希望对你有一定的参考价值。

2.4 java中操作zookeeper

引入依赖

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.14</version>
    </dependency>
</

1

创建zookeeper会话

org.apache.zookeeper.ZooKeeper类的构造方法用于创建zookeeper客户端与服务端之间的会话。该类提供了如下几个构造方法:

 

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

构造方法参数说明:

 

connectString:指zk的服物器列表,以英文输入法下逗号分割的host:port,比如?192.168.1.1:2181, 192.168.1.2:2181,也可以通过在后面跟着根目录,表示此客户端的操作都是在此根目录下,比如:比如192.168.1.1:2181,192.168.1.2:2181/zk-book,表示此客户端操作的节点都是在/zk-book根目录下,比如创建/foo/bar,实际完整路径为/zk-book/foo/bar;

sessionTimeout:会话超时时间,单位是毫秒,当在这个时间内没有收到心跳检测,会话就会失效;

watcher:注册的watcher,null表示不设置;

canBeReadOnly:用于标识当前会话是否支持”read-only”模式? ”,“read-only”模式是指当zk集群中的某台机器与集群中过半以上的机器网络端口不同,则此机器将不会接受客户端的任何读写请求,但是,有时候,我们希望继续提供读请求,因此设置此参数为true, 即客户端还以从与集群中半数以上节点网络不通的机器节点中读?数据;

sessionId和sessionPasswd:分别代表会话ID和会话密钥,这两个个参数一起可以唯一确定一个会话,客户端通过这两个参数可以实现客户端会话复用;

创建zookeeper节点

org.apache.zookeeper.ZooKeeper类提供了如下创建zk节点的api:

 

public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)

public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)

1

2

第一个方法以同步的方式创建节点,第二个方法以异步的方式创建节点,需要注意不论同步或异步都不支持递归创建节点,当节点已经存在时,会抛出NodeExistsException异常。

 

create方法参数说明:

 

path:被创建的节点路径,比如:/zk-book/foo;

data[]:节点中的数据,是一个字节数组;

acl:acl策略

createMode:节点类型,枚举类型,有四种选择:持久(PERSISTENT)、持久顺序(PERSISTENT_SEQUENTIAL)、临时(EPHEMERAL)、临时顺序(EPHEMERAL_SEQUENTIAL);

cb:异步回调函数,需要实现接StringCallback接口,当服物器端创建完成后,客户端会自动调用 这个对象的方法processResult;

ctx:用于传递一个对象,可以在回调方法执行的时候使用,通常用于传递业务的上下文信息;

其中org.apache.zookeeper.data.ACL类中有两个成员:

 

private int perms;

private org.apache.zookeeper.data.Id id;

1

2

perms成员是ACL组成Scheme:id:permission中的permission,zk中perms定义如下:

 

public interface Perms {

        int READ = 1 << 0;

 

        int WRITE = 1 << 1;

 

        int CREATE = 1 << 2;

 

        int DELETE = 1 << 3;

 

        int ADMIN = 1 << 4;

 

        int ALL = READ | WRITE | CREATE | DELETE | ADMIN;

    }

org.apache.zookeeper.data.Id类中定义了如下成员:

 

private String scheme;

private String id;

1

2

分别代表ACL组成Scheme:id:permission中的Scheme和id。(ACL参考文档[zookeeper] zookeeper系列二:zookeeper持久节点、临时节点及ACL )

其中org.apache.zookeeper.ZooDefs.Ids接口中有预先定义的几种ACL策略:

 

public interface Ids {

        /**

         * This Id represents anyone.

         */

        public final Id ANYONE_ID_UNSAFE = new Id("world", "anyone");

 

        /**

         * This Id is only usable to set ACLs. It will get substituted with the

         * Id‘s the client authenticated with.

         */

        public final Id AUTH_IDS = new Id("auth", "");

 

        /**

         * This is a completely open ACL .

         * 相当于 world:anyone:cdrwa

         */

        public final ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList<ACL>(

                Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));

 

        /**

         * This ACL gives the creators authentication id‘s all permissions.

         * 相当于  相?于auth:用户:密码,但是需要通过ZooKeeper的addAuthInfo添加对应的用户和密码对

         */

        public final ArrayList<ACL> CREATOR_ALL_ACL = new ArrayList<ACL>(

                Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS)));

 

        /**

         * This ACL gives the world the ability to read.

         * 相当于world:anyone:r,即所有人拥有读权限

         */

        public final ArrayList<ACL> READ_ACL_UNSAFE = new ArrayList<ACL>(

                Collections

                        .singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE)));

    }

删除zookeeper节点

org.apache.zookeeper.ZooKeeper类提供了如下删除zk节点的api:

 

// 以同步的方式删除节点

public void delete(final String path, int version)

        throws InterruptedException, KeeperException

// 以异步的方式删除节点,如果写测试代码,客户端主线程不能退出,否则可能请求没有发到服物器或者异步回调不成功

public void delete(final String path, int version, VoidCallback cb, Object ctx)

1

2

3

4

5

参数说明:

 

path:被删除节点的路径

version:节点的数据版本,如果指定的版本不是最新版本,将会报错

cb:异步回调函数

ctx:传递的上下文信息,即操作之前的信息传递到删除之后的异步回调函数里面

获取zookeeper子节点

org.apache.zookeeper.ZooKeeper类提供了如下获取zk子节点的api:

 

public List<String> getChildren(final String path, Watcher watcher) throws KeeperException, InterruptedException

public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException

public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)

public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)

public List<String> getChildren(final String path, Watcher watcher, Stat stat)

public List<String> getChildren(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException

public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx)

public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)

参数说明:

 

path:数据节点路径,比如?/zk-book/foo,获取该路径下的子节点列表

watcher:给节点设置的watcher,如果path对应节点的子节点数量发生变化,将会得到通知,允许?null

watch:使用使用默认watch,true的话当删除path节点或path子节点数量发生变化则默认watch或得到通知

stat:指定数据节点的状态信息

cb:异步回调函数

ctx:用于传递一个对象,可以在回调方法执行的时候使用,通常用于传递业务的上文信息

获取zookeeper节点数据

org.apache.zookeeper.ZooKeeper类提供了如下获取zk节点数据的api:

 

public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException

public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException

public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)

public void getData(String path, boolean watch, DataCallback cb, Object ctx)

参数说明:

 

path:数据节点的路径,比如:/zk-book/foo,获取该路径节点的数据;

watcher:设置watcher后,如果path对应节点的数据发生变化(设置新的数据或删除节点),将会得到通知,允许?null;

watch:是否使用默认的watcher;

stat:获取到的数据节点的状态信息将会保存到stat变量中,stat定义如下成员:

private long czxid;

private long mzxid;

private long ctime;

private long mtime;

private int version;

private int cversion;

private int aversion;

private long ephemeralOwner;

private int dataLength;

private int numChildren;

private long pzxid;

代表了数据节点的状态信息;

cb:异步回调函数;

ctx:用于传递一个对象,可以在回调方法执行的时候用,通常用于传递业务的上下文信息;

修改zookeeper节点数据

org.apache.zookeeper.ZooKeeper类提供了如下修改zk节点数据的api:

 

public Stat setData(final String path, byte data[], int version) throws KeeperException, InterruptedException

public void setData(final String path, byte data[], int version, StatCallback cb, Object ctx)

参数说明:

 

path:被修改的节点路径;

data:新的数据

version:指定的数据节点的版本,如果指定的版本不是最新版本,将会报错;

cb:异步回调函数;

ctx:传递的上下文信息,即操作之前的信息传递到删除之后的异步回调函数里面;

检查zookeeper节点是否存在

org.apache.zookeeper.ZooKeeper类提供了如下检查zk节点是否存在的api:

 

public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException

public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException

public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)

public void exists(String path, boolean watch, StatCallback cb, Object ctx)

参数说明:

 

path:数据节点的路径;

watcher:需要注册的watcher,当监听的节点被创建、被删除或者被更新时该watcher会得到通知;

watch:是否使用默认的watcher;

cb:异步回调函数;

ctx:用于传递一个对象,可以在回调方法执行的时候用,通常用于传递业务的上下文信息;

zookeeper API使用示例

如下示例演示了zookeeper api的使用:

 

package com.ctrip.flight.test.zookeeper;

 

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.ZooDefs;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.data.ACL;

import org.apache.zookeeper.data.Id;

import org.apache.zookeeper.data.Stat;

import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

 

import java.util.ArrayList;

import java.util.List;

 

public class ZkClientTest {

 

    private static final String PARENT_PATH = "/zkClientTest";

 

    private static final String CHILD_PATH = "/zkClientTest/childNodeTest";

 

    private static final String IDENTITY = "zhangsan:123456";

 

    public static void main(String[] args) {

        try {

            DefaultWatcher defaultWatcher = new DefaultWatcher();

 

            ChildrenWatcher childrenWatcher = new ChildrenWatcher();

 

            ParentWatcher parentWatcher = new ParentWatcher();

 

            // 创建会话

            ZooKeeper client = new ZooKeeper("192.168.0.102:2181,192.168.0.102:2182,192.168.0.102:2183", 30000, defaultWatcher);

 

            client.addAuthInfo("digest", IDENTITY.getBytes());

 

            Stat stat = client.exists(PARENT_PATH, false);

            if (null != stat) {

                client.delete(PARENT_PATH, -1);

            }

 

            // 创建节点,临时节点不能有子节点,所以父节点是持久节点

            client.create(PARENT_PATH, "zkClientTestData_v1".getBytes(), getAcl(), CreateMode.PERSISTENT);

 

            // 创建子节点

            client.create(CHILD_PATH, "childNodeData_v1".getBytes(), getAcl(), CreateMode.EPHEMERAL);

 

            // 获取子节点信息

            Stat childStat = new Stat();

            List<String> childs = client.getChildren(PARENT_PATH, childrenWatcher, childStat);

            System.out.println(PARENT_PATH + "‘s childs:" + childs);

            System.out.println(PARENT_PATH + "‘s stat:" + childStat);

 

            Thread.sleep(1000);

 

            // 获取父节点数据

            Stat parentStat = new Stat();

            byte[] parentData = client.getData(PARENT_PATH, parentWatcher, parentStat);

            System.out.println(PARENT_PATH + "‘s data: " + new String(parentData));

            System.out.println(PARENT_PATH + "‘s stat: " + parentStat);

 

            Thread.sleep(1000);

 

            // 设置子节点数据

            childStat = client.setData(CHILD_PATH, "childNodeData_v2".getBytes(), -1);

            System.out.println(CHILD_PATH + "‘s stat:" + childStat);

            byte[] childData = client.getData(CHILD_PATH, false, childStat);

            System.out.println(CHILD_PATH + "‘s data:" + new String(childData));

 

            Thread.sleep(1000);

 

            // 删除子节点

            client.delete(CHILD_PATH, -1);

            // 判断子节点是否存在

            childStat = client.exists(CHILD_PATH, false);

            System.out.println(CHILD_PATH + " is exist: " + (childStat != null));

 

            client.delete(PARENT_PATH, -1);

 

            client.close();

 

            Thread.sleep(1000);

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

 

    /**

     * ACL格式为:schema:id:permission

     * @return

     */

    private static List<ACL> getAcl() throws Exception {

        List<ACL> acls = new ArrayList<>();

 

        // 指定schema

        String scheme = "auth";

        // 指定id

        // String identity = "zhangsan:123456";

        Id id = new Id(scheme, DigestAuthenticationProvider.generateDigest(IDENTITY));

 

        // Perms.ALL的权限为crdwa

        ACL acl = new ACL(ZooDefs.Perms.ALL, id);

 

        acls.add(acl);

 

        return acls;

    }

}

Watcher的使用

Watcher的原理

 

分别是zookeeper服务端、客户端以及客户端的watchManager。

如图所示,客户端向zk注册watcher的同时,会将客户端的watcher对象存储在客户端的WatchManager中;zk服务器触发watch事件后,会向客户端发送通知,客户端线程从watchManager中取出对应watcher执行。

 

客户端如何实现事件通知的动作

客户端只需定义一个类实现org.apache.zookeeper.Watcher接口并实现接口中的如下方法:

 

abstract public void process(WatchedEvent event);

1

即可在得到通知后执行相应的动作。参数org.apache.zookeeper.WatchedEvent是zk服务端传过来的事件,有三个成员:

 

final private KeeperState keeperState; // 通知状态

final private EventType eventType; // 事件类型

private String path; // 哪个节点发生的时间

1

2

3

分别代表通知的状态、事件类型和发生事件的节点。

 

keeperState是个枚举对象,代表客户端和zk服务器的链接状态,定义如下:

 

/**

 * Enumeration of states the ZooKeeper may be at the event

 */

 public enum KeeperState {

      /** Unused, this state is never generated by the server */

      @Deprecated

      Unknown (-1),

 

      /** The client is in the disconnected state - it is not connected

       * to any server in the ensemble. */

       Disconnected (0),

 

      /** Unused, this state is never generated by the server */

       @Deprecated

       NoSyncConnected (1),

 

     /** The client is in the connected state - it is connected

      * to a server in the ensemble (one of the servers specified

      * in the host connection parameter during ZooKeeper client

      * creation).

      * /

      SyncConnected (3),

 

      /**

       * Auth failed state

       */

       AuthFailed (4),

 

      /**

       * The client is connected to a read-only server, that is the

       * server which is not currently connected to the majority.

       * The only operations allowed after receiving this state is

       * read operations.

       * This state is generated for read-only clients only since

       * read/write clients aren‘t allowed to connect to r/o servers.

       */

       ConnectedReadOnly (5),

 

       /**

        * SaslAuthenticated: used to notify clients that they are SASL-authenticated,

        * so that they can perform Zookeeper actions with their SASL-authorized permissions.

        */

        SaslAuthenticated(6),

 

       /** The serving cluster has expired this session. The ZooKeeper

        * client connection (the session) is no longer valid. You must

        * create a new client connection (instantiate a new ZooKeeper

        * instance) if you with to access the ensemble.

        */

        Expired (-112);

 

        private final int intValue;     // Integer representation of value

                                        // for sending over wire

 

        KeeperState(int intValue) {

            this.intValue = intValue;

        }

 

        public int getIntValue() {

            return intValue;

        }

 

        public static KeeperState fromInt(int intValue) {

              switch(intValue) {

                  case   -1: return KeeperState.Unknown;

                  case    0: return KeeperState.Disconnected;

                  case    1: return KeeperState.NoSyncConnected;

                  case    3: return KeeperState.SyncConnected;

                  case    4: return KeeperState.AuthFailed;

                  case    5: return KeeperState.ConnectedReadOnly;

                  case    6: return KeeperState.SaslAuthenticated;

                  case -112: return KeeperState.Expired;

 

                  default:

                        throw new RuntimeException("Invalid integer value for conversion to KeeperState");

               }

        }

 }

eventType也是个枚举类型,代表节点发生的事件类型,比如创建新的子节点、改变节点数据等,定义如下:

 

/**

 * Enumeration of types of events that may occur on the ZooKeeper

 */

 public enum EventType {

       None (-1),

       NodeCreated (1),

       NodeDeleted (2),

       NodeDataChanged (3),

       NodeChildrenChanged (4),

       DataWatchRemoved (5),

       ChildWatchRemoved (6);

 

       private final int intValue;     // Integer representation of value

                                       // for sending over wire

 

       EventType(int intValue) {

            this.intValue = intValue;

       }

 

       public int getIntValue() {

            return intValue;

       }

 

       public static EventType fromInt(int intValue) {

            switch(intValue) {

                case -1: return EventType.None;

                case  1: return EventType.NodeCreated;

                case  2: return EventType.NodeDeleted;

                case  3: return EventType.NodeDataChanged;

                case  4: return EventType.NodeChildrenChanged;

                case  5: return EventType.DataWatchRemoved;

                case  6: return EventType.ChildWatchRemoved;

 

                default:

                         throw new RuntimeException("Invalid integer value for conversion to EventType");

            }

       }          

}

keeperState和eventType对应关系如下所示:

 

对于NodeDataChanged事件:无论节点数据发生变化还是数据版本发生变化都会触发(即使被更新数据与新数据一样,数据版本都会发生变化)。

 

对于NodeChildrenChanged事件:新增和删除子节点会触发该事件类型。

 

需要注意的是:WatchedEvent只是事件相关的通知,并没有对应数据节点的原始数据内容及变更后的新数据内容,因此如果需要知道变更前的数据或变更后的新数据,需要业务保存变更前的数据和调用接口获取新的数据

 

如何注册watcher

watcher注册api

可以在创建zk客户端实例的时候注册watcher(构造方法中注册watcher):

 

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,ZKClientConfig conf)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider,ZKClientConfig clientConfig)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, ZKClientConfig conf)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, HostProvider aHostProvider)

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

ZooKeeper的构造方法中传入的watcher将会作为整个zk会话期间的默认watcher,该watcher会一直保存为客户端ZKWatchManager的defaultWatcher成员,如果有其他的设置,这个watcher会被覆盖。

 

除了可以通过ZooKeeper类的构造方法注册watcher外,还可以通过ZooKeeper类中其他一些api来注册watcher,只不过这些api注册的watcher就不是默认watcher了(以下每个注册watcher的方法有很多个重载的方法,就不一一列举出来)。

 

public List<String> getChildren(final String path, Watcher watcher)

// boolean watch表示是否使用上下文中默认的watcher,即创建zk实例时设置的watcher

public List<String> getChildren(String path, boolean watch)

// boolean watch表示是否使用上下文中默认的watcher,即创建zk实例时设置的watcher

public byte[] getData(String path, boolean watch, Stat stat)

public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx)

// boolean watch表示是否使用上下文中默认的watcher,即创建zk实例时设置的watcher

public Stat exists(String path, boolean watch)

public Stat exists(final String path, Watcher watcher)

watcher注册示例代码

本示例中使用zookeeper自带客户端演示watcher的使用,zookeeper自带客户端有一点需要注意:

 

Watcher设置后,一旦触发一次即会失效,如果需要一直监听,则需要再注册

 

定义默认watcher:

 

/**

 * 测试默认watcher

 */

public class DefaultWatcher implements Watcher {

 

    @Override

    public void process(WatchedEvent event) {

 

        System.out.println("==========DefaultWatcher start==============");

 

        System.out.println("DefaultWatcher state: " + event.getState().name());

 

        System.out.println("DefaultWatcher type: " + event.getType().name());

 

        System.out.println("DefaultWatcher path: " + event.getPath());

 

        System.out.println("==========DefaultWatcher end==============");

    }

}

定义监听子节点变化的watcher:

 

/**

 * 用于监听子节点变化的watcher

 */

public class ChildrenWatcher implements Watcher {

 

    @Override

    public void process(WatchedEvent event) {

 

        System.out.println("==========ChildrenWatcher start==============");

 

        System.out.println("ChildrenWatcher state: " + event.getState().name());

 

        System.out.println("ChildrenWatcher type: " + event.getType().name());

 

        System.out.println("ChildrenWatcher path: " + event.getPath());

 

        System.out.println("==========ChildrenWatcher end==============");

    }

}

定义监听节点变化的watcher:

 

public class DataWatcher implements Watcher {

 

    @Override

    public void process(WatchedEvent event) {

 

        System.out.println("==========DataWatcher start==============");

 

        System.out.println("DataWatcher state: " + event.getState().name());

 

        System.out.println("DataWatcher type: " + event.getType().name());

 

        System.out.println("DataWatcher path: " + event.getPath());

 

        System.out.println("==========DataWatcher end==============");

    }

}

watcher测试代码:

 

public class WatcherTest {

 

    /**

     * 链接zk服务端的地址

     */

    private static final String CONNECT_STRING = "192.168.0.113:2181";

 

    public static void main(String[] args) {

 

        // 除了默认watcher外其他watcher一旦触发就会失效,需要充新注册,本示例中因为

        // 还未想到比较好的重新注册watcher方式(考虑到如果在Watcher中持有一个zk客户端的

        // 实例可能存在循环引用的问题),因此暂不实现watcher失效后重新注册watcher的问题,

        // 后续可以查阅curator重新注册watcher的实现方法。

 

        // 默认watcher

        DefaultWatcher defaultWatcher = new DefaultWatcher();

        // 监听子节点变化的watcher

        ChildrenWatcher childrenWatcher = new ChildrenWatcher();

        // 监听节点数据变化的watcher

        DataWatcher dataWatcher = new DataWatcher();

        try {

            // 创建zk客户端,并注册默认watcher

            ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STRING, 100000, defaultWatcher);

 

            // 让默认watcher监听 /GetChildren 节点的子节点变化

            // zooKeeper.getChildren("/GetChildren", true);

 

            // 让childrenWatcher监听 /GetChildren 节点的子节点变化(默认watcher不再监听该节点子节点变化)

            zooKeeper.getChildren("/GetChildren", childrenWatcher);

 

            // 让dataWatcher监听 /GetChildren 节点本省的变化(默认watcher不再监听该节点变化)

            zooKeeper.getData("/GetChildren", dataWatcher, null);

 

            TimeUnit.SECONDS.sleep(1000000);

        } catch (Exception ex) {

            ex.printStackTrace();

        }

    }

}

 

  1. 使用开源curator操作zookeeper

zookeeper原生api的不足

zookeeper原生api存在以下不足之处:

 

连接的创建是异步的,需要开发人员自行编码实现等待;

连接没有自动的超时重连机制;

Zk本身不提供序列化机制,需要开发人员自行指定,从而实现数据的序列化和反序列化;

Watcher注册一次只会生效一次,需要不断的重复注册;

Watcher本身的使用方式不符合java本身的术语,如果采用监听器的方式,更容易理解;

不支持递归创建树形节点;

zookeeper第三方开源客户端

zookeeper的第三方开源客户端主要有zkClient和Curator。其中zkClient解决了session会话超时重连、Watcher反复注册等问题,提供了更加简洁的api,但zkClient社区不活跃,文档不够完善。而Curator是Apache基金会的顶级项目之一,它解决了session会话超时重连、Watcher反复注册、NodeExitsException异常等问题,Curator具有更加完善的文档,因此我们这里只学习Curator的使用。

 

Curator客户端api介绍

Curator包含了如下几个包:

 

curator-framework:对zookeeper底层api的一些封装;

curator-client:提供一些客户端的操作,如重试策略等;

curator-recipes:封装了一些高级特性,如Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等

首先我们在gradle中引入curator的依赖就行:

 

 

curator提供了一种类似jdk8中stream一样的流式操作。

 

创建zookeeper会话

Curator中org.apache.curator.framework.CuratorFrameworkFactory类提供了如下两个创建zookeeper会话的方法:

 

public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)

public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)

该方法返回一个org.apache.curator.framework.CuratorFramework类型的对象,参数说明如下:

 

connectString:逗号分开的ip:port对;

sessionTimeoutMs:会话超时时间,单位为毫秒,默认是60000ms,指连接建立完后多久没有收到心跳检测,超过该时间即为会话超时;

connectionTimeoutMs:连接创建超时时间,单位为毫秒,默认是15000ms,指客户端与服务端建立连接时多长时间没连接上就算超时;

retryPolicy:重试策略,retryPolicy的类型定义如下

   /**

    * Abstracts the policy to use when retrying connections

    */

    public interface RetryPolicy

    {

             /**

             * Called when an operation has failed for some reason. This method should return

             * true to make another attempt.

             *

               *

              * @param retryCount the number of times retried so far (0 the first time),第几次重试

              * @param elapsedTimeMs the elapsed time in ms since the operation was attempted,到当前重试时刻总的重试时间

              * @param sleeper use this to sleep - DO NOT call Thread.sleep,重试策略

              * @return true/false

              */

              public boolean      allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);

    }

allowRetry返回true继续重试,返回false不再重试

可以通过实现该接口来自定义策略,curator已经为我们提供了若干重试策略:

ExponentialBackoffRetry:该重试策略随着重试次数的增加,sleep的时间呈指数增长,该提供了两个构造方法

  public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)

  public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

第retryCount次重试的sleep时间计算方式为:baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1))),如果该值大于maxSleepMs,则sleep时间为maxSleepMs,如果重试次数大于maxRetries,则不再重试;

RetryNTimes:该重试策略重试指定次数,每次sleep固定的时间,构造方法如下

public RetryNTimes(int n, int sleepMsBetweenRetries)

n是重试的次数,sleepMsBetweenRetries是sleep的时间;

RetryOneTime:该重试策略只重试一次

RetryUntilElapsed:该重试策略对重试次数不做限制,但对总的重试时间做限制,构造方法如下

public RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)

1

maxElapsedTimeMs是最大的重试时间,sleepMsBetweenRetries是sleep的时间间隔;

通过newClient获得CuratorFramework对象后我们就可以进行各种操作了。

除了newClient,CuratorFrameworkFactory还提供了一种Builder的方式来创建CuratorFramework对象,如下的示例所示:

 

RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 5);

CuratorFramework client =  CuratorFrameworkFactory.builder()

                .connectString("192.168.0.102:2181,192.168.0.102:2182,192.168.0.102:2183")

                .sessionTimeoutMs(30000).connectionTimeoutMs(15000)

                .retryPolicy(retryPolicy)

                .namespace("curatorTest")

                .build();

创建zookeeper节点

在curator中无论执行何种操作都必须先获得一个构建该操作的包装类(Builder对象),创建zookeeper节点需要先获得一个org.apache.curator.framework.api.CreateBuilder(实际上是CreateBuilder的实现类CreateBuilderImpl)对象,然后用这个对象来构建创建节点的操作,CreateBuilderImpl中常见的操作如下:

 

// 递归创建(持久)父目录

public ProtectACLCreateModeStatPathAndBytesable<String> creatingParentsIfNeeded()

// 设置创建节点的属性

public ACLBackgroundPathAndBytesable<String> withMode(CreateMode mode)

// 设置节点的acl属性

public ACLBackgroundPathAndBytesable<String> withACL(List<ACL> aclList, boolean applyToParents)

// 指定创建节点的路径和节点上的数据

public String forPath(final String givenPath, byte[] data) throws Exception

如下所示为创建一个节点的示例:

 

String test1Data = client.create()

                .creatingParentsIfNeeded()

                .withMode(CreateMode.PERSISTENT)

                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)

                .forPath("/curatorTest/test1", "test1".getBytes());

删除zookeeper节点

同理先调用CuratorFramework的delete()获取构建删除操作的DeleteBuilder(实际上为DeleteBuilderImpl),DeleteBuilderImpl提供了如下方法来构建删除操作:

 

// 指定要删除数据的版本号

public BackgroundPathable<Void> withVersion(int version)

// 确保数据被删除,本质上就是重试,当删除失败时重新发起删除操作

public ChildrenDeletable guaranteed()

// 指定删除的节点

public Void forPath(String path) throws Exception

// 递归删除子节点

public BackgroundVersionable deletingChildrenIfNeeded()

读取zookeeper节点数据

同理先调用CuratorFramework的getData()获取构建获取数据操作的GetDataBuilder(实际上为GetDataBuilderImpl),GetDataBuilderImpl提供了如下方法来构建读取操作:

 

// 将节点状态信息保存到stat

public WatchPathable<byte[]> storingStatIn(Stat stat)

// 指定节点路径

public byte[] forPath(String path) throws Exception

如下示例为获取节点数据:

 

Stat test1Stat = new Stat();

byte[] test1DataBytes = client.getData().storingStatIn(test1Stat).forPath("/curatorTest/test1");

System.out.println("test1 data: " + new String(test1DataBytes));

更新zookeeper节点数据

同理先调用CuratorFramework的setData()获取构建获取数据操作的SetDataBuilder(实际上为SetDataBuilderImpl),SetDataBuilderImpl提供了如下方法来构建更新操作:

 

// 指定版本号

public BackgroundPathAndBytesable<Stat> withVersion(int version)

// 指定节点路径和要更新的数据

public Stat forPath(String path, byte[] data) throws Exception

示例程序:

 

test1Stat = client.setData()

                .withVersion(-1)

                .forPath("/curatorTest/test1", "test1DataV2".getBytes());

读取zookeeper子节点

同理先调用CuratorFramework的getChildren()获取构建获取子节点数据操作的GetChildrenBuilder(实际上为GetChildrenBuilderImpl),GetChildrenBuilderImpl提供了如下方法来构建更新操作:

 

// 把服务器端获取到的状态数据存储到stat对象中

public WatchPathable<List<String>> storingStatIn(Stat stat)

// 指定获取子节点数据的节点路径

public List<String> forPath(String path) throws Exception

// 设置watcher,类似于zookeeper本身的api,也只能使用一次

public BackgroundPathable<List<String>> usingWatcher(Watcher watcher)

public BackgroundPathable<List<String>> usingWatcher(CuratorWatcher watcher)

示例程序:

 

Stat childStat = new Stat();

List<String> childs = client.getChildren().storingStatIn(childStat).forPath("/curatorTest");

curator中关于异步操作

curator为所有操作都提供了异步执行的版本,只需要在构建操作的方法链中添加如下操作之一即可:

 

public ErrorListenerPathable<List<String>> inBackground()

public ErrorListenerPathable<List<String>> inBackground(Object context)

public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback)

public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Object context)

public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Executor executor)

public ErrorListenerPathable<List<String>> inBackground(BackgroundCallback callback, Object context, Executor executor)

如下示例程序为使用异步执行删除操作:

 

client.delete()

          .guaranteed()

          .withVersion(-1)

          .inBackground(((client1, event) -> {

                    System.out.println(event.getPath() + ", data=" + event.getData());

                    System.out.println("event type=" + event.getType());

                    System.out.println("event code=" + event.getResultCode());

           }))

           .forPath("/curatorTest/test1");

curator中的NodeCache

NodeCache会将某一路径的节点(节点本身)在本地缓存一份,当zookeeper中相应路径的节点发生更新、创建或者删除操作时,NodeCache将会得到响应,并且会将最新的数据拉到本地缓存中,NodeCache只会监听路径本身的变化,并不会监听子节点的变化。我们可以通过NodeCache注册一个监听器来获取发生变化的通知。NodeCache提供了如下构造函数:

 

public NodeCache(CuratorFramework client, String path)

public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)

1

2

参数说明:

 

client: curator客户端;

path: 需要缓存的节点路径;

dataIsCompressed:是否压缩节点下的数据;

NodeCache提供了一个如下类型的监听器容器,只要往容器中添加监听器,当节点发生变更时,容器中的监听器都将得到通知。

 

private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();

1

NodeCache缓存数据及添加Listener的示例代码如下:

 

NodeCache nodeCache = new NodeCache(client, "/curatorTest/test1");

// 是否立即拉取/curatorTest/test1节点下的数据缓存到本地

nodeCache.start(true);

// 添加listener

nodeCache.getListenable().addListener(() -> {

        ChildData childData = nodeCache.getCurrentData();

        if (null != childData) {

             System.out.println("path=" + childData.getPath() + ", data=" + childData.getData() + ";");

        }

});

Note: NodeCache只会缓存节点本身的数据和状态,并不会缓存节点下的子节点信息,所以如果我们在节点下创建子节点,NodeCache中的Listener是不会得到通知的*

 

curator中的PathChildrenCache

PathChildrenCache会将指定路径节点下的所有子节点缓存在本地,但不会缓存节点本身的信息,当执行新增(CHILD_ADDED)、删除(CHILD_REMOVED)、更新(CHILD_UPDATED)指定节点下的子节点等操作时,PathChildrenCache中的Listener将会得到通知,PathChildrenCache提供了如下几个构造函数:

 

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)

参数说明:

 

client:curator客户端;

path:缓存的节点路径;

cacheData:除了缓存节点状态外是否缓存节点数据,如果为true,那么客户端在接收到节点列表变更的同时,也能够获取到节点的数据内容,如果为false,则无法获取到数据内容;

threadFactory:线程池工厂,当内部需要开启新的线程执行时,使用该线程池工厂来创建线程;

dataIsCompressed:是否压缩节点数据;

executorService:线程池;

PathChildrenCache通过start方法可以传入三种启动模式,这三种启动模式定义在org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode中:

 

NORMAL:异步初始化cache;

BUILD_INITIAL_CACHE:同步初始化cache,以及创建cache后,就从服务器拉取对应的数据;

POST_INITIALIZED_EVENT:异步初始化cache,初始化完成触发PathChildrenCacheEvent.Type#INITIALIZED事件,cache中Listener会收到该事件的通知;

PathChildrenCache示例代码如下:

 

PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/curatorTest", true);

// startMode为BUILD_INITIAL_CACHE,cache是初始化完成会发送INITIALIZED事件

pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

System.out.println(pathChildrenCache.getCurrentData().size());

pathChildrenCache.getListenable().addListener(((client1, event) -> {

            ChildData data = event.getData();

            switch (event.getType()) {

                case INITIALIZED:

                    System.out.println("子节点cache初始化完成(StartMode为POST_INITIALIZED_EVENT的情况)");

                    System.out.println("INITIALIZED: " + pathChildrenCache.getCurrentData().size());

                    break;

                case CHILD_ADDED:

                    System.out.println("添加子节点,path=" + data.getPath() + ", data=" + new String(data.getData()));

                    break;

                case CHILD_UPDATED:

                    System.out.println("更新子节点,path=" + data.getPath() + ", data=" + new String(data.getData()));

                    break;

                case CHILD_REMOVED:

                    System.out.println("删除子节点,path=" + data.getPath());

                    break;

                default:

                    System.out.println(event.getType());

            }

}));

curator完整示例代码

如下所示为演示curator使用的完整示例代码:

public class CuratorTest {

    public static void main(String[] args) throws Exception {

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 5);

        CuratorFramework client =  CuratorFrameworkFactory.builder()

                .connectString("192.168.0.104:2181,192.168.0.104:2182,192.168.0.104:2183")

                .sessionTimeoutMs(30000).connectionTimeoutMs(15000)

                .retryPolicy(retryPolicy)

                //.namespace("curatorTest")

                .build();

        client.start();

 

        // 判断节点是否存在,存在则先删除节点

        Stat test1Stat = client.checkExists().forPath("/curatorTest/test1");

        if (null != test1Stat) {

            client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath("/curatorTest/test1");

        }

 

        // 创建节点

        String test1Data = client.create()

                .creatingParentsIfNeeded()

                .withMode(CreateMode.PERSISTENT)

                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)

                .forPath("/curatorTest/test1", "test1DataV1".getBytes());

 

        // 获取节点信息

        test1Stat = new Stat();

        byte[] test1DataBytes = client.getData().storingStatIn(test1Stat).forPath("/curatorTest/test1");

        System.out.println("test1 stat: " + test1Stat);

        System.out.println("test1 data: " + new String(test1DataBytes));

 

        // 更新节点数据

        test1Stat = client.setData()

                .withVersion(-1)

                .forPath("/curatorTest/test1", "test1DataV2".getBytes());

        System.out.println("test1 stat: " + test1Stat);

 

        // 获取所有子节点

        Stat childStat = new Stat();

        List<String> childs = client.getChildren().storingStatIn(childStat).forPath("/curatorTest");

        System.out.println("curatorTest childs: " + childs);

 

//        client.delete()

//                .guaranteed()

//                .withVersion(-1)

//                .inBackground(((client1, event) -> {

//                    System.out.println(event.getPath() + ", data=" + event.getData());

//                    System.out.println("event type=" + event.getType());

//                    System.out.println("event code=" + event.getResultCode());

//                }))

//                .forPath("/curatorTest/test1");

 

        // 缓存节点

        NodeCache nodeCache = new NodeCache(client, "/curatorTest/test1");

        nodeCache.start(true);

        nodeCache.getListenable().addListener(() -> {

            System.out.println("NodeCache:");

            ChildData childData = nodeCache.getCurrentData();

            if (null != childData) {

                System.out.println("path=" + childData.getPath() + ", data=" + new String(childData.getData()) + ";");

            }

        });

 

 

        // 缓存子节点

        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/curatorTest", true);

        // startMode为BUILD_INITIAL_CACHE,cache是初始化完成会发送INITIALIZED事件

        pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);

        System.out.println(pathChildrenCache.getCurrentData().size());

        pathChildrenCache.getListenable().addListener(((client1, event) -> {

            ChildData data = event.getData();

            switch (event.getType()) {

                case INITIALIZED:

                    System.out.println("子节点cache初始化完成(StartMode为POST_INITIALIZED_EVENT的情况)");

                    System.out.println("INITIALIZED: " + pathChildrenCache.getCurrentData().size());

                    break;

                case CHILD_ADDED:

                    System.out.println("添加子节点,path=" + data.getPath() + ", data=" + new String(data.getData()));

                    break;

                case CHILD_UPDATED:

                    System.out.println("更新子节点,path=" + data.getPath() + ", data=" + new String(data.getData()));

                    break;

                case CHILD_REMOVED:

                    System.out.println("删除子节点,path=" + data.getPath());

                    break;

                default:

                    System.out.println(event.getType());

            }

        }));

 

        Thread.sleep(20000000);

    }

}

以上是关于java中操作zookeeper,curator操作zookeeper的主要内容,如果未能解决你的问题,请参考以下文章

javaAPI操作-Zookeeper

2021年大数据ZooKeeper:ZooKeeper Java API操作

zookeeper概念应用场景数据组织集群搭建客户端操作Java客户端curator

基于Curator的Zookeeper操作实战

Curator的使用

zookeeper java客户端之curator详解