源码分析:Redisson 分布式锁过程分析

Posted 程序员架构进阶

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码分析:Redisson 分布式锁过程分析相关的知识,希望对你有一定的参考价值。

封面图来自:Redisson性能压测权威发布 http://www.redis.cn/articles/20170704108.html

一 摘要

    Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。

    通常使用最为广泛的就是它提供的基于Redis的分布式锁功能。本篇也集中对Redisson的分布式锁实现进行分析。

   使用的Redisson版本为3.12.2,maven引入依赖信息:

<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.12.2</version></dependency>

二 锁过程源码

如下代码所示,是我们适用Redisson获取和释放分布式锁的一个demo:

RedissonClient redisson = Redisson.create();RLock lock = redisson.getLock("anyLock");lock.lock();// 其他代码....lock.unlock();

其中,Redisson.create();是默认的创建方法,内容为:

public static RedissonClient create() { Config config = new Config(); ((SingleServerConfig)config.useSingleServer().setTimeout(1000000)).setAddress("redis://127.0.0.1:6379"); return create(config);}

可见,这里使用了本地的redis集群,和默认的6379端口。

   这里重点分析加锁过程,也就是lock.lock(); 方法部分,来看Redisson是怎样实现加锁,以及可能得锁续期等watchdog的动作,下面是RedissonLock类中的lock()方法:

public void lock() { try { this.lock(-1L, (TimeUnit)null, false); } catch (InterruptedException var2) { throw new IllegalStateException(); }}

这里继续向下调用了一个含参数的lock()方法,设置了释放时间(默认设置了-1),TimeUnit(null),是否可中断(false),我们继续看这个方法:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = this.tryAcquire(leaseTime, unit, threadId); if (ttl != null) { RFuture<RedissonLockEntry> future = this.subscribe(threadId); if (interruptibly) { this.commandExecutor.syncSubscriptionInterrupted(future); } else { this.commandExecutor.syncSubscription(future); } try { while(true) { ttl = this.tryAcquire(leaseTime, unit, threadId); if (ttl == null) { return; } if (ttl >= 0L) { try { ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException var13) { if (interruptibly) { throw var13; } ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else if (interruptibly) { ((RedissonLockEntry)future.getNow()).getLatch().acquire(); } else { ((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly(); } } } finally { this.unsubscribe(future, threadId); } }}

这一部分代码较长,我们按照步骤整理一下:

1、获取当前线程的线程id;

2、tryAquire尝试获取锁,并返回ttl

3、如果ttl为空,则结束流程;否则进入后续逻辑;

4、this.subscribe(threadId)订阅当前线程,返回一个RFuture;

5、下一步涉及是否可中断标记的判断,如果可中断,调用