Redis的分布式锁
Posted 爱上口袋的天空
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis的分布式锁相关的知识,希望对你有一定的参考价值。
目录
一、什么是分布式锁?
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
- 当多个进程不在同一个系统中,用分布式锁控制多个进程对资源的访问。
- 在传统单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLcok或synchronized)进行互斥控制。
- 但是在分布式系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机并发控制锁策略失效,为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁的由来。
二、成为分布式锁的必要条件
三、分布式锁的实现方案
分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,市面上常见大致有三种分布式锁的实现方案
四、Redis 分布式锁的实现核心思路
1、实现分布式锁时需要实现的两个基本方法
- 获取锁:
- 互斥:确保只能有一个线程获取锁
- 非阻塞:尝试一次,成功返回 true,失败返回 false
- 释放锁:
- 手动释放
- 超时释放:获取锁时添加一个超时时间
2、核心思路:
我们利用 redis 的 setNx 方法,当有多个线程进入时,我们就利用该方法,第一个线程进入时,redis 中就有这个 key 了,返回了 1,如果结果是 1,则表示他抢到了锁,那么他去执行业务,然后再删除锁,退出锁逻辑,没有抢到锁的哥们,等待一定时间后重试即可
超时释放
用expire命令添加了过期时间,等时间到了自动释放!
如果在添加过期时间后宕机了呢?
在redis中,获取锁和添加锁的命令可以一起执行,一条redis的命令中,是具有原子性的!
最优的写法如下
set lock thread1 nx ex 10
五、Redis分布式锁实现秒杀下单(初级版本)
1、代码实现
编写ILock接口,定义锁
/**
* redis的分布式锁
*/
public interface ILock
/**
* 尝试获取锁
* @param timeoutSec 锁持有的超时时间,过期后自动释放
* @return true代表获取锁成功; false代表获取锁失败
*/
boolean tryLock(long timeoutSec);
/**
* 释放锁
*/
void unlock();
实现ILock接口,编写tryLock()和unLock()逻辑
/**
* redis的分布式锁
* 实现ILock接口
*/
public class SimpleRedisLock implements ILock
// 不同的业务有不同的锁名称
private String name;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate)
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
@Override
public boolean tryLock(long timeoutSec)
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
// set lock thread1 nx ex 10
// nx : setIfAbsent , ex : timeoutSec
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
// 自动拆箱!!!可能有风险
return Boolean.TRUE.equals(success);
@Override
public void unlock()
stringRedisTemplate.delete(KEY_PREFIX + name);
业务逻辑
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result seckillVoucher(Long voucherId)
// 1. 查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
LocalDateTime nowTime = LocalDateTime.now();
// 2. 判断秒杀是否开始
if (nowTime.isBefore(voucher.getBeginTime()))
return Result.fail("活动未开始!");
// 3. 判断秒杀是否结束
if (nowTime.isAfter(voucher.getEndTime()))
return Result.fail("活动已结束!");
// 4. 判断库存
if (voucher.getStock() < 1)
return Result.fail("已买完!");
Long userId = UserHolder.getUser().getId();
// 创建锁对象
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
// 获取锁 1200s
boolean isLock = lock.tryLock(1200);
if (!isLock)
return Result.fail("不允许重复下单!");
try
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
finally
lock.unlock();
@Transactional
public Result createVoucherOrder(Long voucherId)
// 一人一单
Long userId = UserHolder.getUser().getId();
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if (count > 0)
return Result.fail("用户已经购买过一次了!");
// 5. 减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).gt("stock", 0) // CAS方案(乐观锁)!
.update();
if (!success)
return Result.fail("库存不足");
// 6. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1 订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 6.2 用户id
voucherOrder.setUserId(userId);
// 6.3代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);
从之前的 synchronized 到现在的 isLock
// 创建锁对象
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
// 获取锁 1200s
boolean isLock = lock.tryLock(1200);
if (!isLock)
return Result.fail("不允许重复下单!");
try
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId);
finally
lock.unlock();
测试结果
8081端口
isLock的结果是false,所以不会被锁住,执行后续的业务逻辑
8082端口
isLock的结果是true,所以会被锁住,直接return
存在问题
这么看起来视乎是没有问题了,但是我们来看看下面的这种特殊情况!
由于线程1的业务阻塞,线程1的锁超时释放了,所以此时线程2就可以获取线程1刚刚释放的锁
当线程2开始执行自己的业务代码时,线程1的业务正巧又执行完了,所以刚刚线程2获取的锁会被线程1释放
此时又来了一个线程3,由于此时的状态是“无锁”,所以线程3也可以获取锁,那么现在就变成了线程2、3同时在执行业务代码!!!
这样子就可能发生线程安全的问题
这就是Redis分布式锁误删问题!!!
2、解决方案
发生这个问题的主要原因。主要是在各个线程之间释放锁的机制有问题!我们要让线程只能释放自己上的锁,不能释放其他线程的锁!(自己的事情自己做)
修改一下释放锁的逻辑,每一个线程只能删除自己的锁!!!
redis中数据样例
代码如下
我们可以用当前线程的id与redis的值相比较,来确定是不是同一个!
@Override
public void unlock()
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标示是否一致
if(threadId.equals(id))
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
但是这仍然不是最佳的实现方案,它在极端的情况下还是会发生问题!
如上述代码,如果“判断锁标识”和“释放锁”,之间发生了阻塞呢?(JVM触发FULL GC)
那么之前一节所讲的“锁误删”就有可能发生!!!
所以,我们的解决思路是要保证这两个操作的原子性!
我们可以使用Redis的事务+乐观锁来解决这个问题,但是这样子做非常复杂!这里我们使用 Lua 脚本来实现分布式锁
六、使用 Lua 脚本来实现分布式锁
1、Lua的简单介绍
Redis 提供了 Lua 脚本功能,在一个脚本中编写多条 Redis 命令,确保多条命令执行时的原子性。Lua 是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html,这里重点介绍 Redis 提供的调用函数,我们可以使用 lua 去操作 redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了,作为 Java 程序员这一块并不作一个简单要求,并不需要大家过于精通,只需要知道他有什么作用即可。
这里重点介绍 Redis 提供的调用函数,语法如下:
redis.call('命令名称', 'key', '其它参数', ...)
例如,我们要执行 set name jack,则脚本是这样:
# 执行 set name jack
redis.call('set', 'name', 'jack')
例如,我们要先执行 set name Rose,再执行 get name,则脚本如下:
# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name
写好脚本以后,需要用 Redis 命令来调用脚本,调用脚本的常见命令如下:
例如,我们要执行 redis.call (‘set’, ‘name’, ‘jack’) 这个脚本,语法如下:
如果脚本中的 key、value 不想写死,可以作为参数传递。key 类型参数会放入 KEYS 数组,其它参数会放入 ARGV 数组,在脚本中可以从 KEYS 和 ARGV 数组获取这些参数:
接下来我们来回一下我们释放锁的逻辑:
释放锁的业务流程是这样的:
- 获取锁中的线程标示
- 判断是否与指定的标示(当前线程标示)一致
- 如果一致则释放锁(删除)
- 如果不一致则什么都不做
如果用 Lua 脚本来表示则是这样的:
Lua脚本解决unLock业务流程
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
2、代码实现
unLock.lua
-- 获取锁标识,是否与当前线程一致?
if(redis.call('get', KEYS[1]) == ARGV[1]) then
-- 一致,删除
return redis.call('del', KEYS[1])
end
-- 不一致,直接返回
return 0
RedisTemplate调用Lua脚本的API
Lua 解决unLock问题
/**
* redis的分布式锁
* 实现ILock接口
*/
public class SimpleRedisLock implements ILock
// 不同的业务有不同的锁名称
private String name;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
// DefaultRedisScript,
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate)
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
// 初始化 UNLOCK_SCRIPT,用静态代码块的方式,一加载SimpleRedisLock有会加载unlock.lua
// 避免每次调unLock() 才去加载,提升性能!!!
static
UNLOCK_SCRIPT = new DefaultRedisScript<>();
// setLocation() 设置脚本位置
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
// 返回值类型
UNLOCK_SCRIPT.setResultType(Long.class);
/**
* 获取锁
*/
@Override
public boolean tryLock(long timeoutSec)
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
// set lock thread1 nx ex 10
// nx : setIfAbsent(如果不存在) , ex : timeoutSec(秒)
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
// 自动拆箱(Boolean -> boolean)!!!可能有风险
return Boolean.TRUE.equals(success);
/**
* 解决判断(锁标识、释放锁)这两个动作,之间产生阻塞!!!
* JVM的 FULL GC
* 要让这两个动作具有原子性
*/
@Override
public void unlock()
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
3、小总结
基于 Redis 的分布式锁实现思路:
- 利用 set nx ex 获取锁,并设置过期时间,保存线程标示
- 释放锁时先判断线程标示是否与自己一致,一致则删除锁
- 特性:
- 利用 set nx 满足互斥性
- 利用 set ex 保证故障时锁依然能释放,避免死锁,提高安全性
- 利用 Redis 集群保证高可用和高并发特性
- 特性:
以上是关于Redis的分布式锁的主要内容,如果未能解决你的问题,请参考以下文章