Day377.服务器动态上下线监听案例&ZooKeeper分布式锁案例&企业面试真题 -Zookeeper
Posted 阿昌喜欢吃黄桃
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Day377.服务器动态上下线监听案例&ZooKeeper分布式锁案例&企业面试真题 -Zookeeper相关的知识,希望对你有一定的参考价值。
服务器动态上下线监听案例
4.1 需求
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
4.2 需求分析
- 服务器动态上下线
4.3 具体 实现
-
(1)先在集群上创建/servers 节点
create /servers "servers"
-
(2)在 Idea 中创建包名:com.achang.zkcase1
-
(3)服务器端向 Zookeeper 注册代码
服务器上线的过程对于zookeeper集群来说就是zookeeper创建目录节点的过程
package com.achang.zkcase1;
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
public class DistributeServer {
private static String connectString ="hadoop102:2181,hadoop103:2181,hadoop104:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
private String parentNode = "/servers";
// 创建到 zk 的客户端连接
public void getConnect() throws IOException{
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
// 注册服务器
public void registServer(String hostname) throws Exception{
String create = zk.create("/servers" + hostname,hostname.getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname +" is online "+ create);
}
// 业务功能
public void business(String hostname) throws Exception{
System.out.println(hostname + " is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 1 获取 zk 连接
DistributeServer server = new DistributeServer();
server.getConnect();
// 2 利用 zk 连接注册服务器信息
server.registServer(args[0]);
// 3 启动业务功能
server.business(args[0]);
}
}
- (3)客户端代码
package com.achang.zkcase1;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class DistributeClient {
private static String connectString ="hadoop102:2181,hadoop103:2181,hadoop104:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
private String parentNode = "/servers";
// 创建到 zk 的客户端连接
public void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 再次启动监听
try {
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
// 获取服务器列表信息
public void getServerList() throws Exception {
// 1 获取服务器 子节点信息,并且对父节点进行监听
List<String> children = zk.getChildren(parentNode, true);
// 2 存储服务器信息列表
ArrayList<String> servers = new ArrayList<>();
// 3 遍历所有节点,获取节点中的主机名称信息
for (String child : children) {
byte[] data = zk.getData(parentNode + "/" + child,false, null);
servers.add(new String(data));
}
// 4 打印服务器列表信息
System.out.println(servers);
}
// 业务功能
public void business() throws Exception{
System.out.println("client is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 1 获取 zk 连接
DistributeClient client = new DistributeClient();
client.getConnect();
// 2 获取 servers 的子节点信息,从中获取服务器信息列表
client.getServerList();
// 3 业务进程启动
client.business();
}
}
4.4 测试
-
在 Linux 命令行上操作增加减少服务器
- 启动 DistributeClient 客户端
- 在 hadoop102 上 zk 的客户端/servers 目录上创建临时带序号节点
create -e -s /servers/hadoop102 "hadoop102" create -e -s /servers/hadoop103 "hadoop103"
- 观察 Idea 控制台变化
[hadoop102, hadoop103]
- 执行删除操作
delete /servers/hadoop1020000000000
- 观察 Idea 控制台变化
[hadoop103]
-
在 Idea 上操作增加减少服务器
-
启动 DistributeClient 客户端(如果已经启动过,不需要重启)
-
启动 DistributeServer 服务
-
①点击 Edit Configurations…
-
②在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop102
-
③ 回 到 DistributeServer 的 main 方 法 , 右 键 , 在 弹 出 的 窗 口 中 点 击 Run“DistributeServer.main()”
-
④观察 DistributeServer 控制台,提示 hadoop102 is working
-
⑤观察 DistributeClient 控制台,提示 hadoop102 已经上线
-
-
ZooKeeper 分布式锁案例
什么叫做分布式锁
呢?
比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
5.1 原生 Zookeeper 实现 分布式锁案例
1) 分布式锁实现
package com.achang.lock2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DistributedLock {
// zookeeper server 列表
private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
// 超时时间
private int sessionTimeout = 2000;
private ZooKeeper zk;
private String rootNode = "locks";
private String subNode = "seq-";
// 当前 client 等待的子节点
private String waitPath;
//ZooKeeper 连接
private CountDownLatch connectLatch = new CountDownLatch(1);
//ZooKeeper 节点等待
private CountDownLatch waitLatch = new CountDownLatch(1);
// 当前 client 创建的子节点
private String currentNode;
// 和 zk 服务建立连接,并创建根节点
public DistributedLock() throws IOException,InterruptedException, KeeperException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
if (event.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// 发生了 waitPath 的删除事件
if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)){
waitLatch.countDown();
}
}
});
// 等待连接建立
connectLatch.await();
//获取根节点状态
Stat stat = zk.exists("/" + rootNode, false);
//如果根节点不存在,则创建根节点,根节点类型为永久节点
if (stat == null) {
System.out.println("根节点不存在");
zk.create("/" + rootNode, new byte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 加锁方法
public void zkLock() {
try {
//在根节点下创建临时顺序节点,返回值为创建的节点路径
currentNode = zk.create("/" + rootNode + "/" + subNode,null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
// wait 一小会, 让结果更清晰一些
Thread.sleep(10);
// 注意, 没有必要监听"/locks"的子节点的变化情况
List<String> childrenNodes = zk.getChildren("/" + rootNode, false);
// 列表中只有一个子节点, 那肯定就是 currentNode , 说明client 获得锁
if (childrenNodes.size() == 1) {
return;
} else {
//对根节点下的所有临时顺序节点进行从小到大排序
Collections.sort(childrenNodes);
//当前节点名称
String thisNode = currentNode.substring(("/" + rootNode + "/").length());
//获取当前节点的位置
int index = childrenNodes.indexOf(thisNode);
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
// index == 0, 说明 thisNode 在列表中最小, 当前client 获得锁
return;
} else {
// 获得排名比 currentNode 前 1 位的节点
this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
// 在 waitPath 上注册监听器, 当 waitPath 被删除时,zookeeper 会回调监听器的 process 方法
zk.getData(waitPath, true, new Stat());
//进入等待锁状态
waitLatch.await();
return;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 解锁方法
public void zkUnlock() {
try {
zk.delete(this.currentNode, -1);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}
2)分布式锁 测试
- 创建两个线程
package com.achang.lock2;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class DistributedLockTest {
public static void main(String[] args) throws
InterruptedException, IOException, KeeperException {
// 创建分布式锁 1
final DistributedLock lock1 = new DistributedLock();
// 创建分布式锁 2
final DistributedLock lock2 = new DistributedLock();
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
lock1.zkLock();
System.out.println("线程 1 获取锁");
Thread.sleep(5 * 1000);
lock1.zkUnlock();
System.out.println("线程 1 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
lock2.zkLock();
System.out.println("线程 2 获取锁");
Thread.sleep(5 * 1000);
lock2.zkUnlock();
System.out.println("线程 2 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
- 观察控制台变化
- 线程 1 获取锁
- 线程 1 释放锁
- 线程 2 获取锁
- 线程 2 释放锁
5.2Curator 框架
实现分布式锁案例
1)原生的 Java API 开发存在的问题
- (1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
- (2)Watch 需要重复注册,不然就不能生效
- (3)开发的复杂性还是比较高的
- (4)不支持多节点删除和创建。需要自己去递归
2)Curator 是一个专门解决分布式锁的框架,解决了原生Java API
详情请查看官方文档:https://curator.apache.org/index.html
3 )Curator 案例实操
-
(1)添加依赖
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version> </dependency>
-
(2)代码实现
package com.以上是关于Day377.服务器动态上下线监听案例&ZooKeeper分布式锁案例&企业面试真题 -Zookeeper的主要内容,如果未能解决你的问题,请参考以下文章