Redisson分布式锁
Posted lisin-lee-cooper
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redisson分布式锁相关的知识,希望对你有一定的参考价值。
一.什么是锁
锁主要是用来实现资源共享同步,只有获取到了锁才能访问该同步代码,否则等待其他线程使用结束释放锁。
二.redis实现分布式锁主要步骤
1.指定一个 key 作为锁标记,存入 Redis 中,指定一个 唯一的用户标识 作为 value;
2.当 key 不存在时才能设置值,确保同一时间只有一个客户端进程获得锁,满足 互斥性 特性。
3.设置一个过期时间,防止因系统异常导致没能删除这个 key,满足 防死锁 特性。
4.当处理完业务之后需要清除这个 key 来释放锁,清除 key 时需要校验 value 值,需要满足 只有加锁的人才能释放锁 。
三.Redisson实现分布式锁原理
加锁是通过lua脚本实现
if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);"
加锁
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException
// 尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null)
return true;
// 申请锁的耗时如果大于等于最大等待时间,则申请锁失败.
time -= System.currentTimeMillis() - current;
if (time <= 0)
acquireFailed(threadId);
return false;
current = System.currentTimeMillis();
/**
* 订阅锁释放事件,并通过 await 方法阻塞等待锁释放, ,一旦锁释放会发消息通知待等待的线程进行竞争.
* 当 this.await 返回 false,等待时间已经超出获取锁最大等待时间,取消订阅并返回失败.
* 当 this.await 返回 true,进入循环尝试获取锁.
*/
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// await 方法内部是用 CountDownLatch 来实现阻塞,获取 subscribe 异步执行的结果
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS))
if (!subscribeFuture.cancel(false))
subscribeFuture.onComplete((res, e) ->
if (e == null)
unsubscribe(subscribeFuture, threadId);
);
acquireFailed(threadId);
return false;
try
// 计算获取锁的总耗时,如果大于等于最大等待时间,则获取锁失败.
time -= System.currentTimeMillis() - current;
if (time <= 0)
acquireFailed(threadId);
return false;
/**
* 收到锁释放的信号后,在最大等待时间之内,循环一次接着一次的尝试获取锁
* 获取锁成功,则立马返回 true,
* 若在最大等待时间之内还没获取到锁,则认为获取锁失败,返回 false 结束循环
*/
while (true)
long currentTime = System.currentTimeMillis();
// 再次尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null)
return true;
// 超过最大等待时间则返回 false 结束循环,获取锁失败
time -= System.currentTimeMillis() - currentTime;
if (time <= 0)
acquireFailed(threadId);
return false;
/**
* 阻塞等待锁(通过信号量(共享锁)阻塞,等待解锁消息):
*/
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time)
//如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取一个许可(除非被中断或者一直没有可用的许可)。
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
else
//则就在wait time 时间范围内等待可以通过信号量
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
// 更新剩余的等待时间(最大等待时间-已经消耗的阻塞时间)
time -= System.currentTimeMillis() - currentTime;
if (time <= 0)
acquireFailed(threadId);
return false;
finally
// 取消订阅解锁消息
unsubscribe(subscribeFuture, threadId);
续锁
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId)
if (leaseTime != -1)
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) ->
if (e != null)
return;
// lock acquired
if (ttlRemaining == null)
scheduleExpirationRenewal(threadId);
);
return ttlRemainingFuture;
Watch Dog 机制其实就是一个后台定时任务线程,获取锁成功之后,会将持有锁的线程放入到一个 RedissonLock.EXPIRATION_RENEWAL_MAP里面,然后每隔 10 秒 (internalLockLeaseTime / 3) 检查一下,如果客户端 1 还持有锁 key(判断客户端是否还持有 key,其实就是遍历 EXPIRATION_RENEWAL_MAP 里面线程 id 然后根据线程 id 去 Redis 中查,如果存在就会延长 key 的时间),那么就会不断的延长锁 key 的生存时间。Watch Dog前提是没有设置锁释放时间
锁释放
protected RFuture<Boolean> unlockInnerAsync(long threadId)
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 判断锁 key 是否存在
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 将该客户端对应的锁的 hash 结构的 value 值递减为 0 后再进行删除
// 然后再向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
1.删除锁(这里注意可重入锁,可重入锁重入数减1)。
2.广播释放锁的消息,通知阻塞等待的进程(向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息)。
3.取消 Watch Dog 机制,即将 RedissonLock.EXPIRATION_RENEWAL_MAP 里面的线程 id 删除,并且 cancel 掉 Netty 的那个定时任务线程。
以上是关于Redisson分布式锁的主要内容,如果未能解决你的问题,请参考以下文章