zookeeper分布式锁

Posted Tommy Vercetti

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper分布式锁相关的知识,希望对你有一定的参考价值。

当多个客户端对zk集群中的资源进行访问时,为了保持资源访问的有序性和稳健性,在每个客户端进行访问时需要在访问期间保持其对该份资源的独占性,用分布式锁来实现此步骤,当某进程访问资源结束后将会释放掉锁以供下一个节点对数据的访问。

分布式锁的实现思路:

  • 接收到客户端的请求后在/locks节点下创建一个临时带序号的节点。
  • 判断当前创建的节点是否为序号最小的节点,是则获取到锁,否则监听其上一个节点。因为默认是当前/locks节点中序号最小的节点优先获取到锁。
  • 获取到锁并处理完业务后该节点释放掉锁(该序号最小的节点被删除)然后后面的锁升为序号最小的节点,递归前面的步骤。

目前有成熟的分布式锁框架,但是为了巩固基础手写一个,涉及到多线程,编写步骤较为复杂。

package com.tommy.case2;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributeLock {
    private final String connectString = "192.168.20.151:2181,192.168.20.152:2181,192.168.20.153:2181";
    private final int sessionTimeout = 2000000;
    private final ZooKeeper zk;
    private int count = 0;
    private String waitPath;
    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);
    private String currentMode;

    public DistributeLock() throws IOException, InterruptedException, KeeperException {
        // 获取连接
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // connectLatch 如果连接上zk,释放掉
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }

                // waitLatch 需要释放
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });

        // 等待zk正常连接后,才往下执行,增强代码的健壮性。
        connectLatch.await();

        // 判断根节点是否存在
        try {
            byte[] data = zk.getData("/locks", false, null);
        } catch (KeeperException.NoNodeException e) {
            System.out.println("directory /locks is not exists, creating....");
            zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("directory /locks creating success!!!!");
        }
    }

    public void zkLock() throws InterruptedException, KeeperException {
        // 创建节点(临时带序号的节点)
        currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        // 判断是否为序号最小的节点,如果是则获取到锁,否则对上一个节点进行监听
        List<String> children = zk.getChildren("/locks", false);
        count = children.size();
        if (count == 1) {
            return;
        } else {
            Collections.sort(children);
            String thisNode = currentMode.substring("/locks/".length());
            // 获取当前节点到在集合children中的位置
            int index = children.indexOf(thisNode);
            if (index == -1) {
                System.out.println("error Data");
            } else if (index == 0) {
                // only one node in children
                return;
            } else {
                // listen last node's change
                waitPath = "/locks/" + children.get(index - 1);
                zk.getData(waitPath, true, null);
                waitLatch.await();
                return;
            }
        }
    }

    public void unZkLock() {
        // 删除节点以释放掉锁
        try {
            zk.delete(currentMode, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}
package com.tommy.case2;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

public class DistributeLockTest {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        final DistributeLock lock1 = new DistributeLock();
        final DistributeLock lock2 = new DistributeLock();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.zkLock();
                    System.out.println("线程1启动,获取到锁");
                    Thread.sleep(5 * 1000);
                    lock1.unZkLock();
                    System.out.println("线程1释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        }).start();


        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.zkLock();
                    System.out.println("线程2启动,获取到锁");
                    Thread.sleep(5 * 1000);
                    lock2.unZkLock();
                    System.out.println("线程2释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

以上是关于zookeeper分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

Zookeeper分布式锁

zooKeeper实现分布式锁

ZooKeeper分布式锁简单实践

基于zookeeper的分布式锁实现

zookeeper怎么实现分布式锁

基于Zookeeper实现的分布式互斥锁 - InterProcessMutex