Zookeeper---java客户端的使用

Posted bluedarkni

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper---java客户端的使用相关的知识,希望对你有一定的参考价值。

前面介绍了zk指令的使用,这里说一下java客户端中怎么使用这些指令

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

 

客户端创建参数:

1.初始化连接:org.apache.zookeeper.ZooKeeper,实例化该类之后将会自动与ZK建立连接。构造参数说明如下:

参数名称

类型

说明

connectString

String

连接串,包括ip+端口 ,集群模式下用逗号隔开

192.168.0.101:2181,192.168.0.67:2181

sessionTimeout

int

会话超时时间,该值不能超过服务端所设置的

minSessionTimeout 和maxSessionTimeout

watcher

Watcher

会话监听器,服务端事件将会触该监听

sessionId

long

自定义会话ID

sessionPasswd

byte[]

会话密码

canBeReadOnly

boolean

该连接是否为只读的

hostProvider

HostProvider

服务端地址提供者,指示客户端如何选择某个服务来调用,默认采用StaticHostProvider实现

 

2. org.apache.zookeeper.ZooKeeper#create() 创建节点 创建节点的时候需要注意设置ACL权限,五个权限位:

    int READ = 1 << 0;

    int WRITE = 1 << 1;

    int CREATE = 1 << 2;

    int DELETE = 1 << 3;

    int ADMIN = 1 << 4;

    int ALL = READ | WRITE | CREATE | DELETE | ADMIN;

3.org.apache.zookeeper.ZooKeeper#getData() 查看节点数据/添加数据变化监听

4.org.apache.zookeeper.ZooKeeper#getChildren() 查看子节点/添加子节点变化监听

 

代码demo:

package com.nijunyang.zookeeper.demo;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Description:
 * Created by nijunyang on 2020/10/27 21:36
 */
public class ZkClientDataChange {

    ZooKeeper zkClient;

    @Before
    public void before() throws IOException {
        //集群用,分割
        String connectString = "192.168.0.67:2181";
        zkClient = new ZooKeeper(connectString, 40000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println(event.getPath());
                System.out.println(event);
            }
        });
    }

    //获取数据
    @Test
    public void getData1() throws KeeperException, InterruptedException {
        byte[] data = zkClient.getData("/njy", false, null);
        System.out.println(new String(data));
    }

    //添加监听
    @Test
    public void getData2() throws KeeperException, InterruptedException {
        //直接调用初始化的监听
        byte[] data = zkClient.getData("/njy", true, null);
        System.out.println(new String(data));
        Thread.sleep(Integer.MAX_VALUE);
    }
    
    @Test
    public void getData3() throws KeeperException, InterruptedException {
        //Stat 会填充带回来
        Stat stat = new Stat();
        //添加自定义监听
        byte[] data = zkClient.getData("/njy", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    //重复添加监听
                    zkClient.getData(event.getPath(), this, null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println(event.getPath());
            }
        }, stat);
        System.out.println(stat);
        Thread.sleep(Long.MAX_VALUE);
    }

    //带回调
    @Test
    public void getData4() throws KeeperException, InterruptedException {
        zkClient.getData("/njy", false, new AsyncCallback.DataCallback() {
            @Override
            public void processResult(int rc, java.lang.String path, Object ctx, byte[] data, Stat stat) {
                System.out.println(new String(data));
                System.out.println(stat);
            }
        }, "");
        Thread.sleep(Long.MAX_VALUE);
    }

    //获取子节点
    @Test
    public void getChild() throws KeeperException, InterruptedException {
        List<String> children = zkClient.getChildren("/njy", false);
        children.stream().forEach(System.out::println);
    }

    //监听子节点变化
    @Test
    public void getChild2() throws KeeperException, InterruptedException {
        List<String> children = zkClient.getChildren("/njy", event -> {
            System.out.println(event.getPath());
            try {
                zkClient.getChildren(event.getPath(), false);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        children.stream().forEach(System.out::println);
        Thread.sleep(Long.MAX_VALUE);
    }

    //持续监听
    @Test
    public void getChild3() throws KeeperException, InterruptedException {
        Stat stat = new Stat();
        List<String> children = zkClient.getChildren("/njy", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println(event.getPath());
                try {
                    zkClient.getChildren(event.getPath(),this);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, stat);
        children.stream().forEach(System.out::println);
        Thread.sleep(Long.MAX_VALUE);
    }
    
    //创建节点
    @Test
    public void createNode() throws KeeperException, InterruptedException {
        List<ACL> list = new ArrayList<>();
//        int perm = ZooDefs.Perms.ADMIN | ZooDefs.Perms.READ;//cdwra
//        int READ = 1 << 0;
//        int WRITE = 1 << 1;
//        int CREATE = 1 << 2;
//        int DELETE = 1 << 3;
//        int ADMIN = 1 << 4;
//        int ALL = READ | WRITE | CREATE | DELETE | ADMIN;
        int perm = ZooDefs.Perms.ALL;
        //ACL权限
        ACL acl = new ACL(perm, new Id("world", "anyone"));
//        ACL acl2 = new ACL(perm, new Id("ip", "192.168.0.67"));
//        ACL acl3 = new ACL(perm, new Id("ip", "192.168.0.101"));
        list.add(acl);
//        list.add(acl2);
//        list.add(acl3);
        zkClient.create("/njy/njyn1", "hello".getBytes(), list, CreateMode.PERSISTENT);
    }
}

 

以上是关于Zookeeper---java客户端的使用的主要内容,如果未能解决你的问题,请参考以下文章

2021年大数据ZooKeeper:ZooKeeper Java API操作

canal挂载在zookeeper,java客户端client,连接报超时

ZooKeeper Java Example(官方例子)

客户端的客户端密码验证失败,授权代码流中的客户端密码无效

使用 xhprof 分析 PHP 代码时,如何防止它破坏来自 JavaScript 客户端的 Ajax 调用?

python的select服务端的代码和客户端的代码