缓存击穿、穿透、雪崩及Redis分布式锁
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了缓存击穿、穿透、雪崩及Redis分布式锁相关的知识,希望对你有一定的参考价值。
参考技术A 分布式锁: setnx ,redisson 并发问题幂等问题: 落表状态,Redis
缓存击穿: 指缓存中无,db中有
原因: 一个key高并发恰好失效导致大量请求到db
方案: 加锁,自旋锁,或一个线程查db,一个线程监控(直接用Redisson分布式锁)
缓存穿透:指缓存和db中均无
原因: 一般是恶意请求
方案: 加布隆过滤,或查db无时,也设置缓存,value为某些特殊表示或"null"
雪崩:指缓存同时大量失效
原因: 大量的key同时失效,db压力加大
方案: 设置失效时间是增加随机数
问题方案文献:
https://www.jianshu.com/p/31ab9b020cd9 (图例分析)
https://blog.csdn.net/fcvtb/article/details/89478554
Redis分布式锁:
事务未执行完锁已到期释放问题:使用Redissoin解决续租问题,内部已解决
分布式锁文献:
https://www.jianshu.com/p/4838f8be00c9
https://blog.csdn.net/qq_30038111/article/details/90696233 (setnx + expire同时操作)
====================================
https://www.runoob.com/redis/keys-scan.html
https://www.jianshu.com/p/611a492d9121 Redis原理与应用
Redis的缓存问题之缓存穿透缓存雪崩缓存击穿
目录
一、什么是缓存穿透?
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
二、常见的解决方案有两种:
1、缓存空对象
简单的来说,就是请求之后,发现数据不存在,就将null值打入Redis中。
- 优点:实现简单,维护方便
- 缺点:
- 额外的内存消耗
- 可能造成短期的不一致
思路分析:
当我们客户端访问不存在的数据时,先请求 redis,但是此时 redis 中没有数据,此时会访问到数据库,但是数据库中也没有数据,这个数据穿透了缓存,直击数据库,我们都知道数据库能够承载的并发不如 redis 这么高,如果大量的请求同时过来访问这种不存在的数据,这些请求就都会访问到数据库,简单的解决方案就是哪怕这个数据在数据库中也不存在,我们也把这个数据存入到 redis 中去,这样,下次用户过来访问这个不存在的数据,那么在 redis 中也能找到这个数据就不会进入到数据库了。
2、布隆过滤
在客户端与Redis之间加了一个布隆过滤器,对于请求进行过滤。
布隆过滤器的大致的原理:布隆过滤器中存放二进制位。数据库的数据通过hash算法计算其hash值并存放到布隆过滤器中,之后判断数据是否存在的时候,就是判断该hash值是0还是1。
但是这个玩意是一种概率上的统计,当其判断不存在的时候就一定是不存在;当其判断存在的时候就不一定存在。所以有一定的穿透风险!!!
- 优点:内存占用较少,没有多余 key
- 缺点:
- 实现复杂
- 存在误判可能
综上所述
我们可以两种方案一起用,这样子最为保险。据统计使用布隆过滤器一般可以避免90%的无效请求。但是黑马程序员这里的视频是使用方案一(缓存空对象)。
三、编码解决商品查询的缓存穿透问题
核心思路如下:
在原来的逻辑中,我们如果发现这个数据在 mysql 中不存在,直接就返回 404 了,这样是会存在缓存穿透问题的
现在的逻辑中:如果这个数据不存在,我们不会返回 404 ,还是会把这个数据写入到 Redis 中,并且将 value 设置为空,欧当再次发起查询时,我们如果发现命中之后,判断这个 value 是否是 null,如果是 null,则是之前写入的数据,证明是缓存穿透数据,如果不是,则直接返回数据。
显然我们在这里只要做两件事:
- 当查询数据在数据库中不存在时,将空值写入 redis
- 判断缓存是否命中后,再加一个判断是否为空值
@Override
public Result queryById(Long id)
// 从redis查询商铺缓存
String key = CACHE_SHOP_KEY + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 判断是否存在
if (StrUtil.isNotBlank(shopJson))
// 存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
// 1.判断空值
if (shopJson != null)
// 返回一个错误信息
return Result.fail("店铺不存在!");
// 不存在,根据id查询数据库
Shop shop = getById(id);
// 不存在,返回错误
if (shop == null)
// 2.防止穿透问题,将空值写入redis!!!
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return Result.fail("店铺不存在!");
// 存在,写入Redis
// 把shop转换成为JSON形式写入Redis
// 同时添加超时时间
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
return Result.ok(shop);
小总结:
缓存穿透产生的原因是什么?
- 用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力
缓存穿透的解决方案有哪些?
- 缓存 null 值
- 布隆过滤
- 增强 id 的复杂度,避免被猜测 id 规律
- 做好数据的基础格式校验
- 加强用户权限校验
- 做好热点参数的限流
四、缓存雪崩问题及解决思路
1、什么是缓存雪崩?
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。情况大致如下图所示:
解决方案(4种)
- (1)给不同的Key的TTL添加随机值(推荐)
操作简单,当我们在做缓存预热的时候,就有可能在同一时间批量插入大量的数据,那么如果它们的TTL都一样的话就可能出现大量key同时过期的情况!!!所以我们需要在设置过期时间TTL的时候,定义一个范围,追加该范围内的一个随机数。
- (2)利用Redis集群提高服务的可用性
使用集群提高可靠性,后续讲解~~~之后写了会在这里贴上链接。
- (3)给缓存业务添加降级限流策略
也是后续的微服务的知识~~~SpringCloud中有提!!!
- (4)给业务添加多级缓存
请求到达浏览器,nginx可以做缓存,未命中找Redis,再未命中找JVM,最后到数据库......
五、缓存击穿问题及解决思路
1、什么是缓存击穿?
缓存雪崩是因为大量的key同时过期所导致的问题,而缓存击穿则是部分key过期导致的严重后果。
为什么大量key过期会产生问题而少量的key也会有问题呢???
这是因为这一部分的key不简单!!!
缓存击穿问题也叫热点Key问题,就是⼀个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
具体情况如下图所示:
上述:此时假设该热点key的TTL时间到(失效了),则查询缓存未命中,会继续查询数据库,并进行缓存重建工作。但是由于查询SQL逻辑比较复杂、重建缓存的时间较久,并且该key又是热点key,短时间内有大量的线程对其进行访问,所以请求会直接 “打到” 数据库中,数据库就有可能崩掉!!!
2、解决方案(2种)
(1)互斥锁
- 简单的来说就是,并不是所有的线程都有 “ 资格 ” 去访问数据库,只有持有锁的线程才可以对其进行操作。
- 不过该操作有一个很明显的问题,就是会出现相互等待的情况。
(2)逻辑过期
不设置TTL,之前所说导致缓存击穿的原因就是该key的TTL到期了,所以我们在这就不设置TTL了,而是使用一个字段例如:expire表示过期时间(逻辑上的)。当我们想让它 “ 过期 ” 的时候,我们可以直接手动将其删除(热点key,即只是在一段时间内,其被访问的频次很高)。
这种方案巧妙在于,异步的构建缓存,缺点在于在构建完缓存之前,返回的都是脏数据。
3、互斥锁与逻辑过期的对比分析
六、利用互斥锁解决缓存击穿问题
核心思路:相较于原来从缓存中查询不到数据后直接查询数据库而言,现在的方案是 进行查询之后,如果从缓存没有查询到数据,则进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有获得到,则休眠,过一会再进行尝试,直到获取到锁为止,才能进行查询
如果获取到了锁的线程,再去进行查询,查询后将数据写入 redis,再释放锁,返回数据,利用互斥锁就能保证只有一个线程去执行操作数据库的逻辑,防止缓存击穿。
代码实现
(1)首先,我们声明一下获取锁、释放锁的方法,tryLock()、unLock()
/**
* 获取锁
* @param key
* @return
*/
private boolean tryLock(String key)
// setnx 就是 setIfAbsent 如果存在
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.MINUTES);
// 装箱是将值类型装换成引用类型的过程;拆箱就是将引用类型转换成值类型的过程
// 不要直接返回flag,可能为null
return BooleanUtil.isTrue(flag);
/**
* 释放锁
* @param key
*/
private void unLock(String key)
stringRedisTemplate.delete(key);
注意:这里的锁不是真正的线程锁,而是redis里面的一个特殊的key。
2)互斥锁解决缓存击穿 queryWithMutex()
/**
* 互斥锁解决缓存击穿 queryWithMutex()
* @param id
* @return
*/
public Shop queryWithMutex(Long id)
// 1.从redis查询商铺缓存
String key = CACHE_SHOP_KEY + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isNotBlank(shopJson))
return JSONUtil.toBean(shopJson, Shop.class);
// 判断空值
if (shopJson != null)
// 返回一个错误信息
return null;
String lockKey = "lock:shop:" + id;
Shop shop = null;
try
// 4.实现缓存重建
// 4.1获取互斥锁
boolean isLock = tryLock(lockKey);
// 4.2判断是否成功
if (!isLock)
// 4.3失败,则休眠并重试
Thread.sleep(50);
// 递归
return queryWithMutex(id);
// 4.4成功,根据id查询数据库
shop = getById(id);
// 模拟延迟
Thread.sleep(200);
// 5.不存在,返回错误
if (shop == null)
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
return null;
// 6.存在,写入redis
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL,TimeUnit.MINUTES);
catch (InterruptedException ex)
throw new RuntimeException(ex);
finally
// 7.释放锁
unLock(lockKey);
// 8.返回
return shop;
七、利用逻辑过期解决缓存击穿问题
需求描述
修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题
注意:这里的key是否过期,不是由redis控制的,而是由我们自己去手动编写逻辑去控制的。
代码实现
(1)添加逻辑过期时间的字段
由于我们之前的Shop中是没有逻辑过期的字段,那么我们要如何让它带有这个属性,又不修改之前的代码呢?
新建一个RedisData对象,里面的data指的是Shop对象,而expireTime是逻辑过期时间。
即:我们可以使用 JSONUtil.toBean 将Shop对象通过序列化、反序列化到RedisData类的data属性中。
@Data
public class RedisData
// LocalDateTime : 同时含有年月日时分秒的日期对象
// 并且LocalDateTime是线程安全的!
private LocalDateTime expireTime;
private Object data;
(2)逻辑过期解决缓存击穿问题 queryWithLogicalExpire()
缓存重建
/**
* 重建缓存,先缓存预热一下,否则queryWithLogicalExpire() 的expire为null
* @param id
* @param expireSeconds
*/
public void saveShopRedis(Long id, Long expireSeconds)
// 1.查询店铺数据
Shop shop = getById(id);
// 2.封装逻辑过期时间
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds)); // 过期时间
// 3.写入redis
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
先使用测试方法运行一下saveShopRedis(),否则redis里面没有expireTime !
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
/**
* 逻辑过期解决缓存击穿问题 queryWithLogicalExpire()
* 测试前要先缓存预热一下!不然 data 与 expireTime 的缓存值是null!
* @param id
* @return
*/
public Shop queryWithLogicalExpire(Long id)
// 1.从redis查询商铺缓存
String key = CACHE_SHOP_KEY + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isBlank(shopJson))
return null;
// 4.命中,需要将json反序列化为对象
// redisData没有数据
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判断是否过期
if (expireTime.isAfter(LocalDateTime.now()))
// 5.1未过期,直接返回店铺信息
return shop;
// 5.2已过期,需要缓存重建
// 6.缓存重建
// 6.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean islock = tryLock(lockKey);
// 6.2.判断是否获取互斥锁成功
if (islock)
// 6.3.成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit( () ->
try
// 重建缓存,过期时间为20L
saveShopRedis(id,20L);
catch (Exception ex)
throw new RuntimeException(ex);
finally
unLock(lockKey);
);
// 6.4.返回过期店铺信息
return shop;
我们可以看到在测试的时候,name的值为:“100XXXX”
我们现在来修改一下数据库,将值改为:“900XXXX”,看看并发情况下缓存重建能否正确!
通过Jmeter做压力测试
再查看Redis中的数据,可以看到name的值已经被修改了,而且上面的jmeter的每一个http都是正常的!
八、封装 Redis 工具类
基于 StringRedisTemplate 封装一个缓存工具类,满足下列需求:
- 方法 1:将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置 TTL 过期时间
- 方法 2:将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置逻辑过期时间,用于处理缓
存击穿问题
- 方法 3:根据指定的 key 查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
- 方法 4:根据指定的 key 查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
将逻辑进行封装
@Slf4j
@Component
public class CacheClient
private final StringRedisTemplate stringRedisTemplate;
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public CacheClient(StringRedisTemplate stringRedisTemplate)
this.stringRedisTemplate = stringRedisTemplate;
public void set(String key, Object value, Long time, TimeUnit unit)
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit);
public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit)
// 设置逻辑过期
RedisData redisData = new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
// 写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
public <R,ID> R queryWithPassThrough(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit)
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isNotBlank(json))
// 3.存在,直接返回
return JSONUtil.toBean(json, type);
// 判断命中的是否是空值
if (json != null)
// 返回一个错误信息
return null;
// 4.不存在,根据id查询数据库
R r = dbFallback.apply(id);
// 5.不存在,返回错误
if (r == null)
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
// 返回错误信息
return null;
// 6.存在,写入redis
this.set(key, r, time, unit);
return r;
public <R, ID> R queryWithLogicalExpire(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit)
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isBlank(json))
// 3.存在,直接返回
return null;
// 4.命中,需要先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
LocalDateTime expireTime = redisData.getExpireTime();
// 5.判断是否过期
if(expireTime.isAfter(LocalDateTime.now()))
// 5.1.未过期,直接返回店铺信息
return r;
// 5.2.已过期,需要缓存重建
// 6.缓存重建
// 6.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
// 6.2.判断是否获取锁成功
if (isLock)
// 6.3.成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(() ->
try
// 查询数据库
R newR = dbFallback.apply(id);
// 重建缓存
this.setWithLogicalExpire(key, newR, time, unit);
catch (Exception e)
throw new RuntimeException(e);
finally
// 释放锁
unlock(lockKey);
);
// 6.4.返回过期的商铺信息
return r;
public <R, ID> R queryWithMutex(
String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit)
String key = keyPrefix + id;
// 1.从redis查询商铺缓存
String shopJson = stringRedisTemplate.opsForValue().get(key);
// 2.判断是否存在
if (StrUtil.isNotBlank(shopJson))
// 3.存在,直接返回
return JSONUtil.toBean(shopJson, type);
// 判断命中的是否是空值
if (shopJson != null)
// 返回一个错误信息
return null;
// 4.实现缓存重建
// 4.1.获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
R r = null;
try
boolean isLock = tryLock(lockKey);
// 4.2.判断是否获取成功
if (!isLock)
// 4.3.获取锁失败,休眠并重试
Thread.sleep(50);
return queryWithMutex(keyPrefix, id, type, dbFallback, time, unit);
// 4.4.获取锁成功,根据id查询数据库
r = dbFallback.apply(id);
// 5.不存在,返回错误
if (r == null)
// 将空值写入redis
stringRedisTemplate.opsForValue().set(key, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
// 返回错误信息
return null;
// 6.存在,写入redis
this.set(key, r, time, unit);
catch (InterruptedException e)
throw new RuntimeException(e);
finally
// 7.释放锁
unlock(lockKey);
// 8.返回
return r;
private boolean tryLock(String key)
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
private void unlock(String key)
stringRedisTemplate.delete(key);
以上是关于缓存击穿、穿透、雪崩及Redis分布式锁的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot整合Redis以及缓存穿透缓存雪崩缓存击穿的理解如何添加锁解决缓存击穿问题?分布式情况下如何添加分布式锁
SpringBoot整合Redis以及缓存穿透缓存雪崩缓存击穿的理解分布式情况下如何添加分布式锁 续篇
Redis05(缓存穿透,缓存击穿,缓存雪崩,分布式锁ACLIO多线程支持Cluster)
Redis个人笔记:Redis应用场景,Redis常见命令,Reids缓存击穿穿透,Redis分布式锁实现方案,秒杀设计思路,Redis消息队列,Reids持久化,Redis主从哨兵分片集群