SpringBoot整合Redis以及缓存穿透缓存雪崩缓存击穿的理解分布式情况下如何添加分布式锁 续篇
Posted Mr.Aholic
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合Redis以及缓存穿透缓存雪崩缓存击穿的理解分布式情况下如何添加分布式锁 续篇相关的知识,希望对你有一定的参考价值。
文章目录
前言
上一篇实现了单体应用下如何上锁,这一篇主要说明如何在分布式场景下上锁
上一篇地址:加锁
1、分布式情况下如何加锁
需要注意的点是: 在上锁和释放锁的过程中要保证原子性操作
2、具体实现过程
核心是上锁和解锁的过程
关于解锁使用脚本参考:SET key value [EX seconds] [PX milliseconds] [NX|XX]
//上锁过程
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
//解锁过程、需要 调用脚本
String script = "if redis.call(\\"get\\",KEYS[1]) == ARGV[1] then return redis.call(\\"del\\",KEYS[1]) else return 0 end";
Long lock1 = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
public Map<String, List<Catalog2Vo>> getCatalogJsonDbWithSpringCache()
//占分布式锁.redis中占坑
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
Map<String, List<Catalog2Vo>> dataFromDb;
if (lock)
System.out.println("加锁成功......");
try
//加锁成功...执行业务
dataFromDb = getCategoriesDb();
finally
//删除锁
String script = "if redis.call(\\"get\\",KEYS[1]) == ARGV[1] then return redis.call(\\"del\\",KEYS[1]) else return 0 end";
Long lock1 = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
return dataFromDb;
else
//加锁失败...重试.synchronized 休眠100ms重试
System.out.println("加锁失败......");
try
Thread.sleep(200);
catch (Exception e)
//自旋方式
return getCatalogJsonDbWithSpringCache();
3、测试
3.1 一个服务按照多个端口同时启动
模拟分布式情况、将一个服务按照多个端口同时启动
具体过程
-
1 首先,点击修改运行配置
-
2 将你的项目配置的右上角的Allowl parallel run勾上(允许多启动)
-
3 将你的项目配置复制一份重启个名字,添加上
-Dserver.port=端口号
- 4 启动项目
3.2 使用jmeter进行压测
请求的基本配置
测试情况
模拟的基本前提: redis中没有缓存数据
上锁成功的情况下、 三个服务中只会出现一次查询数据库、其余接口请求从redis中拿取数据.
下方是测试截图、符合预期情况 、上锁成功
redis中缓存的数据
Redis缓存穿透,缓存击穿,缓存雪崩解决方案以及封装Redis工具类
参考自黑马程序员
缓存穿透
缓存穿透: 数据在数据库和redis中都不存在,请求直接打在数据库中。
解决方案:
方案一:
方案二:
代码案例实现:
// 缓存穿透
private Shop cacheBreakDown(Long id)
// 查询redis
String jsonShop = (String) redisTemplate.opsForValue().get(RedisConstants.SHOP_CACHE + id);
// 既不是""也不是null,命中直接返回
if (StrUtil.isNotBlank(jsonShop))
return JSONUtil.toBean(jsonShop, Shop.class);
// 判断是否命中空值"",命中直接返回
if (jsonShop != null)
return null;
// 查询数据库
Shop shop = this.getById(id);
if (shop == null)
// 将空值""写入redis,防止缓存穿透
redisTemplate.opsForValue().set(RedisConstants.SHOP_CACHE + id, "", RedisConstants.SHOP_CACHE_TTL, TimeUnit.SECONDS);
return null;
jsonShop=JSONUtil.toJsonStr(shop);
redisTemplate.opsForValue().set(RedisConstants.SHOP_CACHE + id, jsonShop, RedisConstants.SHOP_CACHE_TTL, TimeUnit.MINUTES);
return shop;
缓存击穿
互斥锁时序图
代码实现:
// 加锁
private boolean tryLock(String key)
Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofMillis(10000));
return BooleanUtil.isTrue(ifAbsent);
// 解锁
private void unLock(String key)
redisTemplate.delete(key);
// 缓存击穿
private Shop cachePenetrate(long id)
// 查询redis
String jsonShop = (String) redisTemplate.opsForValue().get(RedisConstants.SHOP_CACHE + id);
if (StrUtil.isNotBlank(jsonShop))
// 直接返回
return JSONUtil.toBean(jsonShop, Shop.class);
// 判断是否命中空值 ""
if (jsonShop != null)
return null;
Shop shop = null;
try
// 实现缓存重建
if (!tryLock(RedisConstants.LOCK_KEY+id))
// 未获得锁,休眠重试
Thread.sleep(100);
return cachePenetrate(id);
// 查询数据库
shop = this.getById(id);
if (shop == null)
// 将空值写入redis,防止缓存穿透
redisTemplate.opsForValue().set(RedisConstants.SHOP_CACHE + id, "", RedisConstants.SHOP_CACHE_TTL, TimeUnit.SECONDS);
return null;
jsonShop=JSONUtil.toJsonStr(shop);
redisTemplate.opsForValue().set(RedisConstants.SHOP_CACHE + id, jsonShop, RedisConstants.SHOP_CACHE_TTL, TimeUnit.MINUTES);
catch (InterruptedException e)
throw new RuntimeException("被打断");
finally
// 释放锁
unLock(RedisConstants.LOCK_KEY+id);
return shop;
逻辑过期时序图
代码实现:
@Data
public class RedisData
// 逻辑过期时间
private LocalDateTime expireTime;
// 要存入redis的数据
private Object data;
// 重建的线程池
private ThreadPoolExecutor cacheThreadPool=new ThreadPoolExecutor(
5,10,1,TimeUnit.MINUTES,new ArrayBlockingQueue<>(10));
private Shop cacheLogicExpire(long id)
// 查询redis
String jsonShop = (String) redisTemplate.opsForValue().get(RedisConstants.SHOP_CACHE + id);
// 没有命中直接返回null
if (StrUtil.isBlank(jsonShop))
return null;
// 反序列化
RedisData redisData= JSONUtil.toBean(jsonShop, RedisData.class);
JSONObject jsonObject= (JSONObject) redisData.getData();
Shop shop = JSONUtil.toBean(jsonObject, Shop.class);
// 命中了判断过期时间是否过期,时间没有过期直接返回数据
if (redisData.getExpireTime().isAfter(LocalDateTime.now()))
return shop;
// 过期了尝试获得锁
if (tryLock(RedisConstants.LOCK_KEY+id))
// 开启线程缓存重建
cacheThreadPool.execute(()->
try
saveData2Redis(id,1800);
catch (Exception e)
throw new RuntimeException(e);
finally
// 释放锁
unLock(RedisConstants.LOCK_KEY+id);
);
// 返回过期数据
return shop;
public void saveData2Redis(long id,long time)
Shop shop = getById(id);
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(time));
redisTemplate.opsForValue().set(RedisConstants.SHOP_CACHE,JSONUtil.toJsonStr(redisData));
缓存雪崩
封装工具类
方法1:
/**
*
* @param key key
* @param value value
* @param expireTime key过期时间
* @param unit 时间单位
*/
public void set(String key, Object value, long expireTime, TimeUnit unit)
redisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(value),expireTime,unit);
方法2:
/**
*
* @param key key
* @param value value
* @param expireTime key过期时间
* @param unit 时间单位
*/
public void setWithLogicalTime(String key, Object value, long expireTime, TimeUnit unit)
RedisData redisData = new RedisData();
long seconds = unit.toSeconds(expireTime);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(seconds));
redisData.setData(value);
redisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
方法3:
/**
*
* @param keyPrefix 要查询数据的key前缀
* @param id 要查询的数据id
* @param type 数据类型
* @param dbOpe 查询数据方法
* @param expireTime 过期时间
* @param unit 时间单位
* @param <R> 返回的数据类型
* @param <ID> 要查询数据的ID类型
* @return 要查询的数据
*/
public <R,ID> R getCache(String keyPrefix, ID id, Class<R> type, Function<ID,R> dbOpe,long expireTime,TimeUnit unit)
String key=keyPrefix+id;
// 先查询Redis
String value = (String) redisTemplate.opsForValue().get(key);
// 不为空且不为""直接返回
if (StrUtil.isNotBlank(value))
return JSONUtil.toBean(value,type);
// 如果为""不为null返回null
if (value!=null)
return null;
// 根据id查询数据库
R result = dbOpe.apply(id);
// 数据库也为空
if (result == null)
// 缓存""防止缓存穿透
redisTemplate.opsForValue().set(key,"",expireTime,unit);
return null;
// 不为空缓存重建
redisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(result),expireTime,unit);
return result;
方法4:
/**
*
* @param keyPrefix 要查询数据的key前缀
* @param id 要查询的数据id
* @param type 数据类型
* @param dbOpe 查询数据方法
* @param expireTime 过期时间
* @param unit 时间单位
* @param <R> 返回的数据类型
* @param <ID> 要查询数据的ID类型
* @return 要查询的数据
*/
public <R,ID> R getCacheWithLogicExpire(String keyPrefix, ID id, Class<R> type, Function<ID,R> dbOpe, long expireTime, TimeUnit unit)
String key=keyPrefix+id;
// 查询redis
String value = (String) redisTemplate.opsForValue().get(key);
// 为空返回null
if (StrUtil.isBlank(value))
return null;
// 反序列化为RedisData
RedisData redisData = JSONUtil.toBean(value, RedisData.class);
R result = JSONUtil.toBean((JSONObject) redisData.getData(), type);
// 判断是否过期,没有过期直接返回
if (LocalDateTime.now().isAfter(redisData.getExpireTime()))
return result;
// 过期了尝试获得锁
if (tryLock(key))
// 开启线程缓存重建
cacheThreadPool.execute(()->
try
R r = dbOpe.apply(id);
RedisData redis_data = new RedisData();
redis_data.setData(r);
redis_data.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(expireTime)));
redisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(redis_data));
catch (Exception e)
throw new RuntimeException(e);
finally
// 释放锁
unLock(RedisConstants.LOCK_KEY+id);
);
return result;
以上是关于SpringBoot整合Redis以及缓存穿透缓存雪崩缓存击穿的理解分布式情况下如何添加分布式锁 续篇的主要内容,如果未能解决你的问题,请参考以下文章
Redis分片主从哨兵集群,原理详解,集群的配置安装,8大数据类型,springboot整合使用
Redis分片主从哨兵集群,原理详解,集群的配置安装,8大数据类型,springboot整合使用
Redis缓存穿透,缓存击穿,缓存雪崩解决方案以及封装Redis工具类