08-Redis_事务_锁机制_秒杀案例分析
Posted 快乐的小码农2号选手
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了08-Redis_事务_锁机制_秒杀案例分析相关的知识,希望对你有一定的参考价值。
一、Redis事务
1.1、Redis事务介绍
Redis事务的主要作用就是串联多个命令防止别的命令插队,当一个事务执行时,可以保证不会被打断,除非exec执行时出现错误。
1.2、Multi、Exec、Discard指令
从输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行。
组队的过程中可以通过discard来放弃组队。
组队示意图:
案例:
当前事务没有执行,key为nil
执行事务:
1.3、事务出现的错误的两种情况
第一种:组队阶段出现错误
第二种:组队阶段没有出现错误,但是我们的exec执行阶段出现错误。
1.4、事务的错误处理
组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消。
如果执行阶段某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚。
1.5、Redis事务的三特性
注意和关系型数据库的事务区别操作,二者的事务是具有不同的特性。
- 单独的隔离操作 : 事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
- 没有隔离级别的概念 : 队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行
- 不保证原子性 : 事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚
- 此外Redis事务没有回滚的机制,因为一旦执行,除非出错否则不会出现终止命令。
二、锁的机制
2.1、为啥需要事务机制
想想一个场景:有很多人有你的账户,同时去参加双十一抢购
但是我们只有一个账户可以操作成功,并发执行时,如何完成操作呢,保证我们数据执行不出现错误,这就需要我们事务机制,我们事务机制就是一个锁的机制,我们的Redis的事务锁的机制默认为乐观锁
一个请求想给金额减8000
一个请求想给金额减5000
一个请求想给金额减1000
2.2、悲观锁
悲观锁(Pessimistic Lock),
顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会block直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁,操作当前数据的只能有一个人,其他的需要等待我们的锁完成机制才可以进一步操作我们数据。
但是这种锁的机制,不适用与大并发的操作,我们不能让其他请求一直等待。
本人感觉,有点消息队列的味道。
2.3、乐观锁
乐观锁(Optimistic Lock),
顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis就是利用这种check-and-set机制实现事务的。MP具有乐观锁机制,嘻嘻。
但是也会存在一个问题,就是用户的并发请求,同时请求操作一个数据,此时获取的版本号都是一样的,但是当一个请求执行结束时,修改版本号,其他并发请求都会失败,假设用户只请求一次,那么可能造成数据只被操作一次,比如抢购,同时并发请求,导致商品少卖,需要我们另外解决。
2.4、Redis锁机制实现WATCH key [key]指令
在执行multi之前,先执行watch key1 [key2],可以监视一个(或多个) key ,如果在事务执行之前这个(或这些) key
被其他命令所改动,那么事务将被打断,相当于乐观锁的版本号对不上,所以执行失败。而且当我们执行exec之后我们watch就会自动取消对操作键的监视
当我们在执行exec之前执行了其他指令操作了我们的键,就会导致执行失败
2.5、unwatch
取消 WATCH 命令对所有 key 的监视。 如果在执行 WATCH 命令之后,EXEC 命令或DISCARD
命令先被执行了的话,那么就不需要再执行UNWATCH 了。
三、秒杀案例
3.1、案例分析
我们可以指定一个商品的库存,然后开启秒杀的功能,用户可以并发的执行秒杀功能,在redis中指定我们的商品库存的名字作为key,value为库存数量,在指定一个秒杀成功的清单,我们可以使用set集合进行存储我们秒杀成功的清单,因为set可以自动去重,避免用户重发秒杀。
3.2、第一版:简单分析版
只有简单的逻辑操作,存在超卖的操作,因为并发执行时,同时操作我们的数据,没有事务,会导致我们直接超卖
@PostMapping("/ms")
public boolean test(@RequestParam("prodId") String prodId) {
// 模拟用户Id,商品id已经获取到
String userId = new Random().nextInt(5000) + "";
// 秒杀
boolean result = doSecKill(userId, prodId);
// 返回秒杀结果
return result;
}
/**
* 秒杀具体实现
*
* @param userId
* @param prodId
* @return
*/
private boolean doSecKill(String userId, String prodId) {
// 1、判断用户id和商品id是否有值
if (!StringUtils.hasText(userId) && !StringUtils.hasText(prodId)) {
System.out.println("参数异常");
return false;
}
// 2、redis key的封装
String qtKey = "sk:" + prodId + ":qt";
String userKey = "sk:" + prodId + ":user";
// 3、获取商品的库存,如果为null说明还没有开始
String count = stringRedisTemplate.opsForValue().get(qtKey);
if (count == null) {
System.out.println("秒杀还没有开始!!");
return false;
}
// 4、判断是否为重复秒杀,判断set中是否已经存在该用户的Id值
Boolean isExist = redisTemplate.opsForSet().isMember(userKey, userId);
if (isExist) {
System.out.println("重复秒杀!!");
return false;
}
// 5、判断商品的库存是否还存在库存
int kuCount = Integer.parseInt(count);
if (kuCount <= 0) {
System.out.println("商品已经抢购结束!!");
return false;
}
// 6、执行秒杀
// 6.1、库存-1
redisTemplate.opsForValue().decrement(qtKey);
// 6.2、用户写入秒杀清单
System.out.println(userId);
redisTemplate.opsForSet().add(userKey, userId);
return true;
}
3.3、第二版:加上事务操作,解决超卖
private boolean doSecKill(String userId, String prodId) {
// 1、判断用户id和商品id是否有值
if (!StringUtils.hasText(userId) && !StringUtils.hasText(prodId)) {
System.out.println("参数异常");
return false;
}
// 2、redis key的封装
String qtKey = "sk:" + prodId + ":qt";
String userKey = "sk:" + prodId + ":user";
// 3、获取商品的库存,如果为null说明还没有开始
String count = stringRedisTemplate.opsForValue().get(qtKey);
if (count == null) {
System.out.println("秒杀还没有开始!!");
return false;
}
// 4、判断是否为重复秒杀,判断set中是否已经存在该用户的Id值
Boolean isExist = redisTemplate.opsForSet().isMember(userKey, userId);
if (isExist) {
System.out.println("重复秒杀!!");
return false;
}
// 5、判断商品的库存是否还存在库存
int kuCount = Integer.parseInt(count);
if (kuCount <= 0) {
System.out.println("商品已经抢购结束!!");
return false;
}
// 6、执行秒杀
// 使用事务加乐观锁watch解决超卖
redisTemplate.watch(qtKey);
// 开启事务 redisTemplate事务操作,需要这样写实现一个sessionCallback接口调用
SessionCallback sessionCallback = new SessionCallback() {
@Override
public Object execute(RedisOperations redisOperations) throws DataAccessException {
redisOperations.multi();
// 进行组队
// 库存-1
redisOperations.opsForValue().decrement(qtKey);
// 用户写入秒杀清单
redisOperations.opsForSet().add(userKey, userId);
return redisOperations.exec();
}
};
// 执行事务
Object execute = redisTemplate.execute(sessionCallback);
if (execute == null) {
System.out.println("exec失败");
return false;
}
return true;
}
3.4、第三版:解决并发库存遗留问题
3.4.1、问题分析
乐观锁造成库存遗留的问题解决,并发执行2000个请求同时执行,当第一个请求成功后,修改版本号,导致其他请求版本号不一致,导致请求失败。
3.4.2、使用LUA脚本语言解决
将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能。
LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。
但是注意redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用。
利用lua脚本淘汰用户,解决超卖问题。
redis 2.6版本以后,通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题。
local userid=KEYS[1];
local prodid=KEYS[2];
local qtkey="sk:"..prodid..":qt";
local usersKey="sk:"..prodid.":usr';
local userExists=redis.call("sismember",usersKey,userid);
if tonumber(userExists)==1 then
return 2;
end
local num= redis.call("get" ,qtkey);
if tonumber(num)<=0 then
return 0;
else
redis.call("decr",qtkey);
redis.call("sadd",usersKey,userid);
end
return 1;
3.4.3、完整代码
/**
* lua脚本
*/
private String LuaScript = "local userid=KEYS[1]; \\n" +
"local prodid=KEYS[2];\\n" +
"local qtkey=\\"sk:\\"prodid\\":qt\\";\\n" +
"local usersKey=\\"sk:\\"prodid\\":user';\\n" +
"local userExists=redis.call(\\"sismember\\",usersKey,userid);\\n" +
"if tonumber(userExists)==1 then \\n" +
" return 2;\\n" +
"end\\n" +
"local num= redis.call(\\"get\\" ,qtkey);\\n" +
"if tonumber(num)<=0 then \\n" +
" return 0; \\n" +
"else \\n" +
" redis.call(\\"decr\\",qtkey);\\n" +
" redis.call(\\"sadd\\",usersKey,userid);\\n" +
"end\\n" +
"return 1;\\n";
private boolean doSecKillFinally(String userId, String prodId) {
// 加载Lua脚本
DefaultRedisScript<String> defaultRedisScript = new DefaultRedisScript<>();
defaultRedisScript.setResultType(String.class);
defaultRedisScript.setScriptText(LuaScript);
// 创建执行的参数,keys
List<String> keyList = new ArrayList<>();
keyList.add(userId);
keyList.add(prodId);
// 执行
Object execute = redisTemplate.execute(defaultRedisScript, keyList);
String result = String.valueOf(execute);
// 判断执行的结果
if ("0".equals(result)) {
System.out.println("已强空");
return false;
} else if ("1".equals(result)) {
System.out.println("抢购成功");
return true;
} else if ("2".equals(result)) {
System.out.println("重复抢购");
return false;
} else {
System.out.println("抢购异常");
return false;
}
}
以上是关于08-Redis_事务_锁机制_秒杀案例分析的主要内容,如果未能解决你的问题,请参考以下文章