zookeeper的使用(java)
Posted FreeFly辉
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper的使用(java)相关的知识,希望对你有一定的参考价值。
下载zookeeper
官网直接下就行,地址:https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
下载后解压,到bin目录下找到zkServer.cmd双击运行即可
第一次双击运行时会闪退,可看以下博客解决
https://blog.csdn.net/qq279862451/article/details/79083522如果你从未使用过,请先双击zkCli.cmd进行命令行敲打学习
可搜 zk指令,这里贴一篇博客: https://blog.csdn.net/jiangtianjiao/article/details/89239354zk中的acl权限控制可参考:
https://www.pianshen.com/article/7476507438/
接下来是java客户端的操作,maven依赖放在了最后
首先启动zk
上代码:
public class ZookeeperClient
public static final String ADDRESS = "127.0.0.1:2181";
public static void main(String[] args) throws Exception
//创建连接 地址,会话超时时间,连接监听器
ZooKeeper zooKeeper = new ZooKeeper(ADDRESS,5000,new Watcher()
@Override
public void process(WatchedEvent watchedEvent)
System.out.println("有监听事件进来了,事件信息 " + watchedEvent);
);
//由于new 完Zookeeper对象就会创建,而不会在意是否连接上了,所以这里阻塞以下,直至连接成功(网上很多例子用的是CountDownLatch方式)
while (zooKeeper.getState() != ZooKeeper.States.CONNECTED)
System.out.println("连接中...");
Thread.sleep(300);
/**
* 创建节点参数
* 节点路径 相当于文件目录一样,zk根节点是 / ,注意创建节点时,一定要以 / 开头
* 节点数据 节点想要保存的数据,(如果附带服务ip端口信息,zk就当作了注册中心,客户端可拉取服务信息进行访问)
* 节点权限 zk的acl权限,这里设置的是所有人可读,具体权限前面放的地址有解释
* 节点类型(永久的、临时的、顺序性的,根据枚举名自行翻译。不了解节点类型,请先根据前面连接熟悉一下,或自行搜索了解一下)
*/
String path1 = zooKeeper.create("/client", "test-data1".getBytes(), ZooDefs.Ids.READ_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("success create znode:"+ path1);
zooKeeper.close();
运行后,打开zk客户端,进行指令查看,如下:
创建节点时,附带acl权限
上代码:public class ZookeeperClient
public static final String ADDRESS = "127.0.0.1:2181";
public static void main(String[] args) throws Exception
//创建连接 地址,会话超时时间,连接监听器
ZooKeeper zooKeeper = new ZooKeeper(ADDRESS,5000,new Watcher()
@Override
public void process(WatchedEvent watchedEvent)
System.out.println("有监听事件进来了,事件信息 " + watchedEvent);
);
//由于new 完Zookeeper对象就会创建,而不会在意是否连接上了,所以这里阻塞以下,直至连接成功(网上很多例子用的是CountDownLatch方式)
while (zooKeeper.getState() != ZooKeeper.States.CONNECTED)
System.out.println("连接中...");
Thread.sleep(300);
/**
* 创建节点参数
* 节点路径 相当于文件目录一样,zk根节点是 / ,注意创建节点时,一定要以 / 开头
* 节点数据 节点想要保存的数据,(如果附带服务ip端口信息,zk就当作了注册中心,客户端可拉取服务信息进行访问)
* 节点权限 zk的acl权限,这里设置的是所有人可读,具体权限前面放的地址有解释
* 节点类型(永久的、临时的、顺序性的,根据枚举名自行翻译。不了解节点类型,请先根据前面连接熟悉一下,或自行搜索了解一下)
*/
List<ACL> acls = new ArrayList<ACL>(); // 权限列表
// 第一个参数是权限scheme,第二个参数是加密后的用户名和密码
Id user1 = new Id("digest", DigestAuthenticationProvider.generateDigest("admin1:admin1"));
Id user2 = new Id("digest", DigestAuthenticationProvider.generateDigest("admin2:admin2"));
acls.add(new ACL(ZooDefs.Perms.ALL, user1)); // 给予所有权限
acls.add(new ACL(ZooDefs.Perms.READ, user2)); // 只给予读权限
// 多个权限的给予方式,使用 | 位运算符, 例如 acls.add(new ACL(ZooDefs.Perms.DELETE | ZooDefs.Perms.CREATE, user2))
// 使用自定义的权限列表去创建节点
String path1 = zooKeeper.create("/client-acl", "acl-test".getBytes(), acls, CreateMode.PERSISTENT);
System.out.println("success create znode:"+ path1);
//添加权限,否则get数据时就会报错。因为创建节点 /client-acl 时添加了自定义权限
zooKeeper.addAuthInfo("digest","admin1:admin1".getBytes());
byte[] data = zooKeeper.getData("/client-acl",null,null);
System.out.println(new String(data));
zooKeeper.close();
此时zkCli.cmd打开的客户端访问结果:
以上做到了创建连接,创建节点,获取节点数据,设置权限操作
想想可以做哪些事情,例如我启动一个web程序,程序启动时向zookeeper中创建临时节点,附带启动的ip,端口,服务名等信息. 此时编写一个网关程序,网关从zookeeper中拉取服务信息,根据注册的服务转发请求,是不是就实现了一个网关远程调用不同服务的效果(dubbo原理)当前就像mysql一样,有原生驱动包,但流行的工具必然有人搞框架
这里以zk框架CuratorFramework做例子
下面是编写的工具类,封装了增,删,查,监听的操作(注释解释):public class MycuratorClient
/**
* RetryPolicy 接口的几个实现子类:
* BoundedExponentialBackoffRetry 有界指数退避重试
* ExponentialBackoffRetry 指数退避重试
* RetryForever 一直重试
* RetryNTimes 重试n次
* RetryOneTime 重试一次
* RetryUntilElapsed 重试直到过期
* SessionFailedRetryPolicy 会话失败重试策略
* SleepingRetry 睡眠重试
*/
private final CuratorFramework zkClient;
private static final MycuratorClient mycuratorClient = new MycuratorClient();
private MycuratorClient()
zkClient = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.sessionTimeoutMs(10000).connectionTimeoutMs(30000)
.authorization("digest", ("admin:admin").getBytes()) //使用用户名/密码进行auth授权
.retryPolicy(new ExponentialBackoffRetry(100, 6)) //重试策略
.build();
zkClient.start();
public static final MycuratorClient getInstance()
return mycuratorClient;
public boolean isReady()
return zkClient.getState().equals(CuratorFrameworkState.STARTED);
public void wacherNode(String path,TreeCacheListener listener)
// NodeCache:节点缓存可用于ZNode节点的监听
// PathCache:子节点缓存可用于ZNode的子节点的监听
// TreeCache:树缓存时PathCache的增强,不光能监听子节点,还能监听ZNode节点自身
new TreeCache(zkClient,path).getListenable().addListener(listener);
public void watchPathChildrenNode(String path, PathChildrenCacheListener listener) throws Exception
PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, path, true);
//BUILD_INITIAL_CACHE 代表使用同步的方式进行缓存初始化。
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
pathChildrenCache.getListenable().addListener(listener);
public void createNode(String path,Object data) throws Exception
zkClient.create().forPath(path,JSONObject.toJSONString(data).getBytes());
// zkClient.create().withMode(CreateMode.PERSISTENT).forPath();//自定义有效状态
// zkClient.create().withTtl().forPath() //ttl顺序性的
public void createNodeWithAcl(String path,Object data,List<ACL> acls) throws Exception
zkClient.create().withACL(acls).forPath(path,JSONObject.toJSONString(data).getBytes());
public void deleteNode(String path) throws Exception
zkClient.delete().deletingChildrenIfNeeded().forPath(path);
public void setData(String path,Object data) throws Exception
zkClient.setData().forPath(path,JSONObject.toJSONString(data).getBytes());
public String getData(String path) throws Exception
return new String(zkClient.getData().forPath(path));
public List<String> getChildren(String path) throws Exception
return zkClient.getChildren().forPath(path);
测试类:
public class CuratorClientTest
public static void main(String[] args) throws Exception
MycuratorClient instance = MycuratorClient.getInstance();
instance.createNode("/curator-test",new UserService("1.0","UserService","127.0.0.1",8899));
instance.watchPathChildrenNode("/curator-test",new PathChildrenCacheListener()
//如果对应的path设置了权限,且连接时未授权,那么此listener不会报错,但监听不到响应事件
@Override
public void childEvent(CuratorFramework curatorFramework,PathChildrenCacheEvent pathChildrenCacheEvent)
System.out.println(pathChildrenCacheEvent.getType());
System.out.println(pathChildrenCacheEvent.getData().getPath());
);
ArrayList<ACL> acls = new ArrayList<>();
Id user = new Id("digest", DigestAuthenticationProvider.generateDigest("admin:admin"));
acls.add(new ACL(ZooDefs.Perms.ALL,user));
instance.createNodeWithAcl("/curator-test/test1",new UserService("1.1","UserService","127.0.0.1",8899),acls);
instance.createNodeWithAcl("/curator-test/test2",new UserService("1.2","UserService","127.0.0.1",8899),acls);
instance.createNodeWithAcl("/curator-test/test3",new UserService("1.3","UserService","127.0.0.1",8899),acls);
List<String> children = instance.getChildren("/curator-test");
for (String path : children)
System.out.println(path);
Thread.sleep(5000);
这里测试类添加了节点监听, 通过权限创建节点,运行后打印如下:
可以看出监听有的并不是全部打印在获取的前面,说明监听是有延迟的,这也是代码最后Thread.sleep(5000)的原因,不加这一块,程序退出时可能会监听不到最后一个事件.
此时通过zkCli.cmd查看数据,如下
到此你应该明白zookeeper作为注册中心的原因了
- 它保存key-value形式的节点和数据(数据不会很大,毕竟不是作为数据库)
- 拥有权限控制
- 客户端可监听,可用于轮询服务上下线
- 自身可搭建集群,而且很完善的集群leader选举
分布式调用流程
- 客户端向zookeeper注册自己的服务信息(相对于zk,所有注册程序都是客户端)
- 同一服务可注册多个,通过服务名或节点目录进行归类(可用于负载均衡,每次处理同一功能,但用不同的服务器)
- 不同功能服务可创建自己不同的目录用于区分
- 编写一个统一网关程序(可用tomcat,或者netty),通过客户端的请求,到zookeeper中找到对应服务进行请求,返回数据
想想为什么需要网关,而不是客户端直接到zookeeper中自己去拉取服务?
首先如果B/S,客户端就不支持连接zookeeper.
如果是C/S,你总不可能让所有用户都去访问zookeeper吧,那样会增加zookeeper的压力.毕竟客户端数量远比服务端多的多
maven
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
以上是关于zookeeper的使用(java)的主要内容,如果未能解决你的问题,请参考以下文章