redis缓存-分布式锁原理与使用

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis缓存-分布式锁原理与使用相关的知识,希望对你有一定的参考价值。

  • 本地锁 只能锁定自己的服务

  • SET key value [EX seconds] [PX milliseconds] [NX|XX] http://www.redis.cn/commands/set.html

public Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() 
    //redis 分布原则 加锁保证原子性,解锁保证原子性
    //1.占用分布式锁 去redis站坑setIfAbsent = SET NX,随便set K V数值,之后返回一个结果 =>2.设置过期时间和枷锁是同步的原子的
    String uuid = UUID.randomUUID().toString(); //设置v值是uuid
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
    if (lock) 
        System.out.println("获取分布式锁成功...");
        //1.1加锁成功 执行业务
        //2.设置期时间 以免造成死锁 设置 30秒自动删除 ;设置过期时间和枷锁是同步的原子的
        Map<String, List<Catelog2Vo>> dataFromDb;
        try 
            //redisTemplate.expire("lock",30,TimeUnit.SECONDS);
            dataFromDb = getDataFromDb();
         finally 
            String script = "if redis.call(get,KEYS[1]) == ARGV[1] then return redis.call(del,KEYS[1]) else return 0 end";
            Long lock1 = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
        
        //3.删除 不是自己锁 获取数值对比+对比删除成功=原子操作 lua 解锁操作
        /**
         * String lockValue = redisTemplate.opsForValue().get("lock"); //查到锁的数值
         *   //2.1查看放的uuid和lockValue一致 说明 锁是自己加的 在执行删除自己 锁
         *   if (uuid.equals(lockValue))
         *     //1.2执行成功之后 解锁/删除锁 共其他使用
         *     redisTemplate.delete("lock");
         *     
         */
        return dataFromDb;
     else 
        //枷锁失败。。在重置,把自己在调一遍
        try 
            Thread.sleep(200);
        catch (Exception e)
        
        System.out.println("获取分布式锁失败 重试...");
        return getCatalogJsonWithRedisLock();//自旋方式

    


导入pom

<!--用redisson作为分布式锁,分布式对象 -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.0</version>
</dependency>

配置MyRedissonConfig

@Configuration
public class MyRedissonConfig 
    /**
     * 对于redisson配置是RedissonClient操作的
     *
     * @return
     * @throws IOException
     */
    @Bean(destroyMethod = "shutdown")
    RedissonClient redisson() throws IOException 
        //创建配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.56.10:6379");
        //config.useClusterServers() 集群模式
        //        .addNodeAddress("127.0.0.1:7004", "127.0.0.1:7001");
        //依据Config创建RedissonClient
        RedissonClient redissonClient = Redisson.create(config);
        return redissonClient;
    

测试加锁 解锁

public String hello() 
    //1.获取一把锁 只要锁 名字一样就是用一把锁
    RLock lock = redisson.getLock("my_lock");
    //2.加锁
    lock.lock(30, TimeUnit.SECONDS);
    //lock.lock(); //堵塞式等待
    try 
        //打印线程好  如果服务中断 有个看门狗自动续期
        System.out.println("加锁成功执行业务"+Thread.currentThread().getId());
        Thread.sleep(30000);
     catch (InterruptedException e) 
     finally 
        //3.解锁
        System.out.println("解锁成功执行业务"+Thread.currentThread().getId());
        lock.unlock();
    
    return "hello";

//2.加锁 锁的自动续期,如果业务超长,运行期间自动给锁续上行的时间,不用担心业务时间长 锁自动过期被删掉 // 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。

  • 10秒自动解锁,自动解锁时间-定要大于业务的执行时间。在锁时间到了以后,不会自动续期。
  • 如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
  • 如果我们未指定锁的超时时间,就使用30 * 1000 [LockWatchdogTimeout看门狗的默认时间] ;
  • 只要占锁成功,就会启动一个定时任务[重新给锁设置过期时间,新的过期时间就是看[ ]狗的默认时间]
  • internallockLeaseTime [看门狗时间] / 3, 10s 规则用 lock.lock(30, TimeUnit.SECONDS); 加锁时间>=解锁时间

读写锁测试

  • 保证一 定能读到最新数据,修改期间,写锁是一 个排他锁(互斥锁,独享锁)。读锁是一 一个共享锁
  • 与锁役释放读就必须寺待
  • 读+读 相当于无锁,并发读,只会在redis中记录好,所有当前的读锁。他们都会同时加锁成功
  • 写+读:等待写锁释放
  • 写+写:阻塞方式
  • 读+写:有读锁。写也需要等待。
  • 只要有写的存在,都必须等待
@ResponseBody
@GetMapping("/write") //读写锁
public String writeValue() 
    String s = "";
    RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");//加写锁
    RLock rLock = lock.writeLock();
    try 
        //改数据加写锁,读数据加读锁
        rLock.lock();
        s = UUID.randomUUID().toString();
        redisTemplate.opsForValue().set("writeValue", s);
        Thread.sleep(30000);
     catch (InterruptedException e) 
        e.printStackTrace();
     finally 
        rLock.unlock();
    
    return s;


@ResponseBody
@GetMapping("/read") //读写锁
public String readValue() 
    String s = "";
    RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
    RLock rLock = lock.readLock(); //加读锁
    //执行业务代码
    rLock.lock();
    try 
        s = redisTemplate.opsForValue().get("writeValue");
     catch (Exception e) 
        e.printStackTrace();
     finally 
        rLock.unlock();
    
    return s;


Redisson-信号量测试

车库停车 信号量测试 限流使用

@GetMapping("park")
@ResponseBody
public String park() throws InterruptedException 
    //使用信号量命名
    RSemaphore park = redisson.getSemaphore("park");
    //获取一个信号 获取一个数值 占一个车位  获取失败会一直等待
    boolean b = park.tryAcquire(); //尝试获取 能否取到数值 不行就false 走
    if (b)
        //执行代码
    else 
        return "error";
    
    park.acquire();
    return "ok停车+1";


@GetMapping("go")
@ResponseBody
public String go() throws InterruptedException 
    //使用信号量命名
    RSemaphore park = redisson.getSemaphore("park");
    //释放信号
    park.release();
    return "ok 开走-1";

Redisson-闭锁测试

分布式闭锁 放假锁门 等到全部走完 才关锁

@GetMapping("/lockDoor")
@ResponseBody
public  String lockDoor() throws InterruptedException 
    RCountDownLatch doot = redisson.getCountDownLatch("door");
    doot.trySetCount(5);//等待5个
    doot.await();//等待闭锁都完成
    return "放假了";


@GetMapping("/gogo/id")//接收走的id
@ResponseBody
public  String gogo(@PathVariable("id") Long id)
    RCountDownLatch doot = redisson.getCountDownLatch("door");
    doot.countDown();//等价于计数减一
    return id+"走了";

修正之前方法

public Map<String, List<Catelog2Vo>> getCatalogJsonWithRedissonLock() 
    //去占锁
    RLock lock = redisson.getLock("catalogJson-lock");
    //占锁之后 加锁
    lock.lock();
    Map<String, List<Catelog2Vo>> dataFromDb;
    try 
        dataFromDb = getDataFromDb();
     finally 
        //解锁
        lock.unlock();
    
    return dataFromDb;

缓存-分布式锁-缓存一致性解决

redis分布式锁原理与实现

分布式锁原理

分布式锁,是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。

使用setnx、getset、expire、del这4个redis命令实现

setnx 是『SET if Not eXists』(如果不存在,则 SET)的简写。 命令格式:SETNX key value;使用:只在键 key 不存在的情况下,将键 key 的值设置为 value 。若键 key 已经存在, 则 SETNX 命令不做任何动作。返回值:命令在设置成功时返回 1 ,设置失败时返回 0 。
getset 命令格式:GETSET key value,将键 key 的值设为 value ,并返回键 key 在被设置之前的旧的value。返回值:如果键 key 没有旧值, 也即是说, 键 key 在被设置之前并不存在, 那么命令返回 nil 。当键 key 存在但不是字符串类型时,命令返回一个错误。
expire 命令格式:EXPIRE key seconds,使用:为给定 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除。返回值:设置成功返回 1 。 当 key 不存在或者不能为 key 设置生存时间时(比如在低于 2.1.3 版本的 Redis 中你尝试更新 key 的生存时间),返回 0 。
del 命令格式:DEL key [key …],使用:删除给定的一个或多个 key ,不存在的 key 会被忽略。返回值:被删除 key 的数量。
redis 分布式锁原理一

原理图如下

过程分析:

A尝试去获取锁lockkey,通过setnx(lockkey,currenttime+timeout)命令,对lockkey进行setnx,将value值设置为当前时间+锁超时时间;
如果返回值为1,说明redis服务器中还没有lockkey,也就是没有其他用户拥有这个锁,A就能获取锁成功;
在进行相关业务执行之前,先执行expire(lockkey),对lockkey设置有效期,防止死锁。因为如果不设置有效期的话,lockkey将一直存在于redis中,其他用户尝试获取锁时,执行到setnx(lockkey,currenttime+timeout)时,将不能成功获取到该锁;
执行相关业务;
释放锁,A完成相关业务之后,要释放拥有的锁,也就是删除redis中该锁的内容,del(lockkey),接下来的用户才能进行重新设置锁新值。
代码实现:

public void redis1() {
log.info("关闭订单定时任务启动");
long lockTimeout = Long.parseLong(PropertiesUtil.getProperty("lock.timeout", "5000"));
//这个方法的缺陷在这里,如果setnx成功后,锁已经存到Redis里面了,服务器异常关闭重启,将不会执行closeOrder,也就不会设置锁的有效期,这样的话锁就不会释放了,就会产生死锁
Long setnxResult = RedisShardedPoolUtil.setnx(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, String.valueOf(System.currentTimeMillis() + lockTimeout));
if (setnxResult != null && setnxResult.intValue() == 1) {
//如果返回值为1,代表设置成功,获取锁
closeOrder(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
} else {
log.info("没有获得分布式锁:{}", Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
}
log.info("关闭订单定时任务结束");
}
private void closeOrder(String lockName) {
//对锁设置有效期
RedisShardedPoolUtil.expire(lockName, 5);//有效期为5秒,防止死锁
log.info("获取锁:{},ThreadName:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
//执行业务
int hour = Integer.parseInt(PropertiesUtil.getProperty("close.order.task.time.hour", "2"));
iOrderService.closeOrder(hour);
//执行完业务后,释放锁
RedisShardedPoolUtil.del(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
log.info("释放锁:{},ThreadName:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
log.info("=================================");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
缺陷:
如果A在setnx成功后,A成功获取锁了,也就是锁已经存到Redis里面了,此时服务器异常关闭或是重启,将不会执行closeOrder,也就不会设置锁的有效期,这样的话锁就不会释放了,就会产生死锁。

解决方法:
关闭Tomcat有两种方式,一种通过温柔的执行shutdown关闭,一种通过kill杀死进程关闭

//通过温柔的执行shutdown关闭时,以下的方法会在关闭前执行,即可以释放锁,而对于通过kill杀死进程关闭时,以下方法不会执行,即不会释放锁
//这种方式释放锁的缺点在于,如果关闭的锁过多,将造成关闭服务器耗时过长
@PreDestroy
public void delLock() {
RedisShardedPoolUtil.del(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
}
1
2
3
4
5
6
redis 分布式锁原理2(优化版)

为了解决原理1中会出现的死锁问题,提出原理2双重防死锁,可以更好解决死锁问题。
原理图如下:

过程分析:

当A通过setnx(lockkey,currenttime+timeout)命令能成功设置lockkey时,即返回值为1,过程与原理1一致;
当A通过setnx(lockkey,currenttime+timeout)命令不能成功设置lockkey时,这是不能直接断定获取锁失败;因为我们在设置锁时,设置了锁的超时时间timeout,当当前时间大于redis中存储键值为lockkey的value值时,可以认为上一任的拥有者对锁的使用权已经失效了,A就可以强行拥有该锁;具体判定过程如下;
A通过get(lockkey),获取redis中的存储键值为lockkey的value值,即获取锁的相对时间lockvalueA
lockvalueA!=null && currenttime>lockvalue,A通过当前的时间与锁设置的时间做比较,如果当前时间已经大于锁设置的时间临界,即可以进一步判断是否可以获取锁,否则说明该锁还在被占用,A就还不能获取该锁,结束,获取锁失败;
步骤4返回结果为true后,通过getSet设置新的超时时间,并返回旧值lockvalueB,以作判断,因为在分布式环境,在进入这里时可能另外的进程获取到锁并对值进行了修改,只有旧值与返回的值一致才能说明中间未被其他进程获取到这个锁
lockvalueB == null || lockvalueA==lockvalueB,判断:若果lockvalueB为null,说明该锁已经被释放了,此时该进程可以获取锁;旧值与返回的lockvalueB一致说明中间未被其他进程获取该锁,可以获取锁;否则不能获取锁,结束,获取锁失败。
代码实现:

public void redis2() {
log.info("关闭订单定时任务启动");
long lockTimeout = Long.parseLong(PropertiesUtil.getProperty("lock.timeout", "5000"));
Long setnxResult = RedisShardedPoolUtil.setnx(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, String.valueOf(System.currentTimeMillis() + lockTimeout));
if (setnxResult != null && setnxResult.intValue() == 1) {
//如果返回值为1,代表设置成功,获取锁
closeOrder(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
} else {
//未获取到锁,继续判断,判断时间戳,看是否可以重置并获取到锁
String lockValueStr = RedisShardedPoolUtil.get(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
//通过当前的时间与锁设置的时间做比较,如果当前时间已经大于锁设置的时间临界,即可以进一步判断是否可以获取锁,否则说明该锁还在被占用,不能获取该锁
if (lockValueStr != null && System.currentTimeMillis() > Long.parseLong(lockValueStr)) {
//通过getSet设置新的超时时间,并返回旧值,以作判断,因为在分布式环境,在进入这里时可能另外的进程获取到锁并对值进行了修改,只有旧值与返回的值一致才能说明中间未被其他进程获取到这个锁
String getSetResult = RedisShardedPoolUtil.getSet(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, String.valueOf(System.currentTimeMillis() + lockTimeout));
//再次用当前时间戳getset。
//返回给定的key的旧值,与旧值判断,是否可以获取锁
//当key没有旧值时,即key不存在时,返回nil ->获取锁
//这里我们set了一个新的value值,获取旧的值。
//若果getSetResult为null,说明该锁已经被释放了,此时该进程可以获取锁;旧值与返回的getSetResult一致说明中间未被其他进程获取该锁,可以获取锁
if (getSetResult == null || (getSetResult != null && StringUtils.equals(lockValueStr, getSetResult))) {
//真正获取到锁
closeOrder(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
} else {
log.info("没有获得分布式锁:{}", Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
}
} else {
log.info("没有获得分布式锁:{}", Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
}
}
log.info("关闭订单定时任务结束");
}
private void closeOrder(String lockName) {
//对锁设置有效期
RedisShardedPoolUtil.expire(lockName, 5);//有效期为5秒,防止死锁
log.info("获取锁:{},ThreadName:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
//执行业务
int hour = Integer.parseInt(PropertiesUtil.getProperty("close.order.task.time.hour", "2"));
iOrderService.closeOrder(hour);
//执行完业务后,释放锁
RedisShardedPoolUtil.del(Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK);
log.info("释放锁:{},ThreadName:{}",Const.REDIS_LOCK.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
log.info("=================================");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
优化点:

加入了超时时间判断锁是否超时了,及时A在成功设置了锁之后,服务器就立即出现宕机或是重启,也不会出现死锁问题;因为B在尝试获取锁的时候,如果不能setnx成功,会去获取redis中锁的超时时间与当前的系统时间做比较,如果当前的系统时间已经大于锁超时时间,说明A已经对锁的使用权失效,B能继续判断能否获取锁,解决了redis分布式锁的死锁问题。
————————————————
版权声明:本文为CSDN博主「dazou1」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/dazou1/article/details/88088223

以上是关于redis缓存-分布式锁原理与使用的主要内容,如果未能解决你的问题,请参考以下文章

分布式缓存总结

Redis实现分布式锁原理与实现分析

利用多写Redis实现分布式锁原理与实现分析

Redis分布式锁的实现原理

Redis分布式锁的实现原理

Redis分布式锁的实现原理