Zookeeper + Curator实现分布式锁

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper + Curator实现分布式锁相关的知识,希望对你有一定的参考价值。

参考技术A

在分布式系统下,使用Java中的synchronized或者Lock已经不能满足需求了。关于分布式锁的实现,我们可以利用MySQL的唯一索引去实现,也可以利用Redis的SETNX,同样也可以使用Zookeeper的节点唯一路径去实现。

(1)线程先去 /locks 路径下面创建一个带序号的临时节点。

(2)判断自己创建的这个节点是不是 /locks 路径下序号最小的节点,如果是,则获取锁;如果不是,则监听自己的前一个节点。

(3)获取到锁后,处理自己的业务逻辑,然后删除自己创建的节点。监听它的后一个节点收到通知后,执行步骤(2)

上面的过程是不是跟AQS的同步队列有点像,判断自己是不是队列的头结点,如果是就去获取锁,不是就等待。

按照上面的思路,我们可以很快的使用zookeeper相关的api实现分布式锁。

通过在zookeeper中创建带序号的临时节点,然后判断当前线程创建的临时节点序号是不是最小的,如果是则获得锁,否则监听前一节点。

为什么要创建临时节点,就是怕创建完后,zookeeper服务器又挂了,这时候如果是永久节点,那么就会死锁了。而临时节点在关闭服务器后就会被删除。

这里使用 CountDownLatch 在监听节点的时候进行 await 。节点发生变化时,会调用 process 方法,在 process 方法中进行 countDown 。

进行测试

创建两个线程进行测试,看控制打印输出

官方文档

使用原生API存在的问题

基于以上,一般实际开发都是用Curator去实现,毕竟别人的轮子又大又安全,何必自己搞个破破烂烂的轮子上路呢。

Curator主要实现了下面四种锁

首先需要在项目中添加依赖

然后实现即可

查看控制台输出

zookeeper实现分布式锁-curator

前言

在之前《Redis实现分布式锁》一文中我已经介绍了使用Redis实现分布式锁原理,今天主要是介绍一下如何使用zookeeper实现分布式锁。

一.zookeeper介绍和安装

1.zookeeper介绍

1.1.什么是zookeeper

ZooKeeper由雅虎研发,是Google Chubby的开源实现,目前托管给Apache,是一个经典的分布式数据一致性解决方案,致力于为分布式应用提供一个高性能、高可用,且具有严格顺序访问控制能力的分布式协调服务。

分布式应用程序可以基于ZooKeeper实现数据发布与订阅、负载均衡、命名服务、分布式协调与通知、集群管理、Leader选举、分布式锁、分布式队列等功能
它基于层次型的目录树的数据结构,并通过对树上的节点进行有效管理,可以设计出各种各样的分布式集群管理功能,其本身也是分布式的。

1.2.zookeeper存储方式

Zookeeper会维护一个具有层次关系的树状的数据结构,它非常类似于一个标准的文件系统,如下图所
示:同一个目录下不能有相同名称的目录节点
在这里插入图片描述
ZooKeeper 节点是有生命周期的这取决于节点的类型,在 ZooKeeper 中,节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),以及时序节点(SEQUENTIAL ),具体在节点创建过程中,一般是组合使用,可以生成以下 4 种节点类型。

  • 持久节点(PERSISTENT)
    所谓持久节点,是指在节点创建后,就一直存在,直到有删除操作来主动清除这个节点——不会因为创建该节点的客户端会话失效而消失。
  • 持久顺序节点(PERSISTENT_SEQUENTIAL)
    这类节点的基本特性和上面的节点类型是一致的。额外的特性是,在ZK中,每个父节点会为他的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。基于这个特性,在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK会自动为给定节点名加上一个数字后缀,作为新的节点名。这个数字后缀的范围是整型的最大值。
  • 临时节点(EPHEMERAL)
    和持久节点不同的是,临时节点的生命周期和客户端会话绑定。也就是说,如果客户端会话失效,那么这个节点就会自动被清除掉。注意,这里提到的是会话失效,而非连接断开。另外,在临时节点下面不能创建子节点。
  • 临时顺序节点(EPHEMERAL_SEQUENTIAL)
    在临时几点的基础上增加了顺序,可以用来实现分布式锁

顺序节点可以用来为所有的事件进行全局排序,这样客户端可以通过序号推断事件的顺序。

2.zookeeper安装(windows)

2.1.下载zookeeper

下载地址:https://mirrors.bfsu.edu.cn/apache/zookeeper/,下载之后解压即可

2.2.配置zookeeper

解压之后,找到config/ zoo_sample.cfg 文件,复制一份重命名为:zoo.cfg ,这个是zookeeper配置文件,编辑zoo.cfg ,将dataDir 和 dataLogDir 修改成磁盘的某个地址

#数据目录
dataDir=D:/opensource/zookeeper-3.4.9/data
#日志目录
dataLogDir=D:/opensource/zookeeper-3.4.9/log

回到bin目录中,执行zkServer.cmd 启动zookeeper
在这里插入图片描述

2.3.测试zookeeper

执行bin/zsCli.cmd 启动zookeeper自带客户端,执行 "ls /"查看根节点

[zk: localhost:2181(CONNECTED) 20] ls /
[zookeeper]

创建节点

[zk: localhost:2181(CONNECTED) 21] create /testkey testvalue
Created /testkey

获取节点值

[zk: localhost:2181(CONNECTED) 23] get /testkey
testvalue
cZxid = 0x82
ctime = Fri May 14 23:15:45 CST 2021
mZxid = 0x82
mtime = Fri May 14 23:15:45 CST 2021
pZxid = 0x82
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 9
numChildren = 0
[zk: localhost:2181(CONNECTED) 24]

删除节点

[zk: localhost:2181(CONNECTED) 24] delete /testkey
[zk: localhost:2181(CONNECTED) 25] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 26]

其他命令

stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path [watch]
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port

二.基于Zookeeper的分布式锁

1.zookeeper分布式锁原理

分布式锁就是基于zk的 临时顺序节点+watch监听机制完成的。临时顺序节点特点是客户端断开节点释放,且自己维护节点顺序值,当多个线程同时创建节点我们就可以按照顺序创建N个顺序临时节点,然后依次从第一个往后获取锁。只不过能拿到锁的只能是第一个节点的线程,所以后面的线程需要监听自己上一个节点的节点释放。轮到谁,谁就拿到锁。
在这里插入图片描述

2.Java封装基于ZK的分布式锁

2.1.创建项目导入依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.0.5.RELEASE</version>
        </dependency>
        <!--这个版本和zookeeper安装版本一致-->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.9</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
    </dependencies>

2.2.封装zookeeper分布式锁

/**======================================================================================
 * 方法描述:zookeeper分布式锁
 * 逻辑步骤:
 实现 AutoCloseable 可自动关闭资源
 Watcher观察器
 ======================================================================================*/
//zookeeper分布式锁
//实现 AutoCloseable 可自动关闭资源
//Watcher观察器
@Slf4j
public class ZookeeperLock implements AutoCloseable, Watcher {

    //zookeeper客户端对象
    private ZooKeeper zooKeeper;

    //当前节点
    private String currentNode;

    //通过构造器初始化zookeeper
    ZookeeperLock(){
        try {
            //参数:链接字符串 , 会话超时 , 监听器
            this.zooKeeper = new ZooKeeper("172.16.2.54:2181",10000,this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**======================================================================================
     * 方法描述:提供获取锁的方法: code作为业务相关的编码,比如传入订单ID,就锁住这个订单
     * 逻辑步骤:
     1.创建根节点
     2.进来一个线程,就为线程在zookeeper的根节点创建一个瞬时有序节点
     2.
     3.
     ======================================================================================*/
    public boolean getLock(String code){
        try {
            //1.创建根节点,如果不存在,stat为null 就创建
            String path = "/" + code;
            Stat exists = zooKeeper.exists(path, false);
            //创建节点
            if(null == exists){
                // 节点路径 ,节点值 ,权限:不需要账户密码就能链接 , 创建模式:顺序临时节点
                zooKeeper.create(path,code.getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }

            //2.进来一个线程,就为线程在zookeeper创建一个瞬时有序节点 , 如: /xx/xx000000001 ;/xx/xx000000002
            //把当前节点保存为成员变量,后面用来做判断
            currentNode = zooKeeper.create(path+path,code.getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            //3.对所有的子节点进行排序
            List<String> children = zooKeeper.getChildren(path, false);
            Collections.sort(children);

            //4.如果排序过的children中的第一个和 currentNode 一致,拿到锁
            String firstNode = children.get(0);
            if(currentNode.endsWith(firstNode)){
                return true;
            }

            //5.如果不是第一个节点,需要监听前一个节点
            //用一个临时变量记录当前节点的上一个节点
            String previousNode = firstNode;

            for(String node : children){
                if(currentNode.endsWith(node)){
                    //如果当前节点是node节点 ,那么就监听它的上一个节点 :比如 currentNode 这里是  0003节点 ,那 node就是 0002节点
                    //第一个参数是监听的节点,第二个参数是是否要监听,zooKeeper在初始化的时候设置好了监听器
                    log.info("监听上一个节点:{}",node);
                    zooKeeper.exists(path+"/"+previousNode,true);
                }else{
                    //把children中的节点复制给上一个节点
                    previousNode = node;
                }
            }
            //代码到这里,节点已经做好监听了,只需要等待,等待上一个节点完成工作后唤醒他
            synchronized (this){
                //wait会释放锁
                wait();
            }
            //到这里说明被唤醒,说明获取到锁
            log.info("拿到锁:{}",currentNode);
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }


    @Override
    public void close()  {
        //释放锁 :节点路径, 节点版本号(-1匹配所有版本)
        log.info("释放节点:{}",currentNode);
        if(null != currentNode){
            try {
                zooKeeper.delete(currentNode ,-1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        //当节点被释放,就会到这里被监听到

        if(watchedEvent.getType() == Event.EventType.NodeDeleted){
            synchronized (this){
                //唤醒等待的线程
                log.info("当前节点:{},唤醒",watchedEvent.getPath());
                notify();
            }
        }
    }
}

2.3.测试代码

public class ZKTest {
    
    @Test
    public void testZK() throws Exception {
        for(int i = 0 ; i < 10 ; i++){
            new Thread(()->{
                ZookeeperLock zookeeperLock = new ZookeeperLock();
                boolean getLock = zookeeperLock.getLock("order");
                System.out.println("是否获取到锁:"+getLock);
                zookeeperLock.close();
            }).start();
        }
        Thread.sleep(5000);
    }
}

打印日志:

09:18:24.500 [Thread-0] INFO cn.itsource.ZookeeperLock - /order/order0000000014监听上一个节点:/order/order0000000013
09:18:24.500 [Thread-9] INFO cn.itsource.ZookeeperLock - /order/order0000000017监听上一个节点:/order/order0000000016
09:18:24.500 [Thread-1] INFO cn.itsource.ZookeeperLock - /order/order0000000019监听上一个节点:/order/order0000000018
09:18:24.500 [Thread-8] INFO cn.itsource.ZookeeperLock - /order/order0000000018监听上一个节点:/order/order0000000017
09:18:24.500 [Thread-4] INFO cn.itsource.ZookeeperLock - /order/order0000000016监听上一个节点:/order/order0000000015
09:18:24.500 [Thread-2] INFO cn.itsource.ZookeeperLock - /order/order0000000020监听上一个节点:/order/order0000000019
09:18:24.500 [Thread-3] INFO cn.itsource.ZookeeperLock - /order/order0000000013监听上一个节点:/order/order0000000012
09:18:24.500 [Thread-7] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000011
09:18:24.500 [Thread-6] INFO cn.itsource.ZookeeperLock - /order/order0000000015监听上一个节点:/order/order0000000014
09:18:24.500 [Thread-5] INFO cn.itsource.ZookeeperLock - /order/order0000000012监听上一个节点:/order/order0000000011

09:18:24.509 [Thread-5-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000011,唤醒
09:18:24.509 [Thread-5] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000012
是否获取到锁:true
09:18:24.509 [Thread-5] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000012

09:18:24.511 [Thread-3-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000012,唤醒

09:18:24.512 [Thread-3] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000013
是否获取到锁:true
09:18:24.512 [Thread-3] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000013

09:18:24.514 [Thread-0-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000013,唤醒
09:18:24.515 [Thread-0] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000014

是否获取到锁:true
09:18:24.515 [Thread-0] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000014

09:18:24.517 [Thread-6-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000014,唤醒
09:18:24.517 [Thread-6] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000015
是否获取到锁:true
09:18:24.517 [Thread-6] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000015

09:18:24.520 [Thread-4-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000015,唤醒
09:18:24.520 [Thread-4] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000016
是否获取到锁:true
09:18:24.520 [Thread-4] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000016

09:18:24.522 [Thread-9] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000017
是否获取到锁:true
09:18:24.522 [Thread-9] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000017

09:18:24.525 [Thread-8-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000017,唤醒
09:18:24.525 [Thread-8] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000018
是否获取到锁:true
09:18:24.525 [Thread-8] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000018

09:18:24.527 [Thread-1-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000018,唤醒
09:18:24.527 [Thread-1] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000019
是否获取到锁:true
09:18:24.527 [Thread-1] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000019

09:18:24.530 [Thread-2-EventThread] INFO cn.itsource.ZookeeperLock - 当前节点:/order/order0000000019,唤醒
09:18:24.530 [Thread-2] INFO cn.itsource.ZookeeperLock - 拿到锁:/order/order0000000020
是否获取到锁:true
09:18:24.530 [Thread-2] INFO cn.itsource.ZookeeperLock - 释放节点:/order/order0000000020

这个效果看起来是对的。但是如果自己基于zookeeper封装分布式锁未免太过麻烦,而且容易出BUG,Apache提供了一个基于Zookeeper的客户端工具curator已经实现了分布式锁的封装,我们使用它就可以了。

3.使用curator实现分布式锁

3.1.curator介绍

Apache Curator是用于Apache ZooKeeper(一种分布式协调服务)的Java / JVM客户端库。它包括一个高级API框架和实用程序,使使用Apache ZooKeeper变得更加轻松和可靠。解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。

重点是它对分布式锁进行了封装 :http://curator.apache.org/getting-started.html

3.1.导入依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.1.0</version>
</dependency>

3.2.配置curator

	//初始化方法start
    @Bean(initMethod = "start",destroyMethod = "close")
    public CuratorFramework curatorFramework(){
        //重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        //创建客户端
        CuratorFramework client = CuratorFrameworkFactory.newClient("172.16.2.54:2181", retryPolicy);
        return client;
    }

3.3.分布式锁案例

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = AppStart.class)
public class ZKTest {

    @Autowired
    private CuratorFramework curatorFramework;

    /**======================================================================================
     * 方法描述:使用curator实现分布式锁
     ======================================================================================*/
    @Test
    public void testCurator() throws Exception {
        for(int i = 0 ; i < 10 ; i++){
            new Thread(()->{
                testCuratorLock();
            }).start();
        }
        Thread.sleep(5000);
    }
    public void testCuratorLock(){
        //分布式锁
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/order");
        try {
            if ( lock.acquire(1, TimeUnit.SECONDS) ){
                //处理业务逻辑
                log.info("获取到锁");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally{
            try {
                //释放锁
                log.info("释放锁");
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

打印效果

2021-05-15 09:54:32.251  INFO 22488 --- [       Thread-4] cn.itsource.ZKTest                       : 获取到锁
2021-05-15 09:54:32.252  INFO 22488 --- [       Thread-4] cn.itsource.ZKTest                       : 释放锁
2021-05-15 09:54:32.267  INFO 22488 --- [       Thread-9] cn.itsource.ZKTest                       : 获取到锁
2021-05-15 09:54:32.267  INFO 22488 --- [       Thread-9] cn.itsource.ZKTest                       : 释放锁
2021-05-15 09:54:32.274  INFO 22488 --- [       Thread-5] cn.itsource.ZKTest                       : 获取到锁
2021-05-15 09:54:32.274  INFO 22488 --- [       Thread-5] cn.itsource.ZKTest                       : 释放锁
2021-05-15 09:54:32.277  INFO 22488 --- [       Thread-8] cn.itsource.ZKTest                       : 获取到锁
2021-05-15 09:54:32.277  INFO 22488 --- [       Thread-8] cn.itsource.ZKTest                       : 释放锁
2021-05-15 09:54:32.280  INFO 22488 --- [       Thread-6] cn.itsource.ZKTest                       : 获取到锁
2021-05-15 09:54:32.280  INFO 22488 --- [       Thread-6] cn.itsource.ZKTest                       : 释放锁
2021-05-15 09:54:32.283  INFO 22488 --- [       Thread-3] cn.itsource.ZKTest                       : 获取到锁
2021-05-15 09:54:32.283  INFO 22488 --- [       Thread-3] cn.itsource.ZKTest                       : 释放锁
2021-05-15 09:54:32.285  INFO 22488 --- [      Thread-11] cn.itsource.ZKTest                       : 获取到锁
2021-05-15 09:54:32.286  INFO 22488 --- [      Thread-11] cn.itsource.ZKTest                       : 释放锁
2021-05-15 09:54:32.288  INFO 22488 --- [      Thread-10] cn.itsource.ZKTest                       : 获取到锁
2021-05-15 09:54:32.288  INFO 22488 --- [      Thread-10] cn.itsource.ZKTest                       : 释放锁
2021-05-15 09:54:32.292  INFO 22488 --- [      Thread-12] cn.itsource.ZKTest                       : 获取到锁
2021-05-15 09:54:32.292  INFO 22488 --- [      Thread-12] cn.itsource.ZKTest                       : 释放锁
2021-05-15 09:54:32.294  INFO 22488 --- [       Thread-7] cn.itsource.ZKTest                       : 获取到锁
2021-05-15 09:54:32.294  INFO 22488 --- [       Thread-7] cn.itsource.ZKTest                       : 释放锁

看这个效果是没有问题的 , 好吧到这文章结束,喜欢的话给个好评吧!!!

以上是关于Zookeeper + Curator实现分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

ZooKeeper 分布式锁 Curator 源码 05:分布式读写锁和联锁

Zookeeper + Curator实现分布式锁

分布式锁-curator实现

ZooKeeper学习ZooKeeper实现分布式锁

SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理

zookeeperApache curator的使用及zk分布式锁实现