三天学会ZooKeeper第二天(全网最细)
Posted 活跃的咸鱼
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了三天学会ZooKeeper第二天(全网最细)相关的知识,希望对你有一定的参考价值。
ZooKeeper
本篇文章参考黑马程序员ZooKeeper教程
三天学会ZooKeeper第一天(全网最细)
三天学会ZooKeeper第三天(全网最细)
Zookeeper session 基本原理
客户端与服务端之间的连接是基于 TCP 长连接,client 端连接 server 端默认的 2181 端口,也就是 session 会话。从第一次连接建立开始,客户端开始会话的生命周期,客户端向服务端的ping包请求,每个会话都可以设置一个超时时间。
Session 的创建
sessionID: 会话ID,用来唯一标识一个会话,每次客户端创建会话的时候,zookeeper 都会为其分配一个全局唯一的 sessionID。zookeeper 创建 sessionID SessionTrackerImpl 中的源码。
Timeout:会话超时时间。客户端在构造 Zookeeper 实例时候,向服务端发送配置的超时时间,server 端会根据自己的超时时间限制最终确认会话的超时时间。
TickTime:下次会话超时时间点,默认 2000 毫秒。可在 zoo.cfg 配置文件中配置,便于 server 端对 session 会话实行分桶策略管理。
isClosing:该属性标记一个会话是否已经被关闭,当 server 端检测到会话已经超时失效,该会话标记为"已关闭",不再处理该会话的新请求。
Session 的状态
connecting:连接中,session 一旦建立,状态就是 connecting 状态,时间很短。
connected:已连接,连接成功之后的状态。
closed:已关闭,发生在 session 过期,一般由于网络故障客户端重连失败,服务器宕机或者客户端主动断开。
会话超时管理(分桶策略+会话激活)
zookeeper 的 leader 服务器再运行期间定时进行会话超时检查,时间间隔是 ExpirationInterval,单位是毫秒,默认值是 tickTime,每隔 tickTime 进行一次会话超时检查。
ExpirationTime 的计算方式:
ExpirationTime = CurrentTime + SessionTimeout;
ExpirationTime = (ExpirationTime / ExpirationInterval + 1) * ExpirationInterval;
在 zookeeper 运行过程中,客户端会在会话超时过期范围内向服务器发送请求(包括读和写)或者 ping 请求,俗称心跳检测完成会话激活,从而来保持会话的有效性。
会话激活流程:
激活后进行迁移会话的过程,然后开始新一轮:
zookeeper 事件监听机制
watcher概念
zookeeper提供了数据的发布/订阅功能,多个订阅者可同时监听某一特定主题对象,当该主题对象的自身状态发生变化时(例如节点内容改变、节点下的子节点列表改变等),会实时、主动通知所有订阅者zookeeper采用了Watcher机制实现数据的发布/订阅功能。该机制在被订阅对象发生变化时会异步通知客户端,因此客户端不必在Watcher注册后轮询阻塞,从而减轻了客户端压力。watcher机制实际上与观察者模式类似,也可看作是一种观察者模式在分布式场景下的实现方式。
watcher架构
- Zookeeper服务端
- Zookeeper客户端
- 客户端的ZKWatchManager对象
客户端首先将Watcher注册到服务端,同时将Watcher对象保存到客户端的Watch管理器中。当ZooKeeper服务端监听的数据状态发生变化时,服务端会主动通知客户端,接着客户端的Watch管理器会触发相关Watcher来回调相应处理逻辑,从而完成整体的数据发布/订阅流程。
watcher特性
watcher接口设计
Watcher是一个接口,任何实现了Watcher接口的类就是一个新的Watcher。
Watcher内部包含了两个枚举类:KeeperState、EventType
Watcher通知状态(KeeperState)
KeeperState是客户端与服务端连接状态发生变化时对应的通知类型。路径为
org.apache.zookeeper.Watcher.Event.KeeperState,是一个枚举类,其枚举属性
如下:
Watcher事件类型(EventType)
EventType是数据节点(znode)发生变化时对应的通知类型。EventType变化时
KeeperState永远处于SyncConnected通知状态下;当KeeperState发生变化时,EventType永远为None。其路径为org.apache.zookeeper.Watcher.Event.EventType,是一个枚举类,枚举属性如下:
注:客户端接收到的相关事件通知中只包含状态及类型等信息,不包括节点变化前后的具体内容,变化前的数据需业务自身存储,变化后的数据需调用get等方法重新获取;
捕获相应的事件
下表以node-x节点为例,说明调用的注册方法和可监听事件间的关系:
客服端与服务器的连接状态
KeeperState 通知状态
SyncConnected:客户端与服务器正常连接时
Disconnected:客户端与服务器断开连接时
Expired:会话session失效时
AuthFailed:身份认证失败时
事件类型为:None
public class ZKConnectionWatcher implements Watcher {
private static String connectString = "192.168.226.144:2181";
// 计数器对象
static CountDownLatch countDownLatch = new CountDownLatch(1);
// 连接对象
static ZooKeeper zooKeeper;
@Override
public void process(WatchedEvent event) {
try {
// 事件类型
if (event.getType() == Event.EventType.None) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功!");
countDownLatch.countDown();
} else if (event.getState() == Event.KeeperState.Disconnected) {
System.out.println("断开连接!");
} else if (event.getState() == Event.KeeperState.Expired) {
System.out.println("会话超时!");
zooKeeper = new ZooKeeper(connectString, 5000, new ZKConnectionWatcher());
} else if (event.getState() == Event.KeeperState.AuthFailed) {
System.out.println("认证失败!");
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
public static void main(String[] args) {
try {
zooKeeper = new ZooKeeper(connectString, 5000, new ZKConnectionWatcher());
// 阻塞线程等待连接的创建
countDownLatch.await();
// 会话id
System.out.println(zooKeeper.getSessionId());
// 添加授权用户
zooKeeper.addAuthInfo("digest", "zzn:1234561".getBytes());
byte[] bs = zooKeeper.getData("/node1", false, null);
System.out.println(new String(bs));
Thread.sleep(50000);
zooKeeper.close();
System.out.println("结束");
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
检查节点是否存在
// 使用连接对象的监视器
exists(String path, boolean b)
// 自定义监视器
exists(String path, Watcher w)
// NodeCreated:节点创建
// NodeDeleted:节点删除
// NodeDataChanged:节点内容发生变化
path- znode路径。
b- 是否使用连接对象中注册的监视器。
w-监视器对象。
public class ZKWatcherExists {
public static String IP = "192.168.226.144:2181";
ZooKeeper zooKeeper = null;
@Before
public void before() throws IOException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
// 连接zookeeper客户端
zooKeeper = new ZooKeeper(IP, 6000, event -> {
System.out.println("连接对象的参数!");
// 连接成功
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
});
countDownLatch.await();
}
@After
public void after() throws InterruptedException {
zooKeeper.close();
}
@Test
public void watcherExists1() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:使用连接对象中的watcher
Stat exists = zooKeeper.exists("/watcher/node1", true);
System.out.println(exists == null ? "节点不存在" : "节点存在");
Thread.sleep(50000);
System.out.println("结束");
}
@Test
public void watcherExists2() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:自定义watcher对象
zooKeeper.exists("/watcher/node1", event -> {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
});
Thread.sleep(50000);
System.out.println("结束");
}
@Test
public void watcherExists3() throws KeeperException, InterruptedException {
// watcher一次性
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
zooKeeper.exists("/watcher/node1", this);
} catch (Exception ex) {
ex.printStackTrace();
}
}
};
zooKeeper.exists("/watcher/node1", watcher);
Thread.sleep(80000);
System.out.println("结束");
}
@Test
public void watcherExists4() throws KeeperException, InterruptedException {
// 注册多个监听器对象
zooKeeper.exists("/watcher/node1", event -> {
System.out.println("1");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
});
zooKeeper.exists("/watcher/node1", event -> {
System.out.println("2");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
});
Thread.sleep(80000);
System.out.println("结束");
}
}
查看节点
// 使用连接对象的监视器
getData(String path, boolean b, Stat stat)
// 自定义监视器
getData(String path, Watcher w, Stat stat)
// NodeDeleted:节点删除
// NodeDataChanged:节点内容发生变化
path- znode路径。
b- 是否使用连接对象中注册的监视器。
w-监视器对象。
stat- 返回znode的元数据。
/**
* getData 监控 Change ,Deleted
*/
public class ZKWatcherGetData {
public static String IP = "192.168.226.144:2181";
ZooKeeper zooKeeper = null;
@Before
public void before() throws IOException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
// 连接zookeeper客户端
zooKeeper = new ZooKeeper(IP, 6000, event -> {
System.out.println("连接对象的参数!");
// 连接成功
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
});
countDownLatch.await();
}
@After
public void after() throws InterruptedException {
zooKeeper.close();
}
@Test
public void watcherGetData1() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:使用连接对象中的watcher
zooKeeper.getData("/watcher/node2", true, null);
Thread.sleep(50000);
System.out.println("结束");
}
@Test
public void watcherGetData2() throws KeeperException, InterruptedException {
// arg1:节点的路径
// arg2:自定义watcher对象
zooKeeper.getData("/watcher/node2", event -> {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
}, null);
Thread.sleep(50000);
System.out.println("结束");
}
@Test
public void watcherGetData3() throws KeeperException, InterruptedException {
// 一次性
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
System.out.println("自定义watcher");
System.out.println("path=" + event.getPath());
System.out.println("eventType=" + event.getType());
if (event.getType() == Event.EventType.NodeDataChanged) {
zooKeeper.getData("/watcher/node2", this, null);
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
};
byte[] data = zooKeeper.getData("/watcher/node2", watcher, null);
System.out.println("data = " + new String(data));
Thread.sleep(50000);
System.out.println("结束");
}
@Test
public void watcherGetData4() throws KeeperException, InterruptedException {
// 注册多个监听器对象
zooKeeper.getData("/watcher/node2", new Watcher() {
@Override
public void process(WatchedEvent event) 三天学会ZooKeeper第一天(全网最细)
2021年 全网最细大数据学习笔记:Zookeeper 集群