从源码角度分析分布式锁(Redis篇)

Posted 赵晓东-Nastu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从源码角度分析分布式锁(Redis篇)相关的知识,希望对你有一定的参考价值。

一、为什么我们需要分布式锁

        之前我们是单体项目,那个时候已经满足大批量的用户使用了,但是后来有了分布式项目,把项目部署到了不同的服务器那么这个时候我们为了保证一个方法或属性在高并发情况下只能别同一个线程执行,在传统单体应用单机部署的情况下,可以使用并发处理相关的功能进行互斥控制。

        但是现在变成集群后,由于分布式系统多线程、多进程分布在不同的机器上,所以需要使用一种跨机器的互斥机制来控制共享资源的访问。

二、什么是分布式锁

控制分布式系统有序的去对共享资源进行操作,通过互斥来保持一致性。

三、分布式锁的三种实现方式

1、基于数据库实现分布式锁

1、基于表记录

对于数据库来说,实现分布式锁,最简单的方式就是直接创建一张锁表,然后通过操作表中的数据来实现。当你想要获取锁的时候,在表中添加一个新的记录,想要释放锁的时候就删除这条记录。

1.1、创建一张表

CREATE TABLE `database_lock` (
  `id` BIGINT NOT NULL AUTO_INCREMENT,
  `resource` int NOT NULL COMMENT '锁定的资源',
  `description` varchar(1024) NOT NULL DEFAULT "" COMMENT '描述',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uiq_idx_resource` (`resource`) 
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据库分布式锁表';

1.2、利用这张表去获取或者释放锁

(1) 加锁

当我们需要给某个资源添加锁的时候,就插入一条数据

INSERT INTO database_lock(resource, description) VALUES (1, 'lock');

resource字段是唯一索引,多个请求请求添加同一条数据,那么其他的就会报错。

(2)释放锁

释放锁,删除当前这条数据

DELETE FROM database_lock WHERE resource=1;

然后其他资源就可以再次添加去获取这个锁了。

1.3、优缺点

1、没有失效时间
一旦释放锁的操作失败就会导致锁记录一直在数据库中,其它线程无法获得锁。这个缺陷也很好解决,比如可以做一个定时任务去定时清理。
2、依赖数据库
这种锁的可靠性依赖数据库。建议设置备库,避免单点,进一步提高可靠性。
3、非阻塞
这种锁是非阻塞的,因为插入数据失败之后会直接报错,想要获得锁就需要再次操作。如果需要阻塞式的,可以弄个for循环、while循环之类的,直至INSERT成功再返回。
4、非可重入
这种锁也是非可重入的,因为同一个线程在没有释放锁之前无法再次获得锁,因为数据库中已经存在同一份记录了。想要实现可重入锁,可以在数据库中添加一些字段,比如获得锁的主机信息、线程信息等。那么在再次获得锁的时候可以先查询数据,如果当前的主机信息和线程信息等能被查到的话,可以直接把锁分配给它。

2、基于缓存(Redis等)实现分布式锁

2.1、设置唯一键值实现

2.1.1、实现原理

setnx key value——不存在时,添加该键时,如果存在,添加失败。(如果用set的话,key已经存在的话,value直接被覆盖)
使用例子:


//key不存在时,添加该键值对
127.0.0.1:6379> setnx key4 "key44444"
(integer) 1
127.0.0.1:6379> setnx key4 "44"
(integer) 0
127.0.0.1:6379> get key4
"key44444"

在需要获取锁的地方写入键值,然后释放锁的地方删除键值

2.2、实现过程

2.2.1、简单版本

try{
    Boolean result=redisTemplate.opsForValue().setIfAbsent("LOCK","makk");
    if(result){
        ....执行业务逻辑
    }else{
        return "error";
    }
}finally{
    redisTemplate.delete("LOCK");
}

当前版本如果代码中报异常了,锁可以释放,但是如果中间服务宕机了,那么锁就无法释放

2.2.2、设置过期时间的版本

try{
    Boolean result=redisTemplate.opsForValue().setIfAbsent("LOCK","makk",10,TimeUnit.SECONDS);
    if(result){
        ....执行业务逻辑
    }else{
        return "error";
    }
}finally{
    redisTemplate.delete("LOCK");
}

那么这个时候我们设置的过期时间为10秒,但是如果10秒内业务没执行完,也会有新的问题。

可能会导致自己线程加的锁被其他线程释放了。这就是过期时间无法续期导致的
在这里插入图片描述

那么怎么解决这种问题呢?

添加唯一标识:必须在加锁前设定一个雪花算法值,然后在释放锁之前判断雪花算法值是否一致,保证只有自己才能释放自己添加的锁。

2.3、使用redisson实现

2.3.1、引入依赖

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson</artifactId>
  <version>3.11.1</version>
</dependency>

2.3.1、使用

public ItooResult findPaperByStudentIdForOnlineExam(@RequestParam(required = false, defaultValue = "") String examationId,
                                                    @RequestParam(required = false, defaultValue = "") String examineeId,
                                                    @RequestParam(required = false, defaultValue = "") String ip,
                                                    HttpServletRequest request) {
    ......
    //获取锁
    RLock lock = redissonClient.getLock("redisson:lock:stock:" + examineeId);
    try {
        //尝试加锁3分钟
        lock.lock(3, TimeUnit.MINUTES);
        ItooResult paperByStudentIdForOnlineExam = onlineExamService.findPaperByStudentIdForOnlineExam(examationId, clientIp, examineeId, sessionId);
        return ItooResult.build(ItooResult.SUCCESS, "查询成功", paperByStudentIdForOnlineExam);
    } finally {
        //释放锁
        lock.unlock();
    }
}

lock是一种非公平锁——公平与非公平的讲解

2.4、分布式锁实现原理

在这里插入图片描述

使用watch dog进行延迟过期时间,修复了普通分布式锁的缺陷
线程去获取锁,获取成功则执行lua脚本,保存数据到redis数据库。

如果获取失败: 一直通过while循环尝试获取锁(可自定义等待时间,超时后返回失败),获取成功后,执行lua脚本,保存数据到redis数据库。

Redisson提供的分布式锁是支持锁自动续期的,也就是说,如果线程仍旧没有执行完,那么redisson会自动给redis中的目标key延长超时时间(30s),这在Redisson中称之为 Watch Dog 机制。watch dog机制每10秒检测一次。

默认情况下,看门狗的续期时间是30s,也可以通过修改Config.lockWatchdogTimeout来另行指定

2.5、redisson下都有哪些锁

2.5.1、锁类型

首先redisson下的锁都是可重入锁。

//1. 非公平锁——不保证线程的获取顺序

RLock lock = redissonClient.getLock("generalLock");

//2. 公平锁 保证 Redisson 客户端线程将以其请求的顺序获得锁

RLock fairLock = redissonClient.getFairLock("fairLock");

//3. 读写锁 没错与JDK中ReentrantLock的读写锁效果一样

RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock");
readWriteLock.readLock().lock();
readWriteLock.writeLock().lock();
2.5.2、加锁的方法

// 拿锁失败时会不停的重试
// 具有Watch Dog 自动延期机制 默认续30s 每隔30/3=10 秒续到30s

lock.lock();

// 尝试拿锁,如果10后还没有拿到,停止重试,返回false
// 具有Watch Dog 自动延期机制 默认续30s

boolean res1 = lock.tryLock(10, TimeUnit.SECONDS);

// 拿锁失败时会不停的重试
// 没有Watch Dog ,10s后自动释放

lock.lock(10, TimeUnit.SECONDS);

// 尝试拿锁100s后停止重试,返回false
// 没有Watch Dog ,10s后自动释放

boolean res2 = lock.tryLock(100, 10, TimeUnit.SECONDS);

2.6、加锁和解锁原理

2.6.1、加锁的源码-lock的lock()方法

(1)加锁的方法过程
①调用lock方法

//leaseTime——过期时间z
//unit——时间单位

public void lock(long leaseTime, TimeUnit unit) {
    try {
        this.lockInterruptibly(leaseTime, unit);
    } catch (InterruptedException var5) {
        Thread.currentThread().interrupt();
    }
}

② 调用lockInterruptibly方法

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    //获取当前线程的id
    long threadId = Thread.currentThread().getId();
    //尝试异步获取锁
    Long ttl = this.tryAcquire(leaseTime, unit, threadId);
    if (ttl != null) {
        // 订阅锁
        RFuture<RedissonLockEntry> future = this.subscribe(threadId);
        // 相应中断或不响应中断
        this.commandExecutor.syncSubscription(future);
        try {
            // 进入自旋
            while(true) {
                // 每次自旋都尝试获取锁
                ttl = this.tryAcquire(leaseTime, unit, threadId);
                // 获取成功
                if (ttl == null) {
                    return;
                }
                // 等待锁被释放
                if (ttl >= 0L) {
                    this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    this.getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            // 取消订阅
            this.unsubscribe(future, threadId);
        }
    }
}

③调用 tryAcquire方法


private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}

④ 调用tryAcquireAsync方法

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        //当leaseTime = -1 时 启动 watch dog机制
        RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            public void operationComplete(Future<Long> future) throws Exception {
                if (future.isSuccess()) {
                    Long ttlRemaining = (Long)future.getNow();
                    if (ttlRemaining == null) {
                        RedissonLock.this.scheduleExpirationRenewal(threadId);
                    }
                }
            }
        });
        return ttlRemainingFuture;
    }
}

⑤ 当失效时间不为-1默认值时,调用tryLockInnerAsync方法


<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

⑥ 脚本的参数

(2)tryLockInnerAsync方法中脚本含义:
判断redis中有没有一个叫redisson:lock:stock:sss的key——也就是判断锁有没有被占用,如果没有占用
a. 则执行 hset 写入 Hash 类型数据 key:全局锁名称(锁实例名称-redisson:lock:stock:sss),field:锁拥有者(Redisson客户端ID:线程ID——100b604c-a5fa-496c-b667-9f652d0bd463:46), value:1,
b. 并执行 pexpire 对该 key 设置失效时间,返回空值 nil,至此获取锁成功。

如果键为redisson🔒stock:sss的key存在,说明当前线程之前已经获取到锁
a. 但是这里锁时可重入锁,所以则进一步判断hash中是否有键为100b604c-a5fa-496c-b667-9f652d0bd463:46存在
b. 若存在,则其值加1,并重新设置过期时间
c. 返回的redisson:lock:stock:sss生存时间(毫秒)

2.3.2、解锁的源码
(1)解锁的执行过程
①调用 unlock方法

void unlock();

② unlock方法的具体实现

public void unlock() {
    Boolean opStatus = (Boolean)this.get(this.unlockInnerAsync(Thread.currentThread().getId()));
    if (opStatus == null) {
        throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + Thread.currentThread().getId());
    } else {
        if (opStatus) {
            this.cancelExpirationRenewal();
        }
    }
}

③ 调用异步解锁方法unlockInnerAsync

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
    }

④ 脚本的参数的传参
在这里插入图片描述

(2)unlockInnerAsync方法脚本运行过程
判断key为redisson:lock:stock:sss的键值对是否不存在

不存在,说明锁已释放,直接执行 publish 命令向 redisson_lock__channel{redisson:lock:stock:sss} 频道,发送 0 的消息,返回 1。

如果键值对的存在,判断该field——100b604c-a5fa-496c-b667-9f652d0bd463:46在hash中是不是不存在

如果不存在,返回Nil,因为没有自己对应的,说明不是该锁的持有者,无权是否,返回nil

如果存在field,释放锁,因为锁可重入,所以释放锁时不能把所有已获取的锁全都释放掉,一次只能释放一把锁,因此执行 hincrby 对锁的值减一。

释放一把锁后,判断值是否>0

大于0,说明还有剩余的锁,则刷新锁的失效时间并返回 0;

如果不大于0,说明刚才释放的已经是最后一把锁,则执行 del 命令删除锁的 key,并发布锁释放消息,返回 1。

上面执行结果返回 nil 的情况(即第2中情况),因为自己不是锁的持有者,不允许释放别人的锁,故抛出异常。

执行结果返回 1 的情况,该锁的所有实例都已全部释放,所以不需要再刷新锁的失效时间。
在这里插入图片描述

3、基于Zookeeper实现分布式锁

以上是关于从源码角度分析分布式锁(Redis篇)的主要内容,如果未能解决你的问题,请参考以下文章

从源码和内核角度分析redis和nginx以及java NIO可以支持多大的并发

redis分布式锁实现---源码分析

Redis小结

简谈Redis

基于Redis实现分布式锁-Redisson使用及源码分析面试+工作

Springboot基于Redisson实现Redis分布式可重入锁案例到源码分析