基于redis的分布式锁

Posted 水田如雅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于redis的分布式锁相关的知识,希望对你有一定的参考价值。

java使用redisson的客户端

 String lockKey = StringUtils.join(new String[]qid, SafeConverter.toString(resultGroupId), ":");
            RLock lock = client.getNativeClient().getLock(lockKey);
            try 
                lock.lock();
                //这里写具体的业务逻辑操作

             finally 
                lock.unlock();
            

reddison的客户端提供了安全的加锁解锁方式,并且底层实现了锁续命的逻辑。

底层lua加锁实现:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) 
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    

此外,获取不到时候,自旋方式获取锁:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException 
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) 
            return;
        

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try 
            while (true) 
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) 
                    break;
                

                // waiting for message
                if (ttl >= 0) 
                    try 
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                     catch (InterruptedException e) 
                        if (interruptibly) 
                            throw e;
                        
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    
                 else 
                    if (interruptibly) 
                        getEntry(threadId).getLatch().acquire();
                     else 
                        getEntry(threadId).getLatch().acquireUninterruptibly();
                    
                
            
         finally 
            unsubscribe(future, threadId);
        
//        get(lockAsync(leaseTime, unit));
    

锁续命实现:

private void renewExpiration() 
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) 
            return;
        
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() 
            @Override
            public void run(Timeout timeout) throws Exception 
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) 
                    return;
                
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) 
                    return;
                
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> 
                    if (e != null) 
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    
                    
                    if (res) 
                        // reschedule itself
                        renewExpiration();
                    
                );
            
        , internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    

在获取到锁并且使用锁的期间,每10s检查一次,防止因为业务逻辑执行时间过长,造成锁的超时自动释放锁。

redis主从架构锁失效问题

  • 问题描述:

在master-slave架构中,主节点往从结点同步数据,是异步同步的,客户端去主结点请求加锁,成功后,主节点会将加锁信息异步同步给从结点。当主节点还未来得及同步,挂掉了,从结点中某个继承了主节点的皇位,这时候,那个加锁的key已经在系统中不见了,其他线程过来请求加锁,是能够成功的。

  • 解决策略

参考zk的分布式锁失效策略,超过半数以上结点加锁成功,才能算锁成功。

redlock

redisson客户端提供了分布式的redlock来解决上面的问题:redlock

以上是关于基于redis的分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

redis基于redis实现分布式并发锁

基于redis的分布式锁

基于redis的分布式锁的分析与实践

基于redis的分布式锁(不适合用于生产环境)

基于redis实现分布式锁

分布式锁(2) ----- 基于redis的分布式锁