redis--springboot实现redis的分布式锁

Posted 紫月冰凌

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis--springboot实现redis的分布式锁相关的知识,希望对你有一定的参考价值。

1.redis的应用场景

  • 商品秒杀
  • 点赞等

现在有一个减少商品的场景,我们很容易能写出其代码

@Controller
@ResponseBody
public class Test {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @RequestMapping("/redis")
    public String deductSt0ck(){
        int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));
        if (stock>0){
            stock--;
            redisTemplate.opsForValue().set("a",stock+"");
            System.out.println("扣除成功,剩余:" + stock);
        }else {
            System.out.println("扣除失败,剩余:" + stock);
        }
        return "end";
    }
}

但是有一个问题,该程序单机下线程不安全。不过可以解决:加锁

@Controller
@ResponseBody
public class Test {
@Autowired
private StringRedisTemplate redisTemplate;
@RequestMapping("/redis")
 public String deductSt0ck(){
     synchronized(this){
         if (!aBoolean){
             return "error";
         }

         int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));
         if (stock>0){
             stock--;
             redisTemplate.opsForValue().set("a",stock+"");
             System.out.println("扣除成功,剩余:" + stock);
         }else {
             System.out.println("扣除失败,剩余:" + stock);
         }
         return "end";
     }
 }
}

加锁解决了单机的线程安全的问题,但是在集群的情况下线程依旧不安全,因为集群的情况下有多个服务器同时运行那么依然会产生线程安全问题;

技术图片

因为在同一时间有两个jvm运行,其中一个jvm的锁肯定不会影响另一个jvm。故此时就需要用到redis的分布式锁。

2.redis的分布式锁

Redis 是一个单进程单线程的非关系型数据库,而 Redis 锁的实质便是让并行的多个线程在Redis 内部以串行的方式执行

redis的(SETNX key value)语句

  • 将 key 的值设为 value ,当且仅当 key 不存在。
  • 若给定的 key 已经存在,则 SETNX 不做任何动作。

SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。

故可以通过redis自身特性,及其setnx操作来实现其分布式锁。有代码

@Controller
@ResponseBody
public class Test {
    @Autowired
    private StringRedisTemplate redisTemplate;
    
  @RequestMapping("/redis")
  public String deductSt0ck(){
      String lock = "lock";
      Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lock, "wf");
  
      if (!aBoolean){
          return "error";
      }
  
      int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));
  
      if (stock>0){
          stock--;
          redisTemplate.opsForValue().set("a",stock+"");
          System.out.println("扣除成功,剩余:" + stock);
      }else {
          System.out.println("扣除失败,剩余:" + stock);
      }
      redisTemplate.delete(lock);
      return "end";
  }
}

该方法可以规避在单机状态下的安全问题但这个代码依然有问题,当程序加锁后,如果程序在解锁前出现异常,导致方法退出就不会把加的锁解开,会导致出现死锁,可以使用try--finally解决异常的问题。 还有在加锁后如果该程序突然挂断那依然会形成死锁,这个问题可以通过给key设置超时时间来解决

@Controller
@ResponseBody
public class Test {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @RequestMapping("/redis")
    public String deductSt0ck(){
        String lock = "lock";
        String value = String.valueOf(UUID.randomUUID());
        try {
            Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lock, value,10, TimeUnit.SECONDS);
    
            if (!aBoolean){
                return "error";
            }
    
            int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));
    
            if (stock>0){
                stock--;
                redisTemplate.opsForValue().set("a",stock+"");
                System.out.println("扣除成功,剩余:" + stock);
            }else {
                System.out.println("扣除失败,剩余:" + stock);
            }
        }finally {
                redisTemplate.delete(lock);
        }
        return "end";
    }
}

到这里,看似问题已经解决了,但是确实代码在分布式高并发的情况下依然有问题,就是在加锁后,如果代码出现问题,导致在还没有执行完成业务逻辑的情况下,达到了设置到超时时间,就会导致锁失效,从而使其他的线程获得执行权限,而在其他的线程加完锁后,第一个线程突然执行完成释放第二个线程加的锁。使下一个线程获得执行权限,加锁又被第二个线程释放/。这样链式这些下去就会导致锁失效。
这个问题可以通过每次给线程设置value值不同的锁,在释放锁使判断,如果value与设置到相同就释放锁,负责不释放。

@Controller
@ResponseBody
public class Test {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @RequestMapping("/redis")
    public String deductSt0ck(){
        String lock = "lock";
        String value = String.valueOf(UUID.randomUUID());
        try {
            Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lock, value,10, TimeUnit.SECONDS);
    
            if (!aBoolean){
                return "error";
            }
    
            int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));
    
            if (stock>0){
                stock--;
                redisTemplate.opsForValue().set("a",stock+"");
                System.out.println("扣除成功,剩余:" + stock);
            }else {
                System.out.println("扣除失败,剩余:" + stock);
            }
        }finally {
            if (value.equals(redisTemplate.opsForValue().get(lock))){
                redisTemplate.delete(lock);
            }
        }
        return "end";
    }
}

到这里看似代码已经完美了。但是上叙方案还有问题没有解决,就是在第二个线程加锁执行时,如果第一个线程的主逻辑没有执行完成,那么就会导致两个线程进行同时执行业务逻辑,此时线程依然不安全。而要解决该问题就需要在程序还未结束,锁到达超时时间时延长程锁的超时时间,及写一个子线程,检测如果只要程序没有执行完成就不停的延长锁的过期时间

@Controller
@ResponseBody
public class Test {
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Autowired
    private Redisson redisson;
    
    @RequestMapping("/redis2")
    public String deductStock(){
        String lock = "lock";
        String value = String.valueOf(UUID.randomUUID());
        try {
            Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lock, value,10, TimeUnit.SECONDS);
           if (!aBoolean){
                return "error";
            }
            new Thread(()->{
                Timer timer = new Timer();
                timer.schedule(new TimerTask() {
                    @Override
                    public void run() {
                        if (value.equals(redisTemplate.opsForValue().get(lock))){
                            redisTemplate.expire(lock,15,TimeUnit.SECONDS);
                        }
                    }
                },5000,5000);
            }).start();
    
            int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));
    
            if (stock>0){
                stock--;
                redisTemplate.opsForValue().set("a",stock+"");
                System.out.println("扣除成功,剩余:" + stock);
            }else {
                System.out.println("扣除失败,剩余:" + stock);
            }
        }finally {
            try {
                TimeUnit.SECONDS.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (value.equals(redisTemplate.opsForValue().get(lock))){
                redisTemplate.delete(lock);
            }
        }
        return "ok";
    }
}

3.通过redisson框架实现redis分布式锁

@Controller
@ResponseBody
public class Test {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @RequestMapping("/redis")
    public String deductSt0ck(){
    String lock = "lock";
    RLock rLock = redisson.getLock(lock);
    
    try { 
    rLock.lock(30,TimeUnit.SECONDS);
    int stock = Integer.parseInt(redisTemplate.opsForValue().get("a"));
    
        if (stock>0){
        stock--;
        redisTemplate.opsForValue().set("a",stock+"");
        System.out.println("扣除成功,剩余:" + stock);
        }else {
        System.out.println("扣除失败,剩余:" + stock);
        }
    } finally {
      rLock.unlock();
    }
      return "end";
    }
}

redisson默认实现了以上所有的东西,
技术图片

Redis 主从架构失效问题

   以上架构还存在问题:有这样一种场景,即线程1成功上锁后,但是主服务器还未来的及复制到从服务器便发生了宕机,此时从服务器被选择为主服务器,由于其没有锁记录,其他线程便可以进行上锁,此时便发生了线程安全问题。

   Red Lock 算法也可以解决上面存在的问题,其算法思想为:*使用多台 Redis Master ,节点完全独立,节点间不需要进行数据同步,因为 Master-Slave 架构一旦 Master 发生故障时数据没有复制到 Slave,被选为 Master 的 Slave 就丢掉了锁,另一个客户端就可以再次拿到锁,锁通过 setNX(原子操作) 命令设置,在有效时间内当获得锁的数量大于 (n/2+1) 代表成功,失败后需要向所有节点发送释放锁的消息。*

以上是关于redis--springboot实现redis的分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

使用idea,springboot,springsession,redis实现分布式微服务的session 共享

springboot+spring session+redis+nginx实现session共享和负载均衡

jedis的使用,spring整合redis,springboot整合redis

Spring Boot整合Redis

JavaWeb SSM SpringBoot+Redis网上水果超市商城(源码+论文可运行《精品毕设》)主要实现登录注册商品分类浏览订单评论收藏购物车个人信息地址管理后台管理

Redis - Springboot中集成多个Redis客户端统一管理