Redis的分布式锁

Posted 爱上口袋的天空

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis的分布式锁相关的知识,希望对你有一定的参考价值。

目录

一、什么是分布式锁? 

二、成为分布式锁的必要条件

三、分布式锁的实现方案

四、Redis 分布式锁的实现核心思路

1、实现分布式锁时需要实现的两个基本方法

 2、核心思路:

如果在添加过期时间后宕机了呢? 

五、Redis分布式锁实现秒杀下单(初级版本)

1、代码实现 

测试结果 

存在问题

2、解决方案  

六、使用 Lua 脚本来实现分布式锁

1、Lua的简单介绍 

 2、代码实现 

 3、小总结


一、什么是分布式锁? 

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。 

  • 当多个进程不在同一个系统中,用分布式锁控制多个进程对资源的访问。
  • 传统单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLcok或synchronized)进行互斥控制。
  • 但是在分布式系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机并发控制锁策略失效,为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁的由来。

二、成为分布式锁的必要条件

 


三、分布式锁的实现方案

分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,市面上常见大致有三种分布式锁的实现方案


四、Redis 分布式锁的实现核心思路

1、实现分布式锁时需要实现的两个基本方法

  • 获取锁:
    • 互斥:确保只能有一个线程获取锁
    • 非阻塞:尝试一次,成功返回 true,失败返回 false
  • 释放锁:
    • 手动释放
    • 超时释放:获取锁时添加一个超时时间

 2、核心思路:

        我们利用 redis 的 setNx 方法,当有多个线程进入时,我们就利用该方法,第一个线程进入时,redis 中就有这个 key 了,返回了 1,如果结果是 1,则表示他抢到了锁,那么他去执行业务,然后再删除锁,退出锁逻辑,没有抢到锁的哥们,等待一定时间后重试即可

超时释放 

expire命令添加了过期时间,等时间到了自动释放!

如果在添加过期时间后宕机了呢? 

在redis中,获取锁添加锁的命令可以一起执行,一条redis的命令中,是具有原子性的! 

最优的写法如下 

set lock thread1 nx ex 10


五、Redis分布式锁实现秒杀下单(初级版本)

1、代码实现 

编写ILock接口,定义锁  

/**
 * redis的分布式锁
 */
public interface ILock 
 
    /**
     * 尝试获取锁
     * @param timeoutSec 锁持有的超时时间,过期后自动释放
     * @return true代表获取锁成功; false代表获取锁失败
     */
    boolean tryLock(long timeoutSec);
 
    /**
     * 释放锁
     */
    void unlock();

实现ILock接口,编写tryLock()和unLock()逻辑 

/**
 * redis的分布式锁
 * 实现ILock接口
 */
public class SimpleRedisLock implements ILock 
 
    // 不同的业务有不同的锁名称
    private String name;
    private StringRedisTemplate stringRedisTemplate;
    private static final String KEY_PREFIX = "lock:";
    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
 
    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) 
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    
 
    @Override
    public boolean tryLock(long timeoutSec) 
        // 获取线程标示
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁
        // set lock thread1 nx ex 10
        // nx : setIfAbsent , ex : timeoutSec
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
        // 自动拆箱!!!可能有风险
        return Boolean.TRUE.equals(success);
    
 
    @Override
    public void unlock() 
        stringRedisTemplate.delete(KEY_PREFIX + name);
    

 业务逻辑 

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService 
 
    @Resource
    private ISeckillVoucherService seckillVoucherService;
 
    @Resource
    private RedisIdWorker redisIdWorker;
 
    @Resource
    private StringRedisTemplate stringRedisTemplate;
 
    @Override
    public Result seckillVoucher(Long voucherId) 
 
        // 1. 查询优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        LocalDateTime nowTime = LocalDateTime.now();
 
        // 2. 判断秒杀是否开始
        if (nowTime.isBefore(voucher.getBeginTime())) 
            return Result.fail("活动未开始!");
        
 
        // 3. 判断秒杀是否结束
        if (nowTime.isAfter(voucher.getEndTime())) 
            return Result.fail("活动已结束!");
        
 
        // 4. 判断库存
        if (voucher.getStock() < 1) 
            return Result.fail("已买完!");
        
 
        Long userId = UserHolder.getUser().getId();
 
        // 创建锁对象
        SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
        // 获取锁 1200s
        boolean isLock = lock.tryLock(1200);
 
        if (!isLock) 
            return Result.fail("不允许重复下单!");
        
        try 
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
         finally 
            lock.unlock();
        
    
 
    @Transactional
    public Result createVoucherOrder(Long voucherId) 
        // 一人一单
        Long userId = UserHolder.getUser().getId();
 
        Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        if (count > 0)
            return Result.fail("用户已经购买过一次了!");
        
 
        // 5. 减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                .eq("voucher_id", voucherId).gt("stock", 0)  // CAS方案(乐观锁)!
                .update();
 
        if (!success) 
            return Result.fail("库存不足");
        
 
        // 6. 创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
 
        // 6.1 订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 6.2 用户id
        voucherOrder.setUserId(userId);
        // 6.3代金券id
        voucherOrder.setVoucherId(voucherId);
        save(voucherOrder);
        return Result.ok(orderId);
    

从之前的 synchronized 到现在的 isLock

// 创建锁对象

SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);

// 获取锁 1200s

boolean isLock = lock.tryLock(1200);

if (!isLock) 

        return Result.fail("不允许重复下单!");



try 

        IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();         return proxy.createVoucherOrder(voucherId);

 finally 

        lock.unlock();

测试结果 

8081端口 

isLock的结果是false,所以不会被锁住,执行后续的业务逻辑 

8082端口 

isLock的结果是true,所以会被锁住,直接return

存在问题

这么看起来视乎是没有问题了,但是我们来看看下面的这种特殊情况!

 由于线程1的业务阻塞,线程1的锁超时释放了,所以此时线程2就可以获取线程1刚刚释放的锁

当线程2开始执行自己的业务代码时,线程1的业务正巧又执行完了,所以刚刚线程2获取的锁会被线程1释放

此时又来了一个线程3,由于此时的状态是“无锁”,所以线程3也可以获取锁,那么现在就变成了线程2、3同时在执行业务代码!!!

这样子就可能发生线程安全的问题

这就是Redis分布式锁误删问题!!!

2、解决方案  

发生这个问题的主要原因。主要是在各个线程之间释放锁的机制有问题!我们要让线程只能释放自己上的锁,不能释放其他线程的锁!(自己的事情自己做

修改一下释放锁的逻辑,每一个线程只能删除自己的锁!!! 

redis中数据样例

 

代码如下

我们可以用当前线程的id与redis的值相比较,来确定是不是同一个!

@Override
public void unlock() 
    // 获取线程标示
    String threadId = ID_PREFIX + Thread.currentThread().getId();
    // 获取锁中的标示
    String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
    // 判断标示是否一致
    if(threadId.equals(id)) 
        // 释放锁
        stringRedisTemplate.delete(KEY_PREFIX + name);
    

 但是这仍然不是最佳的实现方案,它在极端的情况下还是会发生问题!

如上述代码,如果“判断锁标识”和“释放锁”,之间发生了阻塞呢?JVM触发FULL GC

那么之前一节所讲的“锁误删”就有可能发生!!!

所以,我们的解决思路是要保证这两个操作的原子性

我们可以使用Redis的事务+乐观锁来解决这个问题,但是这样子做非常复杂!这里我们使用 Lua 脚本来实现分布式锁


六、使用 Lua 脚本来实现分布式锁

1、Lua的简单介绍 

        Redis 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保多条命令执行时的原子性。Lua 是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html,这里重点介绍 Redis 提供的调用函数,我们可以使用 lua 去操作 redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了,作为 Java 程序员这一块并不作一个简单要求,并不需要大家过于精通,只需要知道他有什么作用即可。

这里重点介绍 Redis 提供的调用函数,语法如下:

redis.call('命令名称', 'key', '其它参数', ...)

 例如,我们要执行 set name jack,则脚本是这样:

# 执行 set name jack
redis.call('set', 'name', 'jack')

 例如,我们要先执行 set name Rose,再执行 get name,则脚本如下:

# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

 写好脚本以后,需要用 Redis 命令来调用脚本,调用脚本的常见命令如下:

例如,我们要执行 redis.call (‘set’, ‘name’, ‘jack’) 这个脚本,语法如下:

如果脚本中的 key、value 不想写死,可以作为参数传递。key 类型参数会放入 KEYS 数组,其它参数会放入 ARGV 数组,在脚本中可以从 KEYS 和 ARGV 数组获取这些参数:

接下来我们来回一下我们释放锁的逻辑:

释放锁的业务流程是这样的:

  1. 获取锁中的线程标示
  2. ​ 判断是否与指定的标示(当前线程标示)一致
  3. ​ 如果一致则释放锁(删除)
  4. ​ 如果不一致则什么都不做

如果用 Lua 脚本来表示则是这样的:

Lua脚本解决unLock业务流程 

-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
  -- 一致,则删除锁
  return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

 2、代码实现 

unLock.lua 

-- 获取锁标识,是否与当前线程一致?
if(redis.call('get', KEYS[1]) == ARGV[1]) then
    -- 一致,删除
    return redis.call('del', KEYS[1])
end
-- 不一致,直接返回
return 0

RedisTemplate调用Lua脚本的API 

 

Lua 解决unLock问题

/**
 * redis的分布式锁
 * 实现ILock接口
 */
public class SimpleRedisLock implements ILock 
 
    // 不同的业务有不同的锁名称
    private String name;
    private StringRedisTemplate stringRedisTemplate;
    private static final String KEY_PREFIX = "lock:";
    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
    // DefaultRedisScript,
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
 
    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) 
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    
 
    // 初始化 UNLOCK_SCRIPT,用静态代码块的方式,一加载SimpleRedisLock有会加载unlock.lua
    // 避免每次调unLock() 才去加载,提升性能!!!
    static 
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        // setLocation() 设置脚本位置
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        // 返回值类型
        UNLOCK_SCRIPT.setResultType(Long.class);
    
 
    /**
     * 获取锁
     */
    @Override
    public boolean tryLock(long timeoutSec) 
        // 获取线程标示
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁
        // set lock thread1 nx ex 10
        // nx : setIfAbsent(如果不存在) , ex : timeoutSec(秒)
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
        // 自动拆箱(Boolean -> boolean)!!!可能有风险
        return Boolean.TRUE.equals(success);
    
 
    /**
     * 解决判断(锁标识、释放锁)这两个动作,之间产生阻塞!!!
     * JVM的 FULL GC
     * 要让这两个动作具有原子性
     */
    @Override
    public void unlock() 
        // 调用lua脚本
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(KEY_PREFIX + name),
                ID_PREFIX + Thread.currentThread().getId());
    

 3、小总结

基于 Redis 的分布式锁实现思路:

  • 利用 set nx ex 获取锁,并设置过期时间,保存线程标示
  • 释放锁时先判断线程标示是否与自己一致,一致则删除锁
    • 特性:
      • 利用 set nx 满足互斥性
      • 利用 set ex 保证故障时锁依然能释放,避免死锁,提高安全性
      • 利用 Redis 集群保证高可用和高并发特性

以上是关于Redis的分布式锁的主要内容,如果未能解决你的问题,请参考以下文章

redis教程-----redis事务记录日志到redis分布式锁

关于redis分布式锁

基于redis实现分布式锁

redis分布式锁

redis分布式锁

Redis 分布式锁的正确实现方式