Redis | 黑马点评 + 思维导图分布式锁
Posted 十八岁讨厌编程
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis | 黑马点评 + 思维导图分布式锁相关的知识,希望对你有一定的参考价值。
文章目录
分布式锁的基本原理和实现方式对比
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路
在分布式情况下有多个JVM,所以就有多个锁监视器,所以就存在有多个线程拿到锁,他们就会产生不互斥的情况,这也是我们为什么要使用分布式锁的原因。而从图中我们也可以看到分布式锁的原理:让多个微服务共用一个锁监视器,这样就不会有多把锁同时存在的情况。
那么分布式锁他应该满足一些什么样的条件呢?
-
可见性
:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思 -
互斥
:互斥是分布式锁的最基本的条件,使得程序串行执行 -
高可用
:程序不易崩溃,时时刻刻都保证较高的可用性 -
高性能
:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能 -
安全性
:安全也是程序中必不可少的一环
常见的分布式锁有三种
-
mysql
:mysql本身就带有锁机制,但是由于mysql性能本身一般,所以采用分布式锁的情况下,其实使用mysql作为分布式锁比较少见 -
Redis
:redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用redis或者zookeeper作为分布式锁,利用setnx这个方法,如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁 -
Zookeeper
:zookeeper也是企业级开发中较好的一个实现分布式锁的方案
Redis分布式锁的实现核心思路
实现分布式锁时需要实现的两个基本方法:
-
获取锁:
- 互斥:确保只能有一个线程获取锁
- 非阻塞:尝试一次,成功返回true,失败返回false
我们在获取锁的时候一开始的思路如下:
但是有一种可能就是当我们执行完第一个命令添加完锁之后,此时宕机了,那么后面给锁添加过期时间的命令还没来得及执行,这时候就会出现死锁的现象。为了让这两个命令达到同时成功或者失败的效果,我们可以把两个命令进行合并。 -
释放锁:
- 手动释放
- 超时释放:获取锁时添加一个超时时间
核心思路:
我们利用redis 的setNx 方法,当有多个线程进入时,我们就利用该方法,第一个线程进入时,redis 中就有这个key 了,返回了1,如果结果是1,则表示他抢到了锁,那么他去执行业务,然后再删除锁,退出锁逻辑,没有抢到锁的哥们,等待一定时间后重试即可
分布式锁的初级实现
锁的基本接口
接下来我们实现这一个接口:
public class SimpleRedisLock implements ILock
private StringRedisTemplate redisTemplate;
//定义一个锁的通用前缀
private static final String KEY_PREFIX="lock:";
//这个锁可能会被多个业务使用,所以需要使用者提供业务名称来完成key的组建
private String name;
public SimpleRedisLock(StringRedisTemplate redisTemplate, String name)
this.redisTemplate = redisTemplate;
this.name = name;
@Override
public Boolean tryLock(long time)
//我们的锁里面要记载当前的线程id
long id = Thread.currentThread().getId();
Boolean success = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, id + "", time, TimeUnit.SECONDS);
//为了防止拆包的时候出现空指针异常我们使用如下方法
return Boolean.TRUE.equals(success);
@Override
public void unlock()
redisTemplate.delete(KEY_PREFIX + name);
注意点:
- 这个锁可能会被多个业务使用,所以需要使用者提供业务名称来完成key的组建
- 我们的锁里面要记载当前的线程id,为我们后面处理Redis分布式锁误删的问题作准备
- 为了防止拆包的时候出现空指针异常我们使用
Boolean.TRUE.equals(success);
修改业务代码
@Override
public Result seckillVoucher(Long voucherId)
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now()))
// 尚未开始
return Result.fail("秒杀尚未开始!");
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now()))
// 尚未开始
return Result.fail("秒杀已经结束!");
// 4.判断库存是否充足
if (voucher.getStock() < 1)
// 库存不足
return Result.fail("库存不足!");
Long userId = UserHolder.getUser().getId();
//创建锁对象(新增代码)
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
//获取锁对象
boolean isLock = lock.tryLock(1200);
//加锁失败
if (!isLock)
return Result.fail("不允许重复下单");
try
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
finally
//释放锁
lock.unlock();
前情回顾:我们设计这个分布式锁的初衷就是解决一人一单问题。当一位用户在同一时间使用高并发的方式去抢优惠卷的时候,因为负载均衡的原因他发出的请求可能会被几个不同的服务器处理,这个时候我们使用java中常规的锁是不起作用的,所以我们需要一个分布式锁。
而这个分布式锁的作用就是保证在高并发的情况下只有一个线程抢到了票,其他线程均会失败,随后释放锁。之后如果该用户还想用这种方法抢优惠卷的话因为数据库已经记录了它的优惠卷,所以他不会成功。
Redis分布式锁误删情况说明
逻辑说明:
持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放,这时其他线程,线程2来尝试获得锁,就拿到了这把锁,然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除,这就是误删别人锁的情况说明
有人可能会认为我们存在redis中的key不是包含用户的id吗,怎么会存在误删别人锁的情况呢?
其实我们可以想象一种情况:一位用户进行优惠卷秒杀的时候,第一次领取优惠卷发生了阻塞,过了一会用户可能会觉得我明明领取了怎么还没有反应,于是可能又会去点击领取优惠卷。这个时候用户id是一样的,那么锁误删的情况就不可避免。
解决方案:解决方案就是在每个线程释放锁的时候,去判断一下当前这把锁是否属于自己,如果属于自己,则不进行锁的删除,假设还是上边的情况,线程1卡顿,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程1反应过来,然后删除锁,但是线程1,一看当前这把锁不是属于自己,于是不进行删除锁逻辑,当线程2走到删除锁逻辑时,如果没有卡过自动释放锁的时间点,则判断当前这把锁是属于自己的,于是删除这把锁。
解决Redis分布式锁误删问题
需求:修改之前的分布式锁实现,满足:在获取锁时存入线程标示(可以用UUID表示)
这里使用UUID是因为,线程ID是一个递增的数字,在JVM中每创建一个线程,这个数字都会递增。而如果是在集群的模式下,就会有多个JVM,而每个JVM都会维护这样一个线程ID,所以非常容易出现线程ID相同的情况,不利于我们后面的判断。
在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
- 如果一致则释放锁
- 如果不一致则不释放锁
核心逻辑:在存入锁时,放入自己线程的标识,在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除。
具体代码如下:加锁
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec)
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
释放锁
public void unlock()
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标示是否一致
if(threadId.equals(id))
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
有关代码实操说明:
在我们修改完此处代码后,我们重启工程,然后启动两个线程,第一个线程持有锁后,手动释放锁,第二个线程 此时进入到锁内部,再放行第一个线程,此时第一个线程由于锁的value值并非是自己,所以不能释放锁,也就无法删除别人的锁,此时第二个线程能够正确释放锁,通过这个案例初步说明我们解决了锁误删的问题。
分布式锁的原子性问题
更为极端的误删逻辑说明:
线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是此时他的锁到期了,那么此时线程2进来,但是线程1他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程1的拿锁,比锁,删锁,实际上并不是原子性的,我们要防止刚才的情况发生,
Lua脚本解决多条命令原子性问题
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。
Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html
这里重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,又能保证他的原子性,这样就可以实现拿锁、比锁、删锁是一个原子性动作了,作为Java程序员这一块并不作一个简单要求,并不需要大家过于精通,只需要知道他有什么作用即可。
这里重点介绍Redis提供的调用函数,语法如下:
redis.call('命令名称', 'key', '其它参数', ...)
例如,我们要执行set name jack,则脚本是这样:
# 执行 set name jack
redis.call('set', 'name', 'jack')
例如,我们要先执行set name Rose,再执行get name,则脚本如下:
# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:
例如,我们要执行 redis.call(‘set’, ‘name’, ‘jack’) 这个脚本,语法如下:
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:
接下来我们来回一下我们释放锁的逻辑:
释放锁的业务流程是这样的
1、获取锁中的线程标示
2、判断是否与指定的标示(当前线程标示)一致
3、如果一致则释放锁(删除)
4、如果不一致则什么都不做
如果用Lua脚本来表示则是这样的:
最终我们操作redis的拿锁比锁删锁的lua脚本就会变成这样
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
利用Java代码调用Lua脚本改造分布式锁
lua脚本本身并不需要大家花费太多时间去研究,只需要知道如何调用,大致是什么意思即可,所以在笔记中并不会详细的去解释这些lua表达式的含义。
我们的RedisTemplate中,可以利用execute方法去执行lua脚本,参数对应关系就如下图股
Java代码
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
public void unlock()
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
//经过以上代码改造后,我们就能够实现 拿锁比锁删锁的原子性动作了~
- Collections.singletonList(KEY_PREFIX + name):返回一个只包含指定对象的不可变列表。 返回的列表是可序列化的。
小总结:
基于Redis的分布式锁实现思路:
- 利用set nx ex获取锁,并设置过期时间,保存线程标示
- 释放锁时先判断线程标示是否与自己一致,一致则删除锁
- 特性:
- 利用set nx满足互斥性
- 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
- 利用Redis集群保证高可用和高并发特性
- 特性:
小总结:我们一路走来,利用添加过期时间,防止死锁问题的发生,但是有了过期时间之后,可能出现误删别人锁的问题,这个问题我们开始是利用删之前 通过拿锁,比锁,删锁这个逻辑来解决的,也就是删之前判断一下当前这把锁是否是属于自己的,但是现在还有原子性问题,也就是我们没法保证拿锁比锁删锁是一个原子性的动作,最后通过lua表达式来解决这个问题
但是目前还剩下一个问题锁不住,什么是锁不住呢,你想一想,如果当过期时间到了之后,我们可以给他续期一下,比如续个30s,就好像是网吧上网, 网费到了之后,然后说,来,网管,再给我来10块的,是不是后边的问题都不会发生了,那么续期问题怎么解决呢,可以依赖于我们接下来的redission啦
测试逻辑:
第一个线程进来,得到了锁,手动删除锁,模拟锁超时了,其他线程会执行lua来抢锁,当第一天线程利用lua删除锁时,lua能保证他不能删除他的锁,第二个线程删除锁时,利用lua同样可以保证不会删除别人的锁,同时还能保证原子性。
分布式锁-redission
分布式锁-redission功能介绍
基于setnx实现的分布式锁存在下面的问题:
重入问题:重入问题是指获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。
我们可以举一个例子,我们有一个方法A,在方法A中我们要去调方法B。在方法A中要先去获取锁,然后再去调B,而B里又要去获取同一把锁。如果不可重入的话,那么在B里就获取不到锁会一直等待该锁的释放,而方法A因为没有执行完不会释放锁,所以B就会一直陷入等待,造成死锁。
不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。
超时释放: 我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患
主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。
那么什么是Redission呢
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
简单来说Redisson就是一个在Redis基础上实现的分布式工具集。
Redission提供了分布式锁的多种多样的功能
分布式锁-Redission快速入门
引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
配置Redisson客户端:
@Configuration
public class RedissonConfig
@Bean
public RedissonClient redissonClient()
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.150.101:6379")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
如何使用Redission的分布式锁
@Resource
private RedissionClient redissonClient;
@Test
void testRedisson() throws Exception
//获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
//判断获取锁成功
if(isLock)
try
System.out.println("执行业务");
finally
//释放锁
lock.unlock();
lock.tryLock(1,10,TimeUnit.SECONDS)
的意思是:尝试加锁,最多等待(尝试)1秒,上锁以后10秒自动解锁
在 VoucherOrderServiceImpl
注入RedissonClient
@Resource
private RedissonClient redissonClient;
@Override
public Result seckillVoucher(Long voucherId)
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now()))
// 尚未开始
return Result.fail("秒杀尚未开始!");
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now()))
// 尚未开始
return Result.fail("秒杀已经结束!");
// 4.判断库存是否充足
if (voucher.getStock() < 1)
// 库存不足
return Result.fail("库存不足!");
Long userId = UserHolder.getUser().getId();
//创建锁对象 这个代码不用了,因为我们现在要使用分布式锁
//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userId);
//获取锁对象
boolean isLock = lock.tryLock();
//加锁失败
if (!isLock)
return Result.fail("不允许重复下单");
try
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
finally
//释放锁
lock.unlock();
分布式锁-redission可重入锁原理
我们来看一个案例:
咱们自定义的分布锁采用的是 Redis 的 string 数据类型,也就是简单的 key value。整个获取锁的流程是上图右边这样子的,在一开始尝试获取锁,其实就是执行这个 set 命令,当然要加上 nx、ex 参数,那 nx 的目的就是实现一个互斥,满足互斥锁的基本要求。同时再去获取锁的时候,我们要存入这个线程的标识,其目的就是将来在释放锁的时候做判断,避免误删,只有锁是自己的才去做这个删除。
那么这样的一个流程为什么不能重录?我们一起来看一下上图左边这样一个demo。首先在这里我们会去创建一个锁的对象,接下来有一个测试方法, method1,在 method1 里会首先尝试获取锁,如果失败它就会报错,而如果成功,那么它就会去调用一个方法 method 2。而在 method 2 里又一次尝试获取锁,那么 method1 去调 method2,所以他们两个是在一个线程里的,那一个线程连续两次去加锁,这其实就是锁的重入了。那我们来看一下如果按照我们这个流程,它能不能实现重入。
首先 method1 尝试获取锁,那按照我们这里就会去 set 这个锁名称以及锁的标识进去,那在这就是lock,以及比如说这个线程名叫 Thread 1,我们把它存进去了,接下来往下执行调用这个 method 2,那么 method 2 又一次尝试获取锁,那么又要执行这个 set lock thread 1,那因为这里加了 nx 的一个参数,也就是说只有第一个人能 set 成功,那这里已经有值了。所以说 master2 在直行 try 的时候会失败。
那么可重入是怎么实现的呢?在这简单给大家说一下,其实所谓的可重入无非就是在获取锁的时候,当我判断这个锁已经有人的情况下看一下拿到锁的是不是我自己,也就是说是不是同一个线程,如果是同一个线程的话,我也会让它获取锁。
在Lock锁中,他是借助于底层的一个voaltile的一个state变量来记录重入的状态的,比如当前没有人持有这把锁,那么state=0,假如有人持有这把锁,那么state=1,如果持有这把锁的人再次持有这把锁,那么state就会+1 ,如果是对于synchronized而言,他在c语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有。
在分布式锁中,他采用hash结构用来存储锁,其中大key表示表示这把锁是否存在,用小key表示当前这把锁被哪个线程持有。
在redission中,我们的也支持可重入锁
redission为了保证获得锁、释放锁等操作的原子性其内部使用的也是Lua脚本:
获取锁的Lua脚本:
释放锁的Lua脚本:
所以接下来我们一起分析一下当前的这个lua表达式
这个地方一共有3个参数
KEYS[1] : 锁名称
ARGV[1]: 锁失效时间
ARGV[2]: id + “:” + threadId; 锁的小key
exists: 判断数据是否存在 name:是lock是否存在,如果==0,就表示当前这把锁不存在
redis.call(‘hset’, KEYS[1], ARGV[2], 1);此时他就开始往redis里边去写数据 ,写成一个hash结构
Lock
id + “:” + threadId : 1
如果当前这把锁存在,则第一个条件不满足,再判断
redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1
此时
黑马点评项目总结
黑马点评
一、短信登陆功能
1.基于session实现
2.基于session实现登陆的问题
单体应用时用户的会话信息保存在session中,session存在于服务器端的内存中,由于前前后后用户只针对一个web服务器,所以没啥问题。但是一到了web服务器集群的环境下
(我们一般都是用Nginx做负载均衡,若是使用了轮询等这种请求分配策略),就会导致用户小a在A服务器登录了,session存在于A服务器中,但是第二次请求被分配到了B服务器,由于B服务器中没有用户小a的session会话,导致用户小a还要再登陆一次,以此类推。这样用户体验很不好。当然解决办法也有很多种,比如同一个用户分配到同一个服务处理、使用cookie保持用户会话信息等。
因此,要解决这样的问题必须满足以下条件:
- 数据共享
- 内存存储
- key、value结构
3.基于redis实现短信登陆
发送验证码:
/**
* 发送手机验证码
*/
@PostMapping("code")
public Result sendCode(@RequestParam("phone") String phone, HttpSession session)
return userService.sendCode(phone,session);
@Override
public Result sendCode(String phone, HttpSession session)
//1.校验手机号
if (RegexUtils.isPhoneInvalid(phone))
//2.如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
//3.符合则生成验证码
final String code = RandomUtil.randomNumbers(6);
//4.保存验证码到redis
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY+phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);
//5.发送验证码
log.debug("发送短信验证码成功,验证码:",code);
//6.返回null
return Result.ok();
验证登陆功能:
login方法会把生成的token返回给前端,浏览器会将其保存到session中。
/**
* 登录功能
* @param loginForm 登录参数,包含手机号、验证码;或者手机号、密码
*/
@PostMapping("/login")
public Result login(@RequestBody LoginFormDTO loginForm, HttpSession session)
return userService.login(loginForm,session);
@Override
public Result login(LoginFormDTO loginForm, HttpSession session)
//1.校验手机号
final String phone = loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone))
//2.如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
//2.校验验证码,从redis中获取
final String cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY+phone);
final String code = loginForm.getCode();
if(cacheCode==null||!cacheCode.equals(code))
//3.不一直,报错
return Result.fail("验证码错误");
//4.一致,根据手机号查询用户
User user = query().eq("phone", phone).one();
//5.判断用户是否存在
if (user == null)
//6.不存在,创建新用户并保存
user = createUserWithPhone(phone);
//7.保存用户信息到redis中
//7.1随机生成token,作为登陆令牌
String token = UUID.randomUUID().toString(true);
//7.2将User对象转为HashMap存储
UserDTO userDTO = BeanUtil.copyProperties(user,UserDTO.class);
final Map<String, Object> map = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create().setIgnoreNullValue(true).setFieldValueEditor((fieldName,fieldValue)->
return fieldValue.toString();
)
);
//7.3存储
stringRedisTemplate.opsForHash().putAll(LOGIN_USER_KEY+token,map);
//7.4设置token有效期
stringRedisTemplate.expire(LOGIN_USER_KEY+token,3000,TimeUnit.MINUTES);
//8.返回token
return Result.ok(token);
private User createUserWithPhone(String phone)
User user = new User();
user.setPhone(phone);
user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(5));
save(user);
return user;
这里使用redis的hash结构
存储user信息,原因是:
- 若使用String结构,以JSON字符串来保存,比较直观
- 但Hash结构可以将对象中的每个字段独立存储,可以针对单个字段做CRUD,并且内存占用更少
拦截器:
- 首先,对于每个请求,我们首先根据token判断用户是否已经登陆(是否已经保存到ThreadLocal中),如果没有登陆,放行交给登陆拦截器去做,如果已经登陆,刷新token的有效期,然后放行。
- 之后来到登陆拦截器,如果ThreadLocal没有用户,说明没有登陆,拦截,否则放行。
定义UserHolder工具类:
public class UserHolder
private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();
public static void saveUser(UserDTO user)
tl.set(user);
public static UserDTO getUser()
return tl.get();
public static void removeUser()
tl.remove();
刷新token拦截器:
@Slf4j
public class RefreshTokenInterceptor implements HandlerInterceptor
private StringRedisTemplate stringRedisTemplate;
public RefreshTokenInterceptor(StringRedisTemplate stringRedisTemplate)
this.stringRedisTemplate = stringRedisTemplate;
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception
//1.获取请求头中的token
final String token = request.getHeader("authorization");
if (token == null)
return true;
//2.获取redis中的用户
final Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(LOGIN_USER_KEY + token);
//3.判断用户是否存在
if (userMap.isEmpty())
return true;
//5.将查询到的Hash数据转换为UserDto对象
final UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
//6.存在,保存用户信息到ThreadLocal
UserHolder.saveUser(userDTO);
//7.刷新token有效期
stringRedisTemplate.expire(LOGIN_USER_KEY+token,3000, TimeUnit.MINUTES);
//8.放行
return true;
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception
UserHolder.removeUser();
登陆拦截器:
public class LoginInterceptor implements HandlerInterceptor
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception
//1.判断是否需要拦截(ThreadLocal中是否有用户)
if(UserHolder.getUser()==null)
response.setStatus(401);
return false;
//8.放行
return true;
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception
UserHolder.removeUser();
在配置类中配置拦截器:
@Configuration
public class MvcConfig implements WebMvcConfigurer
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry)
//登陆拦截器
registry.addInterceptor(new LoginInterceptor()).excludePathPatterns(
"/user/code","/user/login","/blog/hot","/shop/**","/shop-type/**","/upload/**"
,"/voucher/**"
).order(1);
//token属性的拦截器
registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).addPathPatterns("/**").order(0);
4.补充ThreadLocal相关知识
a.ThreadLocal的数据结构
-
Thread类有一个类型为
ThreadLocal.ThreadLocalMap
的实例变量threadLocals,也就是说每个线程有一个自己的ThreadLocalMap。 -
ThreadLocalMap有自己的独立实现,可以简单地将它的key视作ThreadLocal,value为代码中放入的值(实际上key并不是ThreadLocal本身,而是它的一个弱引用)。
-
每个线程在往ThreadLocal里放值的时候,都会往自己的ThreadLocalMap里存,读也是以ThreadLocal作为引用,在自己的map里找对应的key,从而实现了线程隔离。
-
ThreadLocalMap有点类似HashMap的结构,只是HashMap是由数组+链表实现的,而ThreadLocalMap中并没有链表结构。
-
我们还要注意Entry, 它的key是ThreadLocal<?> k ,继承自WeakReference, 也就是我们常说的弱引用类型。
b.内存泄露问题
由于ThreadLocal的key是弱引用
,故在gc
时,key会被回收掉,但是value是强引用
没有被回收,所以在我们拦截器的方法里必须手动remove()。
二、redis缓存
1.选择缓存更新策略
项目选择了主动更新策略,相对较好,主动更新又有以下三种方式:
选择在更新数据库的同时更新缓存。
操作缓存和数据库时有三个问题需要考虑:
- 删除缓存还是更新缓存?
更新缓存:每次更新数据库都更新缓存,无效写操作较多
删除缓存:更新数据库时让缓存失效,查询时再更新缓存 - 如何保证缓存与数据库的操作的同时成功或失败?
单体系统,将缓存与数据库操作放在一个事务
分布式系统,利用TCC等分布式事务方案 - 先操作缓存还是先操作数据库?
若先删除缓存,再操作数据库:
请求1先把缓存中的A数据删除,请求2从db中读数据,请求1再把db中的A更新
若先操作数据库,再删除缓存:
请求1从db中读取数据A,请求2随后更新db中的数据(缓存中由于没有数据,所以不需要删除),最后请求1更新缓存。
可以看出两种方法都有各自的问题,但是由于写的时间要远大于读的时间,所以先操作db再删除cache的出现问题的几率非常小。
2.业务逻辑
- 根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间
- 根据id修改店铺时,先修改数据库,再删除缓存
3.缓存存在的问题
a.缓存穿透
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
常见的解决方案有两种:
- 缓存空对象
优点:实现简单,维护方便
缺点:额外的内存消耗,可能造成短期的不一致
适合命中不高,但可能被频繁更新的数据 - 布隆过滤
优点:内存占用较少,没有多余key
缺点:实现复杂,存在误判可能
适合命中不高,但是更新不频繁的数据
解决方案:
/**
* 缓存穿透方法
* @param id
* @return
*/
public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Long time, TimeUnit unit,Function<ID,R> dbFallback)
String key = keyPrefix+id;
//1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if (StrUtil.isNotBlank(json))
//3.存在,直接返回
return JSONUtil.toBean(json, type);
//命中的是否是空值
if (json != null)
return null;
//4.不存在,根据id查询数据库
R r = dbFallback.apply(id);
//5.不存在,返回错误
if(r==null)
//将空值写入reddis
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
//6.存在,写入redis
this.set(key,r,time,unit);
//7.返回
return r;
b.缓存雪崩
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
- 给不同的Key的TTL添加随机值
- 利用Redis集群提高服务的可用性
- 给缓存业务添加降级限流策略
- 给业务添加多级缓存
c.缓存击穿
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
常见的解决方案有两种:
- 互斥锁
- 逻辑过期
4.基于逻辑过期解决缓存击穿问题
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
/**
* 逻辑过期解决缓存击穿
* @param id
* @return
*/
public <R,ID> R queryWithLogicalExpire(String keyPrefix,ID id,Class<R> type, Long time, TimeUnit unit,Function<ID,R> dbFallback)
String key = keyPrefix+id;
//1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if (StrUtil.isBlank(json))
//3.不存在,直接返回
return null;
//4.命中,先把json反序列化
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
JSONObject data = (JSONObject) redisData.getData();
R r = JSONUtil.toBean(data, type);
LocalDateTime expireTime = redisData.getExpireTime();
//5.判断是否过期
if(expireTime.isAfter(LocalDateTime.now()))
//5.1未过期,直接返回
return r;
//5.2已过期,需要缓存重建
//6.缓存重建
//6.1获取互斥锁
String lockkey = LOCK_SHOP_KEY + id;
boolean lock = tryLock(lockkey);
//6.2判断是否获取锁成功
if(lock)
//6.3成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(()->
try
//查询数据库
R r1 = dbFallback.apply(id);
//写入redis
this.setWithLogicalExpire(key,r1,time,unit);
catch (Exception e)
e.printStackTrace();
finally
//释放锁
unlock(lockkey);
);
//6.4返回商铺信息
return r;
三、优惠券秒杀
1.优惠券秒杀下单
一般流程:
2.超卖问题
请求a查询库存,发现库存为1,请求b这时也来查询库存,库存也为1,然后请求a让数据库减1,这时候b查询到的仍然是1,也继续让库存减1,就会导致超卖。
超卖问题有以下几个解决方案:
乐观锁
:认为线程安全问题不一定会发生,因此不加锁,只是在更新数据时去判断有没有其它线程对数据做了修改。如果没有修改则认为是安全的,自己才更新数据。如果已经被其它线程修改说明发生了安全问题,此时可以重试或异常。悲观锁
:认为线程安全问题一定会发生,因此在操作数据之前先获取锁,确保线程串行执行。例如Synchronized、Lock都属于悲观锁
实现乐观锁主要有以下两种方法:
- 版本号法
每次更新数据库的时候按照版本查询,并且要更新版本。
- CAS
CAS是英文单词Compare And Swap
的缩写,翻译过来就是比较并替换。
CAS机制当中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B。
更新一个变量的时候,只有当变量的预期值A和内存地址V当中的实际值相同时,才会将内存地址V对应的值修改为B。
CAS的缺点:
1.CPU开销较大
在并发量比较高的情况下,如果许多线程反复尝试更新某一个变量,却又一直更新不成功,循环往复,会给CPU带来很大的压力。
2.不能保证代码块的原子性
CAS机制所保证的只是一个变量的原子性操作,而不能保证整个代码块的原子性。比如需要保证3个变量共同进行原子性的更新,就不得不使用Synchronized了。
3.一人一单功能
要求同一个优惠券,一个用户只能下一单
这样的方式会产生并发安全问题:
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了(每个jvm都有自己的锁监视器,集群模式下各个服务器的锁不共享)。
因此,我们的解决方案就是实现一个共享的锁监视器,即:
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
4.基于redis的分布式锁
a.setnx命令
setnx = SET if Not eXists
-
将 key 的值设为 value ,当且仅当 key 不存在。
-
若给定的 key 已经存在,则 SETNX 不做任何动作
带你重新认识ZooKeeper!java开发入门思维导图