Curator使用:分布式锁

Posted june777

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Curator使用:分布式锁相关的知识,希望对你有一定的参考价值。

分布式锁介绍

分布式执行一些不需要同时执行的复杂任务,curator利用zk的特质,实现了这个选举过程。其实就是利用了多个zk客户端在同一个位置建节点,只会有一个客户端建立成功这个特性。来实现同一时间,只会选择一个客户端执行任务

代码

    //分布式锁
    InterProcessMutex lock = new InterProcessMutex(cc,"/lock_path");
    CountDownLatch down = new CountDownLatch(1);
    for (int i = 0; i < 30; i++) {
        new Thread(()->{
            try {
                down.await();
                lock.acquire();

            } catch (Exception e) {
                e.printStackTrace();
            }
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
            System.out.println(sdf.format(new Date()));
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
    down.countDown();

InterProcessMutex 是一个可重入的排他锁,获取锁的过程是通过往ZK下面成功建立节点来实现的,下面是获取锁的过程

    //获取当前线程
    Thread currentThread = Thread.currentThread();
    //获取当前线程的锁数据
        LockData lockData = threadData.get(currentThread);
    //如果不为null,则将锁的数量+1,实现可重入
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }
    //第一次会到这里,尝试在我们之前设置的路径下建立节点
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    //建立成功后就初始化lockdata 并按当前线程进行保存,所以可以通过创建多个thread来模拟锁竞争,而不需要建多个client。
        if ( lockPath != null )
        {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;

下面是attemptLock的重要代码

    while ( !isDone )
        {
            isDone = true;

            try
            {
                //建立节点
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                //下面就是获取锁和加锁的循环了
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                //session过期时走这里,按策略处理,允许重试就重试,否则就抛出异常
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }
 

下面是internalLockLoop的重要代码

    while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                //排序是因为要实现公平锁,加上maxleases参数限制取首位
                List<String>        children = getSortedChildren();
                //得到子节点名称,比如 _c_ce2a26cb-9721-4f56-91fd-a6d00b00b12c-lock-0000000030
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
                
                //获取是否获得锁的状态,重要方法,maxLeases为1,后面要通过这个参数进行对比,通过判断小于这个来实现 公平锁
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    //如果没有获得锁,就给当前节点加个watcher,继续等待,一旦被删掉就调用这个watcher notifyall。
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try 
                        {
                            // 这个watcher只有一个作用就是唤醒其他线程进行竞争 notifyAll();
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            ...
                        }
                        catch ( KeeperException.NoNodeException e ) 
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }

以上是关于Curator使用:分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

使用Curator框架实现分布式锁

zookeeperApache curator的使用及zk分布式锁实现

zookeeper实现分布式锁-curator

分布式锁-curator实现

Curator实现分布式锁的基本原理

Curator实现分布式锁的基本原理