Zookeeper + Curator实现分布式锁
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper + Curator实现分布式锁相关的知识,希望对你有一定的参考价值。
参考技术A在分布式系统下,使用Java中的synchronized或者Lock已经不能满足需求了。关于分布式锁的实现,我们可以利用MySQL的唯一索引去实现,也可以利用Redis的SETNX,同样也可以使用Zookeeper的节点唯一路径去实现。
(1)线程先去 /locks 路径下面创建一个带序号的临时节点。
(2)判断自己创建的这个节点是不是 /locks 路径下序号最小的节点,如果是,则获取锁;如果不是,则监听自己的前一个节点。
(3)获取到锁后,处理自己的业务逻辑,然后删除自己创建的节点。监听它的后一个节点收到通知后,执行步骤(2)
上面的过程是不是跟AQS的同步队列有点像,判断自己是不是队列的头结点,如果是就去获取锁,不是就等待。
按照上面的思路,我们可以很快的使用zookeeper相关的api实现分布式锁。
通过在zookeeper中创建带序号的临时节点,然后判断当前线程创建的临时节点序号是不是最小的,如果是则获得锁,否则监听前一节点。
为什么要创建临时节点,就是怕创建完后,zookeeper服务器又挂了,这时候如果是永久节点,那么就会死锁了。而临时节点在关闭服务器后就会被删除。
这里使用 CountDownLatch 在监听节点的时候进行 await 。节点发生变化时,会调用 process 方法,在 process 方法中进行 countDown 。
进行测试
创建两个线程进行测试,看控制打印输出
官方文档
使用原生API存在的问题
基于以上,一般实际开发都是用Curator去实现,毕竟别人的轮子又大又安全,何必自己搞个破破烂烂的轮子上路呢。
Curator主要实现了下面四种锁
首先需要在项目中添加依赖
然后实现即可
查看控制台输出
zookeeper实现分布式锁-curator
前言
在之前《Redis实现分布式锁》一文中我已经介绍了使用Redis实现分布式锁原理,今天主要是介绍一下如何使用zookeeper实现分布式锁。
一.zookeeper介绍和安装
1.zookeeper介绍
1.1.什么是zookeeper
ZooKeeper由雅虎研发,是Google Chubby的开源实现,目前托管给Apache,是一个经典的分布式数据一致性解决方案,致力于为分布式应用提供一个高性能、高可用,且具有严格顺序访问控制能力的分布式协调服务。
分布式应用程序可以基于ZooKeeper实现数据发布与订阅、负载均衡、命名服务、分布式协调与通知、集群管理、Leader选举、分布式锁、分布式队列等功能
它基于层次型的目录树的数据结构,并通过对树上的节点进行有效管理,可以设计出各种各样的分布式集群管理功能,其本身也是分布式的。
1.2.zookeeper存储方式
Zookeeper会维护一个具有层次关系的树状的数据结构,它非常类似于一个标准的文件系统,如下图所
示:同一个目录下不能有相同名称的目录节点
ZooKeeper 节点是有生命周期的这取决于节点的类型,在 ZooKeeper 中,节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),以及时序节点(SEQUENTIAL ),具体在节点创建过程中,一般是组合使用,可以生成以下 4 种节点类型。
- 持久节点(PERSISTENT)
所谓持久节点,是指在节点创建后,就一直存在,直到有删除操作来主动清除这个节点——不会因为创建该节点的客户端会话失效而消失。 - 持久顺序节点(PERSISTENT_SEQUENTIAL)
这类节点的基本特性和上面的节点类型是一致的。额外的特性是,在ZK中,每个父节点会为他的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。基于这个特性,在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK会自动为给定节点名加上一个数字后缀,作为新的节点名。这个数字后缀的范围是整型的最大值。 - 临时节点(EPHEMERAL)
和持久节点不同的是,临时节点的生命周期和客户端会话绑定。也就是说,如果客户端会话失效,那么这个节点就会自动被清除掉。注意,这里提到的是会话失效,而非连接断开。另外,在临时节点下面不能创建子节点。 - 临时顺序节点(EPHEMERAL_SEQUENTIAL)
在临时几点的基础上增加了顺序,可以用来实现分布式锁
顺序节点可以用来为所有的事件进行全局排序,这样客户端可以通过序号推断事件的顺序。
2.zookeeper安装(windows)
2.1.下载zookeeper
下载地址:https://mirrors.bfsu.edu.cn/apache/zookeeper/,下载之后解压即可
2.2.配置zookeeper
解压之后,找到config/ zoo_sample.cfg 文件,复制一份重命名为:zoo.cfg ,这个是zookeeper配置文件,编辑zoo.cfg ,将dataDir 和 dataLogDir 修改成磁盘的某个地址
#数据目录
dataDir=D:/opensource/zookeeper-3.4.9/data
#日志目录
dataLogDir=D:/opensource/zookeeper-3.4.9/log
回到bin目录中,执行zkServer.cmd 启动zookeeper
2.3.测试zookeeper
执行bin/zsCli.cmd 启动zookeeper自带客户端,执行 "ls /"查看根节点
[zk: localhost:2181(CONNECTED) 20] ls /
[zookeeper]
创建节点
[zk: localhost:2181(CONNECTED) 21] create /testkey testvalue
Created /testkey
获取节点值
[zk: localhost:2181(CONNECTED) 23] get /testkey
testvalue
cZxid = 0x82
ctime = Fri May 14 23:15:45 CST 2021
mZxid = 0x82
mtime = Fri May 14 23:15:45 CST 2021
pZxid = 0x82
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0
[zk: localhost:2181(CONNECTED) 24]
删除节点
[zk: localhost:2181(CONNECTED) 24] delete /testkey
[zk: localhost:2181(CONNECTED) 25] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 26]
其他命令
stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path [watch]
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port
二.基于Zookeeper的分布式锁
1.zookeeper分布式锁原理
分布式锁就是基于zk的 临时顺序节点+watch监听机制完成的。临时顺序节点特点是客户端断开节点释放,且自己维护节点顺序值,当多个线程同时创建节点我们就可以按照顺序创建N个顺序临时节点,然后依次从第一个往后获取锁。只不过能拿到锁的只能是第一个节点的线程,所以后面的线程需要监听自己上一个节点的节点释放。轮到谁,谁就拿到锁。
2.Java封装基于ZK的分布式锁
2.1.创建项目导入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.0.5.RELEASE</version>
</dependency>
<!--这个版本和zookeeper安装版本一致-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>
2.2.封装zookeeper分布式锁
/**======================================================================================
* 方法描述:zookeeper分布式锁
* 逻辑步骤:
实现 AutoCloseable 可自动关闭资源
Watcher观察器
======================================================================================*/
//zookeeper分布式锁
//实现 AutoCloseable 可自动关闭资源
//Watcher观察器
@Slf4j
public class ZookeeperLock implements AutoCloseable, Watcher {
//zookeeper客户端对象
private ZooKeeper zooKeeper;
//当前节点
private String currentNode;
//通过构造器初始化zookeeper
ZookeeperLock(){
try {
//参数:链接字符串 , 会话超时 , 监听器
this.zooKeeper = new ZooKeeper("172.16.2.54:2181",10000,this);
} catch (IOException e) {
e.printStackTrace();
}
}
/**======================================================================================
* 方法描述:提供获取锁的方法: code作为业务相关的编码,比如传入订单ID,就锁住这个订单
* 逻辑步骤:
1.创建根节点
2.进来一个线程,就为线程在zookeeper的根节点创建一个瞬时有序节点
2.
3.
======================================================================================*/
public boolean getLock(String code){
try {
//1.创建根节点,如果不存在,stat为null 就创建
String path = "/" + code;
Stat exists = zooKeeper.exists(path, false);
//创建节点
if(null == exists){
// 节点路径 ,节点值 ,权限:不需要账户密码就能链接 , 创建模式:顺序临时节点
zooKeeper.create(path,code.getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//2.进来一个线程,就为线程在zookeeper创建一个瞬时有序节点 , 如: /xx/xx000000001 ;/xx/xx000000002
//把当前节点保存为成员变量,后面用来做判断
currentNode = zooKeeper.create(path+path,code.getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//3.对所有的子节点进行排序
List<String> children = zooKeeper.getChildren(path, false);
Collections.sort(children);
//4.如果排序过的children中的第一个和 currentNode 一致,拿到锁
String firstNode = children.get(0);
if(currentNode.endsWith(firstNode)){
return true;
}
//5.如果不是第一个节点,需要监听前一个节点
//用一个临时变量记录当前节点的上一个节点
String previousNode = firstNode;
for(String node : children){
if(currentNode.endsWith(node)){
//如果当前节点是node节点 ,那么就监听它的上一个节点 :比如 currentNode 这里是 0003节点 ,那 node就是 0002节点
//第一个参数是监听的节点,第二个参数是是否要监听,zooKeeper在初始化的时候设置好了监听器
log.info("监听上一个节点:{}",node);
zooKeeper.exists(path+"/"+previousNode,true);
}else{
//把children中的节点复制给上一个节点
previousNode = node;
}
}
//代码到这里,节点已经做好监听了,只需要等待,等待上一个节点完成工作后唤醒他
synchronized (this){
//wait会释放锁
wait();
}
//到这里说明被唤醒,说明获取到锁
log.info("拿到锁:{}",currentNode);
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public void close() {
//释放锁 :节点路径, 节点版本号(-1匹配所有版本)
log.info("释放节点:{}",currentNode);
if(null != currentNode){
try {
zooKeeper.delete(currentNode ,-1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
@Override
public void process(WatchedEvent watchedEvent) {
//当节点被释放,就会到这里被监听到
if(watchedEvent.getType() == Event.EventType.NodeDeleted){
synchronized (this){
//唤醒等待的线程
log.info("当前节点:{},唤醒",watchedEvent.getPath());
notify();
}
}
}
}
2.3.测试代码
public class ZKTest {
@Test
public void testZK() throws Exception {
for(int i = 0 ; i < 10 ; i++){
new Thread(()->{
ZookeeperLock zookeeperLock = new ZookeeperLock();
boolean getLock = zookeeperLock.getLock("order");
System.out.println("是否获取到锁:"+getLock);
zookeeperLock.close();
}).start();
}
Thread.sleep(5000);
}
}
打印日志:
09:18:24.500 [Thread-0] INFO cn.itsource.ZookeeperLock - /order/order0000000014监听上一个节点:/order/order0000000013
09:18:24.500 [Thread-9] INFO cn.itsource.ZookeeperLock - /order/order0000000017监听上一个节点:/order/order0000000016
09:18:24.500 [Thread-1] INFO cn.itsource.ZookeeperLock - /order/order0000000019监听上一个节点:/order/order0000000018
09:18:24.500 [Thread-8] INFO cn.itsource.ZookeeperLock - /order/order0000000018监听上一个节点:/order/order0000000017
09:18:24.500 [Thread-4] INFO cn.itsource.ZookeeperLock - /order/order0000000016监听上一个节点:/order/order0000000015
09:18:24.500 [Thread-2] INFO cn.itsource.ZookeeperLock - /order/order0000000020监听上一个节点:/order/order0000000019
09:18:24.500 [Thread-3] INFO cn.itsource.ZookeeperLock - /order/order0000000013监听上一个节点:/order/order0000000012
09:18:24.500 [Thread-7] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000011
09:18:24.500 [Thread-6] INFO cn.itsource.ZookeeperLock - /order/order0000000015监听上一个节点:/order/order0000000014
09:18:24.500 [Thread-5] INFO cn.itsource.ZookeeperLock - /order/order0000000012监听上一个节点:/order/order0000000011
09:18:24.509 [Thread-5-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000011,唤醒
09:18:24.509 [Thread-5] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000012
是否获取到锁:true
09:18:24.509 [Thread-5] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000012
09:18:24.511 [Thread-3-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000012,唤醒
09:18:24.512 [Thread-3] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000013
是否获取到锁:true
09:18:24.512 [Thread-3] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000013
09:18:24.514 [Thread-0-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000013,唤醒
09:18:24.515 [Thread-0] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000014
是否获取到锁:true
09:18:24.515 [Thread-0] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000014
09:18:24.517 [Thread-6-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000014,唤醒
09:18:24.517 [Thread-6] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000015
是否获取到锁:true
09:18:24.517 [Thread-6] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000015
09:18:24.520 [Thread-4-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000015,唤醒
09:18:24.520 [Thread-4] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000016
是否获取到锁:true
09:18:24.520 [Thread-4] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000016
09:18:24.522 [Thread-9] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000017
是否获取到锁:true
09:18:24.522 [Thread-9] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000017
09:18:24.525 [Thread-8-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000017,唤醒
09:18:24.525 [Thread-8] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000018
是否获取到锁:true
09:18:24.525 [Thread-8] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000018
09:18:24.527 [Thread-1-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000018,唤醒
09:18:24.527 [Thread-1] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000019
是否获取到锁:true
09:18:24.527 [Thread-1] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000019
09:18:24.530 [Thread-2-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000019,唤醒
09:18:24.530 [Thread-2] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000020
是否获取到锁:true
09:18:24.530 [Thread-2] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000020
这个效果看起来是对的。但是如果自己基于zookeeper封装分布式锁未免太过麻烦,而且容易出BUG,Apache提供了一个基于Zookeeper的客户端工具curator已经实现了分布式锁的封装,我们使用它就可以了。
3.使用curator实现分布式锁
3.1.curator介绍
Apache Curator是用于Apache ZooKeeper(一种分布式协调服务)的Java / JVM客户端库。它包括一个高级API框架和实用程序,使使用Apache ZooKeeper变得更加轻松和可靠。解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。
重点是它对分布式锁进行了封装 :http://curator.apache.org/getting-started.html
3.1.导入依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.1.0</version>
</dependency>
3.2.配置curator
//初始化方法start
@Bean(initMethod = "start",destroyMethod = "close")
public CuratorFramework curatorFramework(){
//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//创建客户端
CuratorFramework client = CuratorFrameworkFactory.newClient("172.16.2.54:2181", retryPolicy);
return client;
}
3.3.分布式锁案例
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = AppStart.class)
public class ZKTest {
@Autowired
private CuratorFramework curatorFramework;
/**======================================================================================
* 方法描述:使用curator实现分布式锁
======================================================================================*/
@Test
public void testCurator() throws Exception {
for(int i = 0 ; i < 10 ; i++){
new Thread(()->{
testCuratorLock();
}).start();
}
Thread.sleep(5000);
}
public void testCuratorLock(){
//分布式锁
InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/order");
try {
if ( lock.acquire(1, TimeUnit.SECONDS) ){
//处理业务逻辑
log.info("获取到锁");
}
} catch (Exception e) {
e.printStackTrace();
} finally{
try {
//释放锁
log.info("释放锁");
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
打印效果
2021-05-15 09:54:32.251 INFO 22488 --- [ Thread-4] cn.itsource.ZKTest : 获取到锁
2021-05-15 09:54:32.252 INFO 22488 --- [ Thread-4] cn.itsource.ZKTest : 释放锁
2021-05-15 09:54:32.267 INFO 22488 --- [ Thread-9] cn.itsource.ZKTest : 获取到锁
2021-05-15 09:54:32.267 INFO 22488 --- [ Thread-9] cn.itsource.ZKTest : 释放锁
2021-05-15 09:54:32.274 INFO 22488 --- [ Thread-5] cn.itsource.ZKTest : 获取到锁
2021-05-15 09:54:32.274 INFO 22488 --- [ Thread-5] cn.itsource.ZKTest : 释放锁
2021-05-15 09:54:32.277 INFO 22488 --- [ Thread-8] cn.itsource.ZKTest : 获取到锁
2021-05-15 09:54:32.277 INFO 22488 --- [ Thread-8] cn.itsource.ZKTest : 释放锁
2021-05-15 09:54:32.280 INFO 22488 --- [ Thread-6] cn.itsource.ZKTest : 获取到锁
2021-05-15 09:54:32.280 INFO 22488 --- [ Thread-6] cn.itsource.ZKTest : 释放锁
2021-05-15 09:54:32.283 INFO 22488 --- [ Thread-3] cn.itsource.ZKTest : 获取到锁
2021-05-15 09:54:32.283 INFO 22488 --- [ Thread-3] cn.itsource.ZKTest : 释放锁
2021-05-15 09:54:32.285 INFO 22488 --- [ Thread-11] cn.itsource.ZKTest : 获取到锁
2021-05-15 09:54:32.286 INFO 22488 --- [ Thread-11] cn.itsource.ZKTest : 释放锁
2021-05-15 09:54:32.288 INFO 22488 --- [ Thread-10] cn.itsource.ZKTest : 获取到锁
2021-05-15 09:54:32.288 INFO 22488 --- [ Thread-10] cn.itsource.ZKTest : 释放锁
2021-05-15 09:54:32.292 INFO 22488 --- [ Thread-12] cn.itsource.ZKTest : 获取到锁
2021-05-15 09:54:32.292 INFO 22488 --- [ Thread-12] cn.itsource.ZKTest : 释放锁
2021-05-15 09:54:32.294 INFO 22488 --- [ Thread-7] cn.itsource.ZKTest : 获取到锁
2021-05-15 09:54:32.294 INFO 22488 --- [ Thread-7] cn.itsource.ZKTest : 释放锁
看这个效果是没有问题的 , 好吧到这文章结束,喜欢的话给个好评吧!!!
以上是关于Zookeeper + Curator实现分布式锁的主要内容,如果未能解决你的问题,请参考以下文章
ZooKeeper 分布式锁 Curator 源码 05:分布式读写锁和联锁