分布式前修课:Zookeeper锁实现方式

Posted 俗世游子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式前修课:Zookeeper锁实现方式相关的知识,希望对你有一定的参考价值。

前言

聊完mysql和Redis,我们接下来在聊一聊Zookeeper。相信大家都已经发现了,这些都是我们在开发过程非常常用的技术。搞定他们,一切难题都不在话下。

Zookeeper,盘它

官网是我们学习某一种技术框架的第一手资料,通过官网我们能挖掘到该框架的最新动态

What Is Zookeeper

Zookeeper是一款主要解决分布式协调的服务框架,可以用来维护配置信息、命名、提供分布式同步和服务提供等功能。Zookeeper基于ZAB【ZooKeeper 原子广播】协议,支持高可用。

Zookeeper的设计

设计目标

Zookeeper的设计很简单,其目的就是为了:

  • 减轻分布式应用程序实现协调服务的压力,允许分布式进程通过共享的分层命名空间相互协调

而在Zookeeper中的文件存储可以称为:znodes,类似于Linux下的目录和文件;而不同的一点是:ZooKeeper 数据保存在内存中。这样也就意味着Zookeeper自身可以实现实现高吞吐量和低延迟

命名空间设计

Zookeeper中名称全部由斜杠【/】 分隔的一系列路径元素,命名空间中的每个节点都由路径标识。而每个节点都可以拥有与其关联的数据以及子节点。这就像拥有一个允许文件也成为目录的文件系统

专业点来说Zookeeper中的每一个节点都可以称为znode, 主要分为两类:

  • 持久节点:【节点只要创建就存在,除非手动删除】
  • 临时节点:【只要创建znode的会话处于活动状态,那么当前节点就会存在;当会话结束,临时节点自动删除】

Zookeeper提供了监听/回调的机制,当客户端对znode进行操作之后,会触发watch机制,客户端受到znode已经改变的数据包。

稳定大局

对Zookeeper有一点了解之后,我们就要开始使用它了,我们使用它的目的是为了实现分布式锁。那么我们先来搞定基础环境

环境规划

我们这里使用的Zookeeper版本:3.6.2

node ip port
zookeeper 192.168.10.200 2181

单机环境

环境规划完成之后,接下来就看我操作吧。

 wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
 # 解压
 tar xf apache-zookeeper-3.6.2-bin.tar.gz -C /usr/local/
 ​
 # 进入到/usr/local下,改个名字
 mv apache-zookeeper-3.6.2-bin/ zookeeper-3.6.2

其实到这里环境就已经安装完成了,下面就是针对Zookeeper的配置

 # 配置文件全部存放在conf下,并且我们需要将模板配置换成`zoo.cfg`,不然无法生效
 cd /usr/local/zookeeper-3.6.2/conf && cp ./zoo_sample.cfg ./zoo.cfg
 ​
 vim zoo.cfg
  # 默认在tmp下,但是tmp属于系统临时文件目录,我们最好进行修改
  dataDir=/var/data/bigdata/zookeeper

按照zoo.cfg中的配置,我们也只需要改动dataDir的目录就可以了,其他的暂时默认就好

环境变量配置

 # 编辑配置
 vim /etc/profile
  export ZOOKEEPER_HOME=/usr/local/zookeeper-3.6.2
  export JAVA_HOME=/usr/java/jdk1.8.0_221-amd64
  export PATH=$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$PATH
  
 # 使其生效
 source /etc/profile

下面就开始启动阶段了

 # 以下为启动的全部命令
 zkServer.sh [--config <conf-dir>] start|start-foreground|stop|version|restart|status|print-cmd
 ​
 # 启动:这里已经将Zookeeper加入到了环境变量中
 zkServer.sh start
 # 展示启动状态
 zkServer.sh status

集群配置

集群配置环境下,需要改变两个地方:

第一步:在zoo.cfg配置文件中添加集群节点的配置

 server.1=192.168.10.200:2181:2888:3888
 server.2=192.168.10.201:2181:2888:3888
 server.3=192.168.10.202:2181:2888:3888

第二步:在各自节点的$dataDir目录下添加myid文件,内容对应上面配置的序号

 echo 1 > myid
 echo 2 > myid
 echo 3 > myid

集群对比单机版只是多了一些配置,其他的没有任何变化。相对比还是非常简单的

客户端操作

Zookeeper提供了命令行的操作方式,通过zkCli.sh来启动,并且操作方式和Linux命令基本相同,下面我们简单演示一下

 # 本地环境可以不配置
 zkCli.sh [-server 127.0.0.1:2181]

下面通过一张图来简单介绍一些Zookeeper的增删改查吧

这其实非常简单的,而且我们并不用搞懂它,毕竟我们在操作的时候并不能直接连到服务器上,下面我们来看看如何通过提供的API来对Zookeeper进行操作吧

锁住它

知其然

在《分布式锁原理》一文中我们曾经介绍过基于Zookeeper实现分布式锁的思路,主要通过Zookeeper的临时节点来实现:

  • 在主节点下每个客户端过来都会注册临时有序节点
  • 每个节点只监听自己前一个节点,如果发现自己是第一个节点,说明已经获取到了锁

而只要客户端断开session连接,临时有序节点自动删除,客户端锁就被释放

知其所以然

下面我们就通过Zookeeper的API来实现一个分布式锁吧。还是老样子,一版自己写,一版看看人家的实现方式。对比一下。

原生代码

 private static final CountDownLatch LATCH = new CountDownLatch(1);
 // 获取ZooKeeper的操作
 public static ZooKeeper getZk() 
     ZooKeeper zooKeeper = null;
     try 
         zooKeeper = new ZooKeeper("192.168.10.200:2181/locks", 1000, event -> 
             switch (event.getState()) 
                 case SyncConnected:
                     // 等到回到 链接成功的事件,就能释放阻塞
                     LATCH.countDown();
                     break;
             
         );
         //Reactor编程模型,返回很快,但是内存中并没有构建完成,所以需要等待
         LATCH.await();
      catch (Exception e) 
         e.printStackTrace();
     
     return zooKeeper;
 

主要代码

public class LockWatchCallback implements Watcher, AsyncCallback.StringCallback, AsyncCallback.ChildrenCallback, AsyncCallback.StatCallback 

    private ZooKeeper zk;
    private String name;
    private String nodePathName;

    private CountDownLatch latch = new CountDownLatch(1);

    public LockWatchCallback(ZooKeeper zk, String name) 
        this.zk = zk;
        this.name = name;
    

    public void lock() 
        /**
         * 创建节点:
         *  path: 如果在192.168.10.200:2181/locks指定了目录,那么这里的 根目录 代表的是 /locks,然后在创建对应的临时节点
         *  ZooDefs.Ids.OPEN_ACL_UNSAFE: 权限:全部开放
         *  CreateMode.EPHEMERAL_SEQUENTIAL: 临时有序节点
         *  StringCallback: 节点创建完成之后的回调
         */
        zk.create("/lock", name.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, name);
        try 
            latch.await();
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

    public void unLock() 
        try 
            zk.delete(nodePathName, -1);
         catch (Exception e) 
            e.printStackTrace();
        
    

    @Override
    public void process(WatchedEvent event) 
        switch (event.getType()) 
            // 当节点删除之后,重新拉取一次全部子节点,然后进行监听处理
            case NodeDeleted:
                zk.getChildren("/", false, this, "abc");
                break;
        

    

    // zk.create("/lock", name.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, name); 回调
    @Override
    public void processResult(int rc, String path, Object ctx, String name) 
        if (null != name) 
            nodePathName = name;
            // 得到根节点下创建的节点,我们不需要watch根目录
            zk.getChildren("/", false, this, "abc");
        
    

    // zk.getChildren("/", false, this, "abc"); 回调
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children) 
        // 得到的children是无序的,所以要先做一个排序
        Collections.sort(children);

        // /lock0000000000, 而children中是没有斜线的,所以要截取一下
        int i = 1;
        if ((i = children.indexOf(nodePathName.substring(1))) < 1) 
            // 自己已经是第一个节点了,获取到了锁,开始执行
            try 
                zk.setData("/", this.name.getBytes(StandardCharsets.UTF_8), 1);
             catch (Exception e) 
                e.printStackTrace();
            
            // 释放掉阻塞,让执行
            latch.countDown();
         else 
            // 监控自己的前一个节点是否还存在
            try 
                zk.exists("/" + children.get(i - 1), this, this, "abc");
             catch (Exception e) 
                e.printStackTrace();
            
        
    

    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) 
    

全程采用Zookeeper提供的异步API方式进行回调处理,在每一步回调的地方都添加了注释,看起来是比较方便的。

下面我看一下如何测试:为了能和之前的程序进行统一,做了一个小小的封装,也可以直接使用LockWatchCallback对象来处理锁操作

public class ZookeeperLock extends AbstractLock 

    ZooKeeper zk;
    LockWatchCallback watchCallback;
    public ZookeeperLock(ZooKeeper zk) 
        this.zk = zk;
    

    @Override
    public void start() 
        // 每个线程都需要创建一个临时有序节点,所以每个线程都会new一个watchCallback对象
        watchCallback = new LockWatchCallback(zk, Thread.currentThread().getName());
    

    @Override
    public void lock() 
        // 加锁,创建节点
        this.watchCallback.lock();
    

    @Override
    public void unlock() 
        // 解锁,删除节点
        this.watchCallback.unLock();
    

    @Override
    public void destory() 
        try 
            zk.close();
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

    private static ExecutorService executorService = Executors.newCachedThreadPool();

    public static void main(String[] args) throws InterruptedException 
        int[] count = 0;
        final ZookeeperLock zkLock = new ZookeeperLock(getZk());
        for (int i = 0; i < 100; i++) 
            executorService.submit(() -> 
                zkLock.start();

                zkLock.lock();
                count[0]++;
                zkLock.unlock();
            );
        
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.HOURS);
        System.out.println(count[0]);

        zkLock.destory();
    

成熟框架

那接下来我们就聊一聊成熟的框架是怎么实现分布式锁的:Curator

  • 实现方式是不变的,不过在我们上一版的基础丰富了更多的锁特性,并且实现更加稳定,调用更加方便
public class ZkLock extends AbstractLock 

    private static final Logger LOGGER = LoggerFactory.getLogger(ZkLock.class);

    /**
     * Zookeeper地址 ip:port
     */
    private final String zkAddr;
    /**
     * 总路径
     */
    private final String lockPath;
    private CuratorFramework client;
    private InterProcessLock lock;

    public ZkLock(String zkAddr, String lockPath) 
        this.zkAddr = zkAddr;
        this.lockPath = lockPath;
    

    @Override
    public void lock() 
        try 
            this.lock.acquire();
         catch (Exception e) 
            LOGGER.error("Lock异常,异常信息:", e.getMessage());
        
    

    @Override
    public boolean tryLock() 
        boolean isLocked = false;
        try 
            isLocked = this.lock.acquire(0, TimeUnit.SECONDS);
         catch (Exception e) 
            LOGGER.error("tryLock异常,异常信息:", e.getMessage());
        
        return isLocked;
    

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException 
        boolean isLocked = false;
        try 
            isLocked = this.lock.acquire(time, unit);
         catch (Exception e) 
            LOGGER.error("tryLock异常,异常信息:", e.getMessage());
        
        return isLocked;
    

    @Override
    public void unlock() 
        try 
            this.lock.release();
         catch (Exception e) 
            LOGGER.error("unlock异常,异常信息:", e.getMessage());
        
    

    @Override
    public void start() 
        client = CuratorFrameworkFactory.newClient(
                this.zkAddr,
                new RetryNTimes(5, 5000)
        );
        client.start();
        if (client.getState() == CuratorFrameworkState.STARTED) 
            LOGGER.info("zk client start successfully!");
            LOGGER.info("zkAddress:,lockPath:", this.zkAddr, lockPath);
         else 
            throw new RuntimeException("客户端启动失败。。。");
        
        this.lock = defaultLock(lockPath);
    

    /**
     * 公平可重入锁
     *
     * @param lockPath 路径
     * @return InterProcessMutex
     */
    InterProcessLock defaultLock(String lockPath) 
        return new InterProcessMutex(client, lockPath);
    

看看这个代码量是不是简洁了很多,虽然简洁,但是功能俱全。我们来验证一下:

private static ExecutorService executorService = Executors.newCachedThreadPool();

public static void main(String[] args) throws InterruptedException 
    ZkLock zkLock = new ZkLock("192.168.10.200:2181","/locks");
    zkLock.start();

    int[] num = 0;
    long start = System.currentTimeMillis();
    for(int i=0;i<200;i++)
        executorService.submit(()->
            try 
                zkLock.lock();
                num[0]++;
             catch (Exception e)
                throw new RuntimeException(e);
             finally 
                zkLock.unlock();
            
        );

    
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.HOURS);
    System.out.println(num[0]);

完全OK!!!

最后

关于Zookeeper分布式锁的实现我们就介绍到这里。Zookeeper在实际使用中的场景还是非常丰富的,包括分布式协调等功能都在等着大家一一探索。

而关于分布式锁还有最后一个章节就结束了,接下来我们就来了解一下关于后起之秀Etcd的相关操作和Etcd是如何实现分布式锁的。

以上是关于分布式前修课:Zookeeper锁实现方式的主要内容,如果未能解决你的问题,请参考以下文章

分布式前修课:MySQL实现分布式锁

ZooKeeper学习ZooKeeper实现分布式锁

分布式锁的三种实现方式

分布式锁三大技术方案实战——基于zookeeper方式实现分布式锁

7.23-7.29博客精彩回顾

基于redis和zookeeper的分布式锁实现方式