REDISredisson源码分析
Posted xuanxuan96
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了REDISredisson源码分析相关的知识,希望对你有一定的参考价值。
之前《》说到redis的看门狗机制,其中有redisson实现可重入锁,今天趁着没空就来简单分下一下redisson的源码。
首先就是简单建一个springboot项目,这个就不啰嗦。
主要就是引入redisson的jar进pom.xml文件。
mvn上,有普通的,也有springboot starter的,都可以,这里用普通的演示。
<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.15.0</version>
</dependency>
然后就是写一个测试类,主要是用redisson建立连接并完成加锁、解锁操作。
连接的服务器是阿里云的服务器。
代码分析:
1、Config
public Config() {
// 传送方式:默认NIO
this.transportMode = TransportMode.NIO;
// 看门狗时间:默认30秒
this.lockWatchdogTimeout = 30000L;
// 看门狗超时时间:默认10分钟
this.reliableTopicWatchdogTimeout = TimeUnit.MINUTES.toMillis(10L);
// 保持发布订阅
this.keepPubSubOrder = true;
// 是否使用脚本缓存
this.useScriptCache = false;
// 最小清理延迟
this.minCleanUpDelay = 5;
// 最大清理延迟
this.maxCleanUpDelay = 1800;
// 清理key数量
this.cleanUpKeysAmount = 100;
// nettyHook
this.nettyHook = new DefaultNettyHook();
// 是否使用线程类加载器
this.useThreadClassLoader = true;
// 地址解析器组工厂
this.addressResolverGroupFactory = new DnsAddressResolverGroupFactory();
}
Config类主要是一些redisson配置,大多数都有默认值,我们主要看的是看门狗机制
lockWatchdogTimeout ,默认时间是30S,也就是如果在redis的一个分布式锁时间内没有执行完任务,redisson会再次延长30s时间,给当前拥有锁的线程执行代码。
reliableTopicWatchdogTimeout,也就是如果在这个时间范围内,拥有锁线程就算没有执行完逻辑代码,也不会再去加时,当然就很可能失去了锁的控制权。
2、RLOCK
看看获取锁对象方法getLock,返回的是一个new的RedissonLock对象的。
public RLock getLock(String name) {
return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}
RedissonLock对象的构造方法除了自定义的name属性外,还有一些其他的配置属性,都有默认值。
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
// 命令执行器
this.commandExecutor = commandExecutor;
// 内部锁过期时间
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
3.lock方法
public void lock() {
try {
this.lock(-1L, (TimeUnit)null, false);
} catch (InterruptedException var2) {
throw new IllegalStateException();
}
}
加锁方法调用的是无参数方法,但后面都是调用有参数的lock方法。
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly)
throws InterruptedException {
// 线程ID
long threadId = Thread.currentThread().getId();
// 尝试获取锁
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
// 获取失败
if (ttl != null) {
// 订阅锁的channel
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
if (interruptibly) {
this.commandExecutor.syncSubscriptionInterrupted(future);
} else {
this.commandExecutor.syncSubscription(future);
}
try {
// 不断尝试获取锁
while(true) {
ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
// tt为空,获取成功
if (ttl == null) {
return;
}
// 获取失败
if (ttl >= 0L) {
try {
// 等待ttl时间后再次尝试
((RedissonLockEntry)future.getNow())
.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException var13) {
if (interruptibly) {
throw var13;
}
// 等待ttl时间后再次尝试
((RedissonLockEntry)future.getNow())
.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else if (interruptibly) {
((RedissonLockEntry)future.getNow())
.getLatch().acquire();
} else {
((RedissonLockEntry)future.getNow())
.getLatch().acquireUninterruptibly();
}
}
} finally {
//取消对channel的订阅
this.unsubscribe(future, threadId);
}
}
}
主要是调用tryAcquire来获取锁。
如果返回值ttl为空,则证明加锁成功,返回。
如果不为空,则证明加锁失败。这时候,它会订阅这个锁的Channel,等待锁释放的消息,然后重新尝试获取锁。
流程图
再看回上面的tryAcquire获取锁方法。
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
调用的是tryAcquireAsync方法
private <T> RFuture<Long> tryAcquireAsync(
long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(
waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
RFuture<Long> ttlRemainingFuture =
this.tryLockInnerAsync(
waitTime, this.internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId,
RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
最终实现加锁的是tryLockInnerAsync方法实现。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
return this.evalWriteAsync(
this.getName(),
LongCodec.INSTANCE,
command,
//如果锁不存在,则通过hincrby设置它的值,并设置过期时间
"if (redis.call('exists', KEYS[1]) == 0)
then redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 如果锁已存在,并且锁的是当前线程,则通过hincrby给数值递增1
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
then redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 如果锁已存在,但并非本线程,则返回过期时间ttl
return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getName()),
new Object[]{
this.internalLockLeaseTime,
this.getLockName(threadId)
}
);
}
上面这个格式看起来有点怪,注释的都是字符串,都是lua语言这平台bug是有点多的。
也不要因为看不懂lua而慌张,其实语法也挺简单的。
主要是用了exists和hexists。
关于hexists和exists
hexists只用来判断是否存在参数所指定的hash字段,只可以带一个参数,返回值只有1(存在)和0(不存在)两种情况。
exists用来判断key是否存在,只有1组参数时用法和hexists一样,时间复杂度也一样,所以效率没区别。Redis3.0.3之后支持多组参数,返回存在的key的数量。
主要就是三个步骤
首先是exists判断,如果锁不存在,则设置值和过期时间,加锁成功
然后是hexists判断,如果锁已存在,并且锁的是当前线程,则证明是重入锁,加锁成功
如果锁已存在,但锁的不是当前线程,则证明有其他线程持有锁。返回当前锁的过期时间,加锁失败
通过上面的代码可以看出,最终加锁的是还是lua脚本实现。
流程图
4、unlock方法
public void unlock() {
try {
// 解锁
this.get(this.unlockAsync(Thread.currentThread().getId()));
} catch (RedisException var2) {
if (var2.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException)var2.getCause();
} else {
throw var2;
}
}
}
实现解锁主要是在unlockAsync方法
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
// 真正的解锁方法
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
// 取消刷新过期时间的那个定时任务
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
//如果返回空,则证明解锁的线程和当前锁不是同一个线程,抛出异常
} else if (opStatus == null) {
IllegalMonitorStateException cause =
new IllegalMonitorStateException(
"attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
//解锁成功
result.trySuccess((Object)null);
}
});
return result;
}
最后解锁的还是lua脚本。
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return this.evalWriteAsync(
this.getName(),
LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
//如果释放锁的线程和已存在锁的线程不是同一个线程,返回null
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
then return nil;
end;
// 通过hincrby递减1的方式,释放一次锁
// 若剩余次数大于0 ,则刷新过期时间
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0)
then redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
// 否则证明锁已经释放,删除key并发布锁释放的消息
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;",
Arrays.asList(this.getName(), this.getChannelName()),
new Object[]{
LockPubSub.UNLOCK_MESSAGE,
this.internalLockLeaseTime,
this.getLockName(threadId)
}
);
}
看到上面的代码,也可以看出,解锁也是通过lua脚本实现,因为lua脚本可以原子性地运行,加锁解锁一气呵成,就像复制粘贴一样。
主要也是通过判断来执行解锁:
如果锁已经不存在,通过publish发布锁释放的消息,解锁成功
如果解锁的线程和当前锁的线程不是同一个,解锁失败,抛出异常
通过hincrby递减1,先释放一次锁。若剩余次数还大于0,则证明当前锁是重入锁,刷新过期时间;若剩余次数小于0,删除key并发布锁释放的消息,解锁成功
流程图
大概就是这个样子了
快乐是什么
以上是关于REDISredisson源码分析的主要内容,如果未能解决你的问题,请参考以下文章
Android 逆向整体加固脱壳 ( DEX 优化流程分析 | DexPrepare.cpp 中 dvmOptimizeDexFile() 方法分析 | /bin/dexopt 源码分析 )(代码片段
Android 事件分发事件分发源码分析 ( Activity 中各层级的事件传递 | Activity -> PhoneWindow -> DecorView -> ViewGroup )(代码片段
mysql jdbc源码分析片段 和 Tomcat's JDBC Pool
Android 逆向ART 脱壳 ( DexClassLoader 脱壳 | DexClassLoader 构造函数 | 参考 Dalvik 的 DexClassLoader 类加载流程 )(代码片段
Android 逆向ART 脱壳 ( DexClassLoader 脱壳 | DexClassLoader 构造函数 | 参考 Dalvik 的 DexClassLoader 类加载流程 )(代码片段