分布式下如何加锁?

Posted tomcat峰峰

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式下如何加锁?相关的知识,希望对你有一定的参考价值。

分布式锁演进-基本原理

 我们可以同时去一个地方“占坑”,如果占到,就执行逻辑。否则就必须等待,直到释放锁。 “占坑”可以去redis,可以去数据库,可以去任何大家都能访问的地方。 等待可以自旋的方式。

分布式锁演进-阶段一

 代码实现:

      //占用分布式锁。去redis占坑
        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("locl", "111");
        if (lock) {
            //加锁成功之后,处理业务逻辑
            Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
            stringRedisTemplate.delete("lock");//处理完业务之后需要将锁删除,不然会造成死锁
            return dataFromDb;
        } else {
            //加锁失败。。。。重试  我们必须等待再去尝试获取锁执行业务
            return getCatalogJsonFromDbWithRedisLock();//采用自旋的方式
        }

但是这种方式存在的问题就是:setnx占好了位,业务代码异常或者程序在页面过程突然宕机。那我们就没有执行删除锁逻辑,没有执行删除锁的逻辑下一个线程就永远得不到锁。 这样就造成了死锁现象。

解决这一问题的办法就是:设置锁的自动过期时间,即使我们没有删除逻辑在宕机之后时间到了会直接删除锁。即使没有删除,会自动删除。这样一来就避免了死锁问题。

但是这样也会造成致命性的问题,看下面的解决办法:

分布式锁演进-阶段二

 代码实现:

  //1、占用分布式锁。去redis占坑
        String uuid = UUID.randomUUID().toString();
        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("locl", uuid);
        if (lock) {
            //加锁成功之后,处理业务逻辑
            //2、设置过期时间
            stringRedisTemplate.expire("locl", 30, TimeUnit.SECONDS);
            Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
            stringRedisTemplate.delete("lock");//处理完业务之后需要将锁删除,不然会造成死锁
            return dataFromDb;
        } else {
            //加锁失败。。。。重试  我们必须等待再去尝试获取锁执行业务
            return getCatalogJsonFromDbWithRedisLock();//采用自旋的方式
        }

但是这种方式存在的问题就是:setnx设置好,正要去设置过期时间,此时出现了宕机。这样又会造成死锁了。此时我们就想到了设置过期时间和我们上锁的时候保持一个原子性。

解决办法: 在占位的时候我们把过期时间一起设置了。设置过期时间和占位必须是原子的。redis支持使用setnx ex 命令。

下面再来看看我们怎么将占位和设置过期时间保持原子性。

分布式锁演进-阶段三

实现代码:

       //1、占用分布式锁。去redis占坑
        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("locl", "1111",30, TimeUnit.SECONDS);
        if (lock) {
            //加锁成功之后,处理业务逻辑
            //2、设置过期时间
           // stringRedisTemplate.expire("locl", 30, TimeUnit.SECONDS);
            Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
            stringRedisTemplate.delete("lock");//处理完业务之后需要将锁删除,不然会造成死锁
            return dataFromDb;
        } else {
            //加锁失败。。。。重试  我们必须等待再去尝试获取锁执行业务
            return getCatalogJsonFromDbWithRedisLock();//采用自旋的方式
        }

存在的问题:删除锁直接删除???这样子实现虽然我们保证了设置过期时间和占位的时候保持了原子性。但是,如果由于业务时间很长,锁自己过期了,我们直接删除,有可能把别人正在持有的锁删除了。

解决办法:我们在占锁的时候,将值指定为随机的(用uuid实现随机数),必须拿着UUID产生的信息去从每个人匹配是否是自己的锁才删除。

但是这样又会造成一个问题接着看解决办法:

分布式锁演进-阶段四

 实现代码:

      //1、占用分布式锁。去redis占坑
        String uuid = UUID.randomUUID().toString();
        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("locl", uuid, 30, TimeUnit.SECONDS);
        if (lock) {
            //加锁成功之后,处理业务逻辑
            //2、设置过期时间
            // stringRedisTemplate.expire("locl", 30, TimeUnit.SECONDS);
            Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
            //先去redis查询下保证当前的锁是自己的
            //获取值对比,对比成功删除=原子性 lua脚本解锁
            String uuidValue = stringRedisTemplate.opsForValue().get("locl");
            if (uuid.equals(uuidValue)) {
                stringRedisTemplate.delete("lock");//将锁删除
            }
            return dataFromDb;
        } else {
            //加锁失败。。。。重试  我们必须等待再去尝试获取锁执行业务
            return getCatalogJsonFromDbWithRedisLock();//采用自旋的方式
        }

存在的问题:如果正好判断是当前值,正要删除锁的时候,锁已经过期, 别人已经设置到了新的值。那么我们删除的是别人的锁。所以我们删除也必须保持原子性。

解决办法:在设置占位和过期时间要保持原子性,删除锁也必须保证原子性。使用redis+Lua脚本完成。

分布式锁演进-阶段五-最终形态

 实现代码:

    //1、占分布式锁、占坑位
        String uuid = UUID.randomUUID().toString();
        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
        if (lock) {
            //加锁成功、执行业务
            //2、设置过期时间,必须和加锁同步的,原子的
            //stringRedisTemplate.expire("lock", 30, TimeUnit.SECONDS);
            Map<String, List<Catelog2Vo>> dataFromDb = null;
            try {
                dataFromDb = getDataFromDb();
            } finally {
                String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return  redis.call('del', KEYS[1]) else return 0 end";
                Long lock1 = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class),
                        Arrays.asList("lock"), uuid);
            }
        
            return dataFromDb;
        } else {
            //加锁失败,重试(自旋锁)
            return getCatalogJsonFromDbWithRedisLock();//自旋的方式、自己调用自己
        }

这样一来我们就解决了分布式下锁为问题,但是我们从代码来看基本上都是相同的代码。下面我们将利用redis中的redisson框架来解决。

        //1、占分布式锁。去redis占坑
        //(锁的粒度,越细越快:具体缓存的是某个数据,11号商品) product-11-lock
        //RLock catalogJsonLock = redissonClient.getLock("catalogJson-lock");
        //创建读锁
        RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("catalogJson-lock");

        RLock rLock = readWriteLock.readLock();

        Map<String, List<Catelog2Vo>> dataFromDb = null;
        try {
            rLock.lock();
            //加锁成功...执行业务
            dataFromDb = getDataFromDb();
        } finally {
            rLock.unlock();
        }
  

        return dataFromDb;

以上是关于分布式下如何加锁?的主要内容,如果未能解决你的问题,请参考以下文章

Redis分布式锁加锁案例讲解

分布式加锁

如何利用redis来进行分布式集群系统的限流设计

SpringBoot整合Redis以及缓存穿透缓存雪崩缓存击穿的理解如何添加锁解决缓存击穿问题?分布式情况下如何添加分布式锁

DCS实践干货:使用Redis实现分布式锁

分布式锁之Zookeeper