ZooKeeper ---- Zookeeper基本入门
Posted whc__
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ZooKeeper ---- Zookeeper基本入门相关的知识,希望对你有一定的参考价值。
ZooKeeper入门
一、ZK简介
- dubbo框架、spring cloud框架、zk注册中心
- zk实现分布式锁
ZooKeeper,简称ZK,一个分布式的,开放源码的分布式应用程序协调服务,使用Java编写,支持Java和C两种编程语言。
二、ZK内存数据类型
1. 模型结构
2. 模型的特点
- 每个子目录如/node1都被称作一个znode(节点)。这个znode是被它所在的路径唯一标识
- znode可以有子节点目录,并且每个znode可以存储数据
- znode是有版本的,每个znode中存储的数据可以有多个版本,也就是一个访问路径中可以存储多份数据
- znode可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端
三、节点的分类
-
持久节点(PERSISTENT)
是指在节点创建后,就一直存在,直到有删除操作来主动删除这个节点–不会因为创建该节点的客户端会话失效而消失
-
持久顺序节点(PERSISTENT_SEQUENTIAL)
这类节点的基本特性和上面的节点类型是一致的。额外的特性是:在ZK中,每个父节点会为它的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。基于这个特性,在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK会自动为给定节点加上一个数字后缀,作为新的节点名。这个数字后缀的范围是整型的最大值
-
临时节点(EPHEMERAL)
和持久节点不同的是,如果客户端会话失效,那么这个节点就会自动被清除掉。注意是会话失效,而非连接断开。另外,在临时节点下面不能创建子节点
-
临时顺序节点(EPHEMERAL_SEQUENTIAL)
具有临时节点特点,额外的特性是每个父节点会为它的第一级子节点维护一份时序。这点和刚才提到的持久顺序节点类似
四、安装
Docker安装
- docker pull zookeeper:3.4.14
- docker run --name zk -p 2181:2181 -d zookeeper:3.4.14
进入内部启动
- docker exec -it zk bash
- cd ./bin
- zkCli.sh
以上的安装,只是简单的安装,没有挂载目录文件和配置目录文件之类的,而且进入到zk内部发现没有zoo.cfg文件,所以如果要实现的话,需按照另外一个教程,链接如下:
五、客户端基本指令
-
ls path 查看特定节点下面的子节点
-
create path data 创建一个节点,并给节点绑定数据(默认是持久性节点)
- create path data 创建持久节点
- create -s path data 创建持久性顺序节点
- create -e path data 创建临时节点(注意:临时节点不能含有任何子节点)
- create -e -s path data 创建临时顺序节点(注意: 临时节点不能含有任何子节点)
-
stat path 查看节点状态
-
set path data 修改节点数据
-
ls2 path 查看节点下孩子和当前节点的状态
-
history 查看操作历史
-
get path 获取节点上绑定的数据信息
-
delete path 删除节点(注意: 删除节点不能含有子节点)
-
rmr path 递归删除节点
-
quit 退出当前会话(会话失效)
六、节点监听机制watch
客户端可以监测 znode节点的变化。 Zonode节点的变化触发相应的事件,然后清除对该节点的监测。当监测一个 znode节点时候,
Zookeeper会发送通知给监测节点。一个 Watch事件是一个一次性的触发器,当被设置了 Watch的数据和目录发生了改变的时候,则服务器将这个改变发送给设置了 Watch的客户端以便通知它们。
# 1. ls /path true 监听节点目录的变化
# 2. get /path true 监听节点数据的变化
七、ZooKeeper JavaAPI操作
1、Curator介绍
- Curator是Apache ZooKeeper的Java客户端库
- 常见的ZooKeeper Java API:
- 原生Java API
- ZkClient
- Curator
- Curator项目的目标是简化ZooKeeper客户端的使用
2、Curator API常用操作
2.1 建立连接
/**
*
* @param connectString 连接字符串,zk server地址和端口 "112.74.188.132:2181"
* @param sessionTimeoutMs 会话超时时间,单位ms
* @param connectionTimeoutMs 连接超时时间,单位ms
* @param retryPolicy 重试策略
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("112.74.188.132:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.namespace("whc")
.build();
2.2 添加节点
/**
* 创建节点: create持久、临时、顺序、
* 1. 基本创建: create().forPath("")
* 2. 创建节点 带有数据: create().forPath("",data)
* 3. 设置节点的类型: create().withMode().forPath("",data)
* 4. 创建多级节点 /app1/p1 : create().creatingParentsIfNeeded().forPath("",data);
*/
@Test
public void testCreate1() throws Exception {
// 1. 基本创建
// 如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
String s = client.create().forPath("/app1");
System.out.println(s);
}
@Test
public void testCreate2() throws Exception {
// 2. 创建节点,带有数据存储
String s = client.create().forPath("/app2", "hehe".getBytes());
System.out.println(s);
}
@Test
public void testCreate3() throws Exception {
// 3. 设置节点的类型
// 默认类型是持久节点
String s = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
System.out.println(s);
}
@Test
public void testCreate4() throws Exception {
// 3. 创建多级节点 /app4/p1
// creatingParentsIfNeeded如果父节点不存在,则创建父节点
String s = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
System.out.println(s);
}
2.3 删除节点
/**
* 删除节点: delete deleteall
* 1. 删除单个节点: delete().forPath("/app1");
* 2. 删除带有子节点的节点: client.delete().deletingChildrenIfNeeded().forPath("/app4");
* 3. 必须成功的删除:为了防止网络抖动,本质就是重试,client.delete().guaranteed().forPath("/app2");
* 4. 回调: inBackground
* @throws Exception
*/
@Test
public void testDelete() throws Exception {
// 1. 删除单个节点
client.delete().forPath("/app1");
}
@Test
public void testDelete2() throws Exception {
// 2. 删除带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/app4");
}
@Test
public void testDelete3() throws Exception {
// 3. 必须成功的删除
client.delete().guaranteed().forPath("/app2");
}
@Test
public void testDelete4() throws Exception {
// 4. 回调
client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent curatorEvent) throws Exception {
System.out.println("我被删除了");
System.out.println(curatorEvent);
}
}).forPath("/app1");
}
2.4 修改节点
/**
* 修改数据
* 1. 修改数据: setData().forPath()
* 2. 根据版本修改: setData().withVersion().forPath()
* version是通过查询出来的,目的就是为了让其它客户端或者线程不干扰我
* @throws Exception
*/
@Test
public void testSet() throws Exception {
client.setData().forPath("/app1", "whc".getBytes());
}
@Test
public void testSetForVersion() throws Exception {
// 3. 查询节点状态信息: ls -s
Stat stat = new Stat();
System.out.println(stat);
client.getData().storingStatIn(stat).forPath("/app1");
int version = stat.getVersion(); // 查询出来的
System.out.println(version);
client.setData().withVersion(version).forPath("/app1", "haha".getBytes());
}
2.5 查询节点
/**
* 查询节点:
* 1. 查询数据: get: getData().forPath()
* 2. 查询子节点: ls: getChildren().forPath()
* 3. 查询节点状态信息: ls -s: getData().storingStatIn(状态对象).forPath()
*/
@Test
public void testGet1() throws Exception {
// 1. 查询数据: get
byte[] data = client.getData().forPath("/app1");
System.out.println(new String(data));
}
@Test
public void testGet2() throws Exception {
// 2. 查询子节点: ls
List<String> path = client.getChildren().forPath("/");
System.out.println(path);
}
@Test
public void testGet3() throws Exception {
// 3. 查询节点状态信息: ls -s
Stat stat = new Stat();
System.out.println(stat);
client.getData().storingStatIn(stat).forPath("/app1");
}
2.6 Watch事件监听
- ZooKeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制是ZooKeeper实现分布式协调服务的重要特性
- ZooKeeper中引入了Watcher机制来实现了发布/订阅功能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者
- ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便需要开发人员自己反复注册Watcher,比较繁琐
- Curator引入了Cache来实现对ZooKeeper服务端事件的监听
- ZooKeeper提供了三种Watcher:
- NodeCache: 只是监听某一个特定的节点
- PathChildrenCache: 监听一个ZNode的子节点
- TreeCache: 可以监听整个树上的所有结点,类似PathChildrenCache和NodeCache的组合
-
给指定一个节点注册监听器
/** * 演示 NodeCache: 给指定一个节点注册监听器 */ @Test public void testNodeCache() throws Exception { // 1. 创建NodeCache对象 final NodeCache nodeCache = new NodeCache(client, "/app1"); // 2. 注册监听 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("节点变化了"); // 获取修改节点后的数据 byte[] data = nodeCache.getCurrentData().getData(); System.out.println(new String(data)); } }); // 3. 开启监听,如果设置为true,则开启监听,加载缓存数据 nodeCache.start(true); while(true) { } }
-
监听某个节点的所有子节点们
/** * 演示 PathChildrenCache: 监听某个节点的所有子节点们 */ @Test public void testPathChildrenCache() throws Exception { // 1. 创建监听对象 PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true); // 2. 绑定监听器 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { System.out.println("子节点变化了.."); System.out.println(pathChildrenCacheEvent); // 监听子节点的数据变更,并且拿到变更后的数据 // 1. 获取类型 PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType(); // 2. 判断类型是否是update if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { byte[] data = pathChildrenCacheEvent.getData().getData(); System.out.println(new String(data)); } } }); // 3. 开启 pathChildrenCache.start(); while(true) { } }
-
监听某个节点自己和所有子节点们
/** * 演示 TreeCache: 监听某个节点自己和所有子节点们 */ @Test public void testTreeCache() throws Exception { // 1. 创建监听器 TreeCache treeCache = new TreeCache(client, "/app2"); // 2. 注册监听 treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception { System.out.println("节点变化了"); System.out.println(treeCacheEvent); } }); // 3. 开启 treeCache.start(); while (true){ } }
八、集群架构
集群(cluster)
# 集群(cluster)
- 集合同一种软件服务的多个节点同时提供服务
# 集群解决问题
- 单节点的并发访问的压力问题
- 单节点故障问题(如硬件老化,自然灾害等)
1、集群介绍
集群架构
Leader选举:
-
Serverid:服务器ID
比如有三台服务器,编号分别是1,2,3。编号越大在选举算法中的权重越大。
-
Zxid:数据ID
服务器中存放的最多数据ID,值越大说明数据越新,在选举算法中数据越新权重越大
-
在Leader选举的过程中,如果某台ZooKeeper获得了超过半数的选票,则此ZooKeeper就可以成为Leader了
九、分布式锁
1、概念
-
在我们进行单机应用开发,涉及并发同步的时候,我们往往采用
synchronized
或者lock
的方式来解决多线程间的代码同步问题
这时多线程的运行都是在同一个JVM之下,没有任何问题。 -
但当我们的应用是
分布式集群
工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。所以得靠分布式锁组件
解决问题,由它来提供一个唯一的锁。
分布式锁
:处理跨机器的进程之间的数据同步问题
2、分类
-
数据库层面实现的分布式锁:悲观锁、乐观锁
性能低
-
基于缓存实现的分布式锁:Redis、Memcache
性能好
弊端:Redis做集群后,如果Master节点在没有被其它节点同步的时候,突然宕机,那就可能存在多个人同时获取到锁的情况
-
基于ZooKeeper实现的分布式锁
最为可靠
3、ZooKeeper分布式锁原理
核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点
-
客户端获取锁时,在lock节点下创建临时顺序节点
- 为什么是临时而不是永久节点? 假如Client1创建永久节点/lock/1,在处理数据的过程中,假如Client1机器宕机,而Client1此时并没有释放锁,那么剩下的Client2、Client3都无法获取到锁。 如果创建的是临时节点/lock/1,即使Client1宕机了,意味着连接断开,即会话结束,临时节点会在会话结束之后自动的删除,为了防止锁不被释放。 - 为什么是顺序节点?
-
然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
-
如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。
-
如果发现比自己小的那个节点被删除,则客户端的
Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,
如果不是则重复以上步骤继续获取到比自己小的一个节点
并注册监听。这就是第一步中为什么创建顺序节点的原因。因为client2创建了子节点/lock/2,它监听者client1的删除操作事件,一旦client1删除了/lock/1/节点,此时client2的watcher收到了相应通知,/lock/2是此时最小的,所以client2能获取到锁。
4、ZK JavaAPI操作
Curator实现分布式锁API
在Curator中有五种锁方案:
- InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
- InterProcessMutex:分布式可重入排他锁
- InterProcessReadWriteLock:分布式读写锁
- InterProcessMultiLock:将多个锁作为单个实体管理的容器
- InterProcessSemaphoreV2:共享信号量
分布式案例-模拟12306售票
加锁实现逻辑是在12306实现的,因为共享资源是存在12306上的,让12306充当ZooKeeper的客户端。
4.1 代码实现
模拟携程、飞猪抢夺票资源
public class LockTest {
public static void main(String[] args) {
Ticket12306 ticket12306 = new Ticket12306();
// 创建客户端
Thread t1 = new Thread(ticket12306, "携程");
Thread t2 = new Thread(ticket12306, "飞猪");
t1.start();
t2.start();
}
}
模拟12306提供卖票功能,同时12306充当ZK客户端,最后实现分布式锁
public class Ticket12306 implements Runnable {
private int tickets = 10; // 数据库的票数
private InterProcessMutex lock;
// 初始化lock
public Ticket12306() {
// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("112.74.188.132:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.build();
// 开启连接
client.start();
lock = new InterProcessMutex(client, "/lock");
}
@Override
public void run() {
while(true) {
// 获取锁
try {
lock.acquire(3, TimeUnit.SECONDS);
if(tickets > 0) {
System.out.println(Thread.currentThread() + ":" + tickets);
tickets--;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
verride
public void run() {
while(true) {
// 获取锁
try {
lock.acquire(3, TimeUnit.SECONDS);
if(tickets > 0如果有人问你ZooKeeper是什么,就把这篇文章发给他。
[云原生专题-54]:Kubesphere云治理-操作-通过K8S的应用仓库一键部署微服务应用- 分布式协调服务中间件zookeeper的安装与部署
Zookeeper -- 初识ZookeeperZookeeper的安装和配置Zookeeper命令操作(Zookeeper数据模型 Zookeeper服务端 / 客户端常用命令)