基于redis的分布式锁
Posted 水田如雅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于redis的分布式锁相关的知识,希望对你有一定的参考价值。
java使用redisson的客户端
String lockKey = StringUtils.join(new String[]qid, SafeConverter.toString(resultGroupId), ":");
RLock lock = client.getNativeClient().getLock(lockKey);
try
lock.lock();
//这里写具体的业务逻辑操作
finally
lock.unlock();
reddison的客户端提供了安全的加锁解锁方式,并且底层实现了锁续命的逻辑。
底层lua加锁实现:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command)
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
此外,获取不到时候,自旋方式获取锁:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null)
return;
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try
while (true)
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null)
break;
// waiting for message
if (ttl >= 0)
try
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
catch (InterruptedException e)
if (interruptibly)
throw e;
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
else
if (interruptibly)
getEntry(threadId).getLatch().acquire();
else
getEntry(threadId).getLatch().acquireUninterruptibly();
finally
unsubscribe(future, threadId);
// get(lockAsync(leaseTime, unit));
锁续命实现:
private void renewExpiration()
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null)
return;
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask()
@Override
public void run(Timeout timeout) throws Exception
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null)
return;
Long threadId = ent.getFirstThreadId();
if (threadId == null)
return;
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) ->
if (e != null)
log.error("Can't update lock " + getName() + " expiration", e);
return;
if (res)
// reschedule itself
renewExpiration();
);
, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
在获取到锁并且使用锁的期间,每10s检查一次,防止因为业务逻辑执行时间过长,造成锁的超时自动释放锁。
redis主从架构锁失效问题
- 问题描述:
在master-slave架构中,主节点往从结点同步数据,是异步同步的,客户端去主结点请求加锁,成功后,主节点会将加锁信息异步同步给从结点。当主节点还未来得及同步,挂掉了,从结点中某个继承了主节点的皇位,这时候,那个加锁的key已经在系统中不见了,其他线程过来请求加锁,是能够成功的。
- 解决策略
参考zk的分布式锁失效策略,超过半数以上结点加锁成功,才能算锁成功。
redlock
redisson客户端提供了分布式的redlock来解决上面的问题:redlock
以上是关于基于redis的分布式锁的主要内容,如果未能解决你的问题,请参考以下文章