基于Redis的分布式锁

Posted zuier

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Redis的分布式锁相关的知识,希望对你有一定的参考价值。

Redis分布式锁

1、锁场景

  • 需要对同一共享资源进行写操作
  • 对资源的访问是互斥的

任务通过竞争获取锁才能才能对该资源进行操作(竞争锁)

当有一个任务对资源进行操作时(占有锁)

其他任务不能对该资源进行操作(任务阻塞)

直到该任务操作结束(释放锁)

竞争锁 -> 占有锁 -> 任务阻塞 -> 释放锁

graph LR
A(竞争锁) -->B(占有锁) 
B(占有锁) --> C[任务阻塞] 
C[任务阻塞] --> D[释放锁]

2、实现思路

加锁:

通过setnx向特定的key写入一个随机值,并设置失效时间,写值成功即加锁成功

1、必须给锁设置失效时间:避免死锁

2、加锁时,每个节点产生一个随机字符串:避免误删锁

3、写入随机值与设置失效时间同一命令:保证原子性

解锁:

匹配随机值,删除redis上特定的key数据,保证获取数据,判断一致性,以及删除数据也是原子性

if redis.call("get", KEYS[1]) == ARGV[1] then
  return redis.call("del", KEYS[1])
else
  return 0
end

3、代码实现

1、RedisTemplate实现

基于RedisTemplate简单封装的RedisHelper,这不是重点

  • 实现Lock接口,完成加锁解锁的redis操作
  • 1、tryLock:加锁操作(注意设置有效时间防止死锁,和操作原子性)
  • 2、Lock:加锁等待,实现可重入锁。用带失效监听的中间件自然更优雅
  • 3、很多教程解锁用的lua脚本。我感觉一句读操作一句写操作不会产生不一致的问题,就直接解锁了。

    @Component
    @Scope("prototype")
    public class RedisLock implements Lock

      /**
       * 锁的key
       */
      @Setter
      private String key;
      private String value;
      @Setter
      private long seconds = 10;
    
      @Resource
      private RedisHelper redisHelper;
    
      /**
       * 阻塞锁
       */
      @Override
      public void lock() 
          while (!tryLock()) 
              try 
                  Thread.sleep(5);
               catch (InterruptedException e) 
                  e.printStackTrace();
              
          
      
    
      @Override
      public boolean tryLock() 
          String value = UUID.randomUUID().toString();
          Boolean b = redisHelper.setNx(this.key, value, this.seconds, TimeUnit.SECONDS);
          if (b) 
              this.value = value;
          
          return b;
      
    
      @Override
      @SneakyThrows
      public boolean tryLock(long time, @NonNull TimeUnit unit) 
          String value = UUID.randomUUID().toString();
          Boolean b = redisHelper.setNx(this.key, value, time, unit);
          if (b) 
              this.value = value;
          
          return b;
      
    
      @Override
      public void unlock() 
          String value = redisHelper.get(key);
          if (value.equals(this.value)) 
              redisHelper.del(key);
          
      
    
      @Override
      @NonNull
      public Condition newCondition() 
          return null;
      

  • 使用:

    @Resource
    private RedisLock redisLock;

    /**
    • 测试并发扣除库存,调用前先手动在redis中设置库存数量
    • set store 100
      */
      @GetMapping("/getStore")
      public R getStore()

      int i;
      // 阻塞加锁
      redisLock.setKey("store-lock");
      redisLock.lock();
      try
      String store = redisHelper.get("store");
      if ("0".equals(store))
      return R.error("库存不足");

      i = Integer.parseInt(store) - 1;
      log.info("剩余库存数量:", i);
      redisHelper.set("store", String.valueOf(i));
      finally
      // 释放锁
      redisLock.unlock();

      return R.ok(i);

2、Redisson实现

自动续约,不用自己开线程检查

  • 依赖


    org.redisson
    redisson
    3.11.0

  • 配置

    /**
    • redisson配置
      */
      @Bean
      public Redisson redisson()
      Config config = new Config();
      config.useSingleServer().setAddress("redis://127.0.0.1:6379")
      .setPassword("654321");
      return (Redisson) Redisson.create(config);
  • 使用

    @Resource
    private Redisson redisson;

    @GetMapping("/getStore1")
    public R getStore1()
    String lockKey = "redisLockKey";
    RLock lock = redisson.getLock(lockKey);
    lock.lock();
    int i;
    // 阻塞加锁
    try
    String store = redisHelper.get("store1");
    if ("0".equals(store))
    return R.error("库存不足");

    i = Integer.parseInt(store) - 1;
    log.info("剩余库存数量:", i);
    redisHelper.set("store1", String.valueOf(i));
    finally
    // 释放锁
    lock.unlock();

    return R.ok(i);

3、封装功能

  • 定义注解(可重入锁注解,其他类型略)

    /**
    • @description 可重入锁注解
      */
      @Target(ElementType.METHOD)
      @Retention(RetentionPolicy.RUNTIME)
      @Documented
      public @interface TryLock

      /**
      • 锁的key,必须指定
        */
        String key();
      /**
      • 自动释放锁的超时时间:默认为10
        */
        long expire() default 10;
      /**
      • 时间单位:默认为秒
        */
        TimeUnit timeUnit() default TimeUnit.SECONDS;

  • 定义注解处理器

    @Slf4j
    @Aspect
    @Component
    @AllArgsConstructor
    public class LockHandler

      private Redisson redisson;
    
      /**
       * 可重入锁处理
       */
      @SneakyThrows
      @Around("@annotation(tryLock)")
      public Object lockAroundAction(ProceedingJoinPoint proceeding, TryLock tryLock) 
          String key = tryLock.key();
          long expire = tryLock.expire();
          TimeUnit timeUnit = tryLock.timeUnit();
          RLock lock = redisson.getLock(key);
          lock.lock(expire, timeUnit);
          try 
              return proceeding.proceed();
           finally 
              lock.unlock();
          
      

  • 使用

    // 注解并指定key
    @TryLock(key = "store2_lock")
    @GetMapping("/getStore2")
    public R getStore2()
    int i;
    // 阻塞加锁
    String store = redisHelper.get("store2");
    if ("0".equals(store))
    String msg = "库存不足";
    log.info(msg);
    return R.error(msg);

    i = Integer.parseInt(store) - 1;
    log.info("剩余库存数量:", i);
    redisHelper.set("store2", String.valueOf(i));
    return R.ok(i);

以上是关于基于Redis的分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

Redis实现分布式锁(设计模式应用实战)

redis基于redis实现分布式并发锁

基于 Redis 分布式锁实现“秒杀”(含代码)

基于redis和zookeeper的分布式锁实现方式

分布式锁的两种实现方式(基于redis和基于zookeeper)

分布式锁的两种实现方式(基于redis和基于zookeeper)