Curator使用:分布式锁
Posted june777
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Curator使用:分布式锁相关的知识,希望对你有一定的参考价值。
分布式锁介绍
分布式执行一些不需要同时执行的复杂任务,curator利用zk的特质,实现了这个选举过程。其实就是利用了多个zk客户端在同一个位置建节点,只会有一个客户端建立成功这个特性。来实现同一时间,只会选择一个客户端执行任务
代码
//分布式锁
InterProcessMutex lock = new InterProcessMutex(cc,"/lock_path");
CountDownLatch down = new CountDownLatch(1);
for (int i = 0; i < 30; i++) {
new Thread(()->{
try {
down.await();
lock.acquire();
} catch (Exception e) {
e.printStackTrace();
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
System.out.println(sdf.format(new Date()));
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
down.countDown();
InterProcessMutex 是一个可重入的排他锁,获取锁的过程是通过往ZK下面成功建立节点来实现的,下面是获取锁的过程
//获取当前线程
Thread currentThread = Thread.currentThread();
//获取当前线程的锁数据
LockData lockData = threadData.get(currentThread);
//如果不为null,则将锁的数量+1,实现可重入
if ( lockData != null )
{
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
//第一次会到这里,尝试在我们之前设置的路径下建立节点
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
//建立成功后就初始化lockdata 并按当前线程进行保存,所以可以通过创建多个thread来模拟锁竞争,而不需要建多个client。
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
下面是attemptLock的重要代码
while ( !isDone )
{
isDone = true;
try
{
//建立节点
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//下面就是获取锁和加锁的循环了
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
//session过期时走这里,按策略处理,允许重试就重试,否则就抛出异常
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
下面是internalLockLoop的重要代码
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
//排序是因为要实现公平锁,加上maxleases参数限制取首位
List<String> children = getSortedChildren();
//得到子节点名称,比如 _c_ce2a26cb-9721-4f56-91fd-a6d00b00b12c-lock-0000000030
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
//获取是否获得锁的状态,重要方法,maxLeases为1,后面要通过这个参数进行对比,通过判断小于这个来实现 公平锁
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
//如果没有获得锁,就给当前节点加个watcher,继续等待,一旦被删掉就调用这个watcher notifyall。
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// 这个watcher只有一个作用就是唤醒其他线程进行竞争 notifyAll();
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
...
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
以上是关于Curator使用:分布式锁的主要内容,如果未能解决你的问题,请参考以下文章