Zookeeper---java客户端的使用
Posted bluedarkni
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper---java客户端的使用相关的知识,希望对你有一定的参考价值。
前面介绍了zk指令的使用,这里说一下java客户端中怎么使用这些指令
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.5</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency>
客户端创建参数:
1.初始化连接:org.apache.zookeeper.ZooKeeper,实例化该类之后将会自动与ZK建立连接。构造参数说明如下:
参数名称 |
类型 |
说明 |
connectString |
String |
连接串,包括ip+端口 ,集群模式下用逗号隔开 192.168.0.101:2181,192.168.0.67:2181 |
sessionTimeout |
int |
会话超时时间,该值不能超过服务端所设置的 minSessionTimeout 和maxSessionTimeout |
watcher |
Watcher |
会话监听器,服务端事件将会触该监听 |
sessionId |
long |
自定义会话ID |
sessionPasswd |
byte[] |
会话密码 |
canBeReadOnly |
boolean |
该连接是否为只读的 |
hostProvider |
HostProvider |
服务端地址提供者,指示客户端如何选择某个服务来调用,默认采用StaticHostProvider实现 |
2. org.apache.zookeeper.ZooKeeper#create() 创建节点 创建节点的时候需要注意设置ACL权限,五个权限位:
int READ = 1 << 0;
int WRITE = 1 << 1;
int CREATE = 1 << 2;
int DELETE = 1 << 3;
int ADMIN = 1 << 4;
int ALL = READ | WRITE | CREATE | DELETE | ADMIN;
3.org.apache.zookeeper.ZooKeeper#getData() 查看节点数据/添加数据变化监听
4.org.apache.zookeeper.ZooKeeper#getChildren() 查看子节点/添加子节点变化监听
代码demo:
package com.nijunyang.zookeeper.demo; import org.apache.zookeeper.*; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.ArrayList; import java.util.List; /** * Description: * Created by nijunyang on 2020/10/27 21:36 */ public class ZkClientDataChange { ZooKeeper zkClient; @Before public void before() throws IOException { //集群用,分割 String connectString = "192.168.0.67:2181"; zkClient = new ZooKeeper(connectString, 40000, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getPath()); System.out.println(event); } }); } //获取数据 @Test public void getData1() throws KeeperException, InterruptedException { byte[] data = zkClient.getData("/njy", false, null); System.out.println(new String(data)); } //添加监听 @Test public void getData2() throws KeeperException, InterruptedException { //直接调用初始化的监听 byte[] data = zkClient.getData("/njy", true, null); System.out.println(new String(data)); Thread.sleep(Integer.MAX_VALUE); } @Test public void getData3() throws KeeperException, InterruptedException { //Stat 会填充带回来 Stat stat = new Stat(); //添加自定义监听 byte[] data = zkClient.getData("/njy", new Watcher() { @Override public void process(WatchedEvent event) { try { //重复添加监听 zkClient.getData(event.getPath(), this, null); } catch (Exception e) { e.printStackTrace(); } System.out.println(event.getPath()); } }, stat); System.out.println(stat); Thread.sleep(Long.MAX_VALUE); } //带回调 @Test public void getData4() throws KeeperException, InterruptedException { zkClient.getData("/njy", false, new AsyncCallback.DataCallback() { @Override public void processResult(int rc, java.lang.String path, Object ctx, byte[] data, Stat stat) { System.out.println(new String(data)); System.out.println(stat); } }, ""); Thread.sleep(Long.MAX_VALUE); } //获取子节点 @Test public void getChild() throws KeeperException, InterruptedException { List<String> children = zkClient.getChildren("/njy", false); children.stream().forEach(System.out::println); } //监听子节点变化 @Test public void getChild2() throws KeeperException, InterruptedException { List<String> children = zkClient.getChildren("/njy", event -> { System.out.println(event.getPath()); try { zkClient.getChildren(event.getPath(), false); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }); children.stream().forEach(System.out::println); Thread.sleep(Long.MAX_VALUE); } //持续监听 @Test public void getChild3() throws KeeperException, InterruptedException { Stat stat = new Stat(); List<String> children = zkClient.getChildren("/njy", new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getPath()); try { zkClient.getChildren(event.getPath(),this); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }, stat); children.stream().forEach(System.out::println); Thread.sleep(Long.MAX_VALUE); } //创建节点 @Test public void createNode() throws KeeperException, InterruptedException { List<ACL> list = new ArrayList<>(); // int perm = ZooDefs.Perms.ADMIN | ZooDefs.Perms.READ;//cdwra // int READ = 1 << 0; // int WRITE = 1 << 1; // int CREATE = 1 << 2; // int DELETE = 1 << 3; // int ADMIN = 1 << 4; // int ALL = READ | WRITE | CREATE | DELETE | ADMIN; int perm = ZooDefs.Perms.ALL; //ACL权限 ACL acl = new ACL(perm, new Id("world", "anyone")); // ACL acl2 = new ACL(perm, new Id("ip", "192.168.0.67")); // ACL acl3 = new ACL(perm, new Id("ip", "192.168.0.101")); list.add(acl); // list.add(acl2); // list.add(acl3); zkClient.create("/njy/njyn1", "hello".getBytes(), list, CreateMode.PERSISTENT); } }
以上是关于Zookeeper---java客户端的使用的主要内容,如果未能解决你的问题,请参考以下文章
2021年大数据ZooKeeper:ZooKeeper Java API操作
canal挂载在zookeeper,java客户端client,连接报超时