zookeeper的使用(java)

Posted FreeFly辉

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper的使用(java)相关的知识,希望对你有一定的参考价值。

下载zookeeper

官网直接下就行,地址:

https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz

下载后解压,到bin目录下找到zkServer.cmd双击运行即可

第一次双击运行时会闪退,可看以下博客解决

https://blog.csdn.net/qq279862451/article/details/79083522

如果你从未使用过,请先双击zkCli.cmd进行命令行敲打学习

可搜 zk指令,这里贴一篇博客: https://blog.csdn.net/jiangtianjiao/article/details/89239354

zk中的acl权限控制可参考:
https://www.pianshen.com/article/7476507438/

接下来是java客户端的操作,maven依赖放在了最后

首先启动zk


上代码:

public class ZookeeperClient {
    public static final String ADDRESS = "127.0.0.1:2181";
    public static void main(String[] args) throws Exception {
        //创建连接 地址,会话超时时间,连接监听器
        ZooKeeper zooKeeper = new ZooKeeper(ADDRESS,5000,new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("有监听事件进来了,事件信息 " + watchedEvent);
            }
        });

        //由于new 完Zookeeper对象就会创建,而不会在意是否连接上了,所以这里阻塞以下,直至连接成功(网上很多例子用的是CountDownLatch方式)
        while (zooKeeper.getState() != ZooKeeper.States.CONNECTED){
            System.out.println("连接中...");
            Thread.sleep(300);
        }

        /**
         *   创建节点参数
         *   节点路径 相当于文件目录一样,zk根节点是 / ,注意创建节点时,一定要以 / 开头
         *   节点数据 节点想要保存的数据,(如果附带服务ip端口信息,zk就当作了注册中心,客户端可拉取服务信息进行访问)
         *   节点权限 zk的acl权限,这里设置的是所有人可读,具体权限前面放的地址有解释
         *   节点类型(永久的、临时的、顺序性的,根据枚举名自行翻译。不了解节点类型,请先根据前面连接熟悉一下,或自行搜索了解一下)
        */

        String path1 = zooKeeper.create("/client", "test-data1".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("success create znode:"+ path1);

        zooKeeper.close();
    }

}

运行后,打开zk客户端,进行指令查看,如下:

创建节点时,附带acl权限

上代码:
public class ZookeeperClient {
    public static final String ADDRESS = "127.0.0.1:2181";
    public static void main(String[] args) throws Exception {
        //创建连接 地址,会话超时时间,连接监听器
        ZooKeeper zooKeeper = new ZooKeeper(ADDRESS,5000,new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("有监听事件进来了,事件信息 " + watchedEvent);
            }
        });

        //由于new 完Zookeeper对象就会创建,而不会在意是否连接上了,所以这里阻塞以下,直至连接成功(网上很多例子用的是CountDownLatch方式)
        while (zooKeeper.getState() != ZooKeeper.States.CONNECTED){
            System.out.println("连接中...");
            Thread.sleep(300);
        }

        /**
         *   创建节点参数
         *   节点路径 相当于文件目录一样,zk根节点是 / ,注意创建节点时,一定要以 / 开头
         *   节点数据 节点想要保存的数据,(如果附带服务ip端口信息,zk就当作了注册中心,客户端可拉取服务信息进行访问)
         *   节点权限 zk的acl权限,这里设置的是所有人可读,具体权限前面放的地址有解释
         *   节点类型(永久的、临时的、顺序性的,根据枚举名自行翻译。不了解节点类型,请先根据前面连接熟悉一下,或自行搜索了解一下)
        */


        List<ACL> acls = new ArrayList<ACL>();  // 权限列表
        // 第一个参数是权限scheme,第二个参数是加密后的用户名和密码
        Id user1 = new Id("digest", DigestAuthenticationProvider.generateDigest("admin1:admin1"));
        Id user2 = new Id("digest", DigestAuthenticationProvider.generateDigest("admin2:admin2"));
        acls.add(new ACL(ZooDefs.Perms.ALL, user1));  // 给予所有权限
        acls.add(new ACL(ZooDefs.Perms.READ, user2));  // 只给予读权限
        // 多个权限的给予方式,使用 | 位运算符, 例如 acls.add(new ACL(ZooDefs.Perms.DELETE | ZooDefs.Perms.CREATE, user2))

        // 使用自定义的权限列表去创建节点
        String path1 = zooKeeper.create("/client-acl", "acl-test".getBytes(), acls, CreateMode.PERSISTENT);
        System.out.println("success create znode:"+ path1);

        //添加权限,否则get数据时就会报错。因为创建节点 /client-acl 时添加了自定义权限
        zooKeeper.addAuthInfo("digest","admin1:admin1".getBytes());
        byte[] data = zooKeeper.getData("/client-acl",null,null);
        System.out.println(new String(data));

        zooKeeper.close();
    }
}

此时zkCli.cmd打开的客户端访问结果:

以上做到了创建连接,创建节点,获取节点数据,设置权限操作

想想可以做哪些事情,例如我启动一个web程序,程序启动时向zookeeper中创建临时节点,附带启动的ip,端口,服务名等信息. 此时编写一个网关程序,网关从zookeeper中拉取服务信息,根据注册的服务转发请求,是不是就实现了一个网关远程调用不同服务的效果(dubbo原理)

当前就像mysql一样,有原生驱动包,但流行的工具必然有人搞框架

这里以zk框架CuratorFramework做例子

下面是编写的工具类,封装了增,删,查,监听的操作(注释解释):
public class MycuratorClient {

    /**
     *  RetryPolicy 接口的几个实现子类:
     *     BoundedExponentialBackoffRetry 有界指数退避重试
     *     ExponentialBackoffRetry 指数退避重试
     *     RetryForever 一直重试
     *     RetryNTimes 重试n次
     *     RetryOneTime 重试一次
     *     RetryUntilElapsed 重试直到过期
     *     SessionFailedRetryPolicy 会话失败重试策略
     *     SleepingRetry 睡眠重试
     */

    private final CuratorFramework zkClient;

    private static final MycuratorClient mycuratorClient = new MycuratorClient();

    private MycuratorClient(){
        zkClient = CuratorFrameworkFactory.builder()
                        .connectString("127.0.0.1:2181")
                        .sessionTimeoutMs(10000).connectionTimeoutMs(30000)
                        .authorization("digest", ("admin:admin").getBytes()) //使用用户名/密码进行auth授权
                        .retryPolicy(new ExponentialBackoffRetry(100, 6))  //重试策略
                        .build();
        zkClient.start();
    }

    public static final MycuratorClient getInstance(){
        return mycuratorClient;
    }

    public boolean isReady(){
        return zkClient.getState().equals(CuratorFrameworkState.STARTED);
    }

    public void wacherNode(String path,TreeCacheListener listener){
//        NodeCache:节点缓存可用于ZNode节点的监听
//        PathCache:子节点缓存可用于ZNode的子节点的监听
//        TreeCache:树缓存时PathCache的增强,不光能监听子节点,还能监听ZNode节点自身
        new TreeCache(zkClient,path).getListenable().addListener(listener);
    }

    public void watchPathChildrenNode(String path, PathChildrenCacheListener listener) throws Exception {
        PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, path, true);
        //BUILD_INITIAL_CACHE 代表使用同步的方式进行缓存初始化。
        pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        pathChildrenCache.getListenable().addListener(listener);
    }

    public void createNode(String path,Object data) throws Exception{
        zkClient.create().forPath(path,JSONObject.toJSONString(data).getBytes());
//        zkClient.create().withMode(CreateMode.PERSISTENT).forPath();//自定义有效状态
//        zkClient.create().withTtl().forPath() //ttl顺序性的
    }

    public void createNodeWithAcl(String path,Object data,List<ACL> acls) throws Exception{
        zkClient.create().withACL(acls).forPath(path,JSONObject.toJSONString(data).getBytes());
    }

    public void deleteNode(String path) throws Exception{
        zkClient.delete().deletingChildrenIfNeeded().forPath(path);
    }

    public void setData(String path,Object data) throws Exception{
        zkClient.setData().forPath(path,JSONObject.toJSONString(data).getBytes());
    }

    public String getData(String path) throws Exception{
        return new String(zkClient.getData().forPath(path));
    }

    public List<String> getChildren(String path) throws Exception{
        return zkClient.getChildren().forPath(path);
    }
}

测试类:

public class CuratorClientTest {
    public static void main(String[] args) throws Exception{
        MycuratorClient instance = MycuratorClient.getInstance();
        instance.createNode("/curator-test",new UserService("1.0","UserService","127.0.0.1",8899));
        instance.watchPathChildrenNode("/curator-test",new PathChildrenCacheListener() {
            //如果对应的path设置了权限,且连接时未授权,那么此listener不会报错,但监听不到响应事件
            @Override
            public void childEvent(CuratorFramework curatorFramework,PathChildrenCacheEvent pathChildrenCacheEvent){
                System.out.println(pathChildrenCacheEvent.getType());
                System.out.println(pathChildrenCacheEvent.getData().getPath());
            }
        });

        ArrayList<ACL> acls = new ArrayList<>();
        Id user = new Id("digest", DigestAuthenticationProvider.generateDigest("admin:admin"));
        acls.add(new ACL(ZooDefs.Perms.ALL,user));

        instance.createNodeWithAcl("/curator-test/test1",new UserService("1.1","UserService","127.0.0.1",8899),acls);
        instance.createNodeWithAcl("/curator-test/test2",new UserService("1.2","UserService","127.0.0.1",8899),acls);
        instance.createNodeWithAcl("/curator-test/test3",new UserService("1.3","UserService","127.0.0.1",8899),acls);
        
        List<String> children = instance.getChildren("/curator-test");
        for (String path : children){
            System.out.println(path);
        }
        Thread.sleep(5000);
    }
}

这里测试类添加了节点监听, 通过权限创建节点,运行后打印如下:

可以看出监听有的并不是全部打印在获取的前面,说明监听是有延迟的,这也是代码最后Thread.sleep(5000)的原因,不加这一块,程序退出时可能会监听不到最后一个事件.

此时通过zkCli.cmd查看数据,如下

到此你应该明白zookeeper作为注册中心的原因了

  • 它保存key-value形式的节点和数据(数据不会很大,毕竟不是作为数据库)
  • 拥有权限控制
  • 客户端可监听,可用于轮询服务上下线
  • 自身可搭建集群,而且很完善的集群leader选举

分布式调用流程

  • 客户端向zookeeper注册自己的服务信息(相对于zk,所有注册程序都是客户端)
  • 同一服务可注册多个,通过服务名或节点目录进行归类(可用于负载均衡,每次处理同一功能,但用不同的服务器)
  • 不同功能服务可创建自己不同的目录用于区分
  • 编写一个统一网关程序(可用tomcat,或者netty),通过客户端的请求,到zookeeper中找到对应服务进行请求,返回数据
至此就将一个服务可以开启多份,不同服务进行拆分运行到不同的服务器上,但通过网关提供了统一的 请求接口.

想想为什么需要网关,而不是客户端直接到zookeeper中自己去拉取服务?
首先如果B/S,客户端就不支持连接zookeeper.
如果是C/S,你总不可能让所有用户都去访问zookeeper吧,那样会增加zookeeper的压力.毕竟客户端数量远比服务端多的多

maven

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.7.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.1.0</version>
</dependency>

以上是关于zookeeper的使用(java)的主要内容,如果未能解决你的问题,请参考以下文章

如何在片段中使用 GetJsonFromUrlTask​​.java

# Java 常用代码片段

# Java 常用代码片段

创建片段而不从 java 代码实例化它

Zookeeper客户端java代码操作

Java代码操作zookeeper