Redis
Posted 哏都程序猿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis相关的知识,希望对你有一定的参考价值。
Redis(二)
分布式锁
背景
-
就是保证同一时间只有一个客户端可以对共享资源进行操作
-
案例:优惠券领劵限制张数、商品库存超卖
-
核心
-
为了防止分布式系统中的多个进程之间相互干扰,我们需要一种分布式协调技术来对这些进程进行调度 -
利用互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题 -
避免共享资源并发操作导致数据问题
-
本地锁:synchronize、lock等,锁在当前进程内,集群部署下依旧存在问题 -
分布式锁:redis、zookeeper等实现,虽然还是锁,但是多个进程共用的锁标记,可以用Redis、Zookeeper、mysql等都可以 -
加锁 -
设计分布式锁应该考虑的东西
-
分布式锁一定能得到释放,比如客户端奔溃或者网络中断 -
在分布式应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行 -
排他性 -
容错性 -
满足可重入、高性能、高可用 -
注意分布式锁的开销、锁粒度 -
实现分布式锁 可以用 Redis、Zookeeper、Mysql数据库这几种 , 性能最好的是Redis且是最容易理解
-
分布式锁离不开 key - value 设置
key 是锁的唯一标识,一般按业务来决定命名,比如想要给一种优惠券活动加锁,key 命名为 “coupon:id” 。value就可以使用固定值,比如设置成1
基于redis实现分布式锁,文档:http://www.redis.cn/commands.html#string
-
加锁 SETNX key value
setnx 的含义就是 SET if Not Exists,有两个参数 setnx(key, value),该方法是原子性操作
如果 key 不存在,则设置当前 key 成功,返回 1;
如果当前 key 已经存在,则设置当前 key 失败,返回 0
-
解锁 del (key)
得到锁的线程执行完任务,需要释放锁,以便其他线程可以进入,调用 del(key)
-
配置锁超时 expire (key,30s)
客户端奔溃或者网络中断,资源将会永远被锁住,即死锁,因此需要给key配置过期时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放
methodA(){
String key = "coupon_66"
if(setnx(key,1) == 1){
expire(key,30,TimeUnit.MILLISECONDS)
try {
//做对应的业务逻辑
//查询用户是否已经领券
//如果没有则扣减库存
//新增领劵记录
} finally {
del(key)
}
}else{
//睡眠100毫秒,然后自旋调用本方法
methodA()
}
}
-
多个命令之间不是原子性操作,如 setnx
和expire
之间,如果setnx
成功,但是expire
失败,且宕机了,则这个资源就是死锁
使用原子命令:设置和配置过期时间 setnx / setex
如: set key 1 ex 30 nx
java里面 redisTemplate.opsForValue().setIfAbsent("seckill_1","success",30,TimeUnit.MILLISECONDS)
-
业务超时,存在其他线程勿删,key 30秒过期,假如线程A执行很慢超过30秒,则key就被释放了,其他线程B就得到了锁,这个时候线程A执行完成,而B还没执行完成,结果就是线程A删除了线程B加的锁
可以在 del 释放锁之前做一个判断,验证当前的锁是不是自己加的锁, 那 value 应该是存当前线程的标识或者uuid
String key = "coupon_66"
String value = Thread.currentThread().getId()
if(setnx(key,value) == 1){
expire(key,30,TimeUnit.MILLISECONDS)
try {
//做对应的业务逻辑
} finally {
//删除锁,判断是否是当前线程加的
if(get(key).equals(value)){
//还存在时间间隔
del(key)
}
}
}else{
//睡眠100毫秒,然后自旋调用本方法
}
-
进一步细化误删
-
当线程A获取到正常值时,返回带代码中判断期间锁过期了,线程B刚好重新设置了新值,线程A那边有判断value是自己的标识,然后调用del方法,结果就是删除了新设置的线程B的值 -
核心还是判断和删除命令 不是原子性操作导致 -
总结
-
加锁+配置过期时间:保证原子性操作 -
解锁: 防止误删除、也要保证原子性操作 -
核心是保证多个指令原子性,加锁使用setnx setex 可以保证原子性,那解锁使用 判断和删除怎么保证原子性
-
文档:http://www.redis.cn/commands/set.html
-
多个命令的原子性:采用 lua脚本+redis, 由于【判断和删除】是lua脚本执行,所以要么全成功,要么全失败
//获取lock的值和传递的值一样,调用删除操作返回1,否则返回0
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
//Arrays.asList(lockKey)是key列表,uuid是参数
Integer result = redisTemplate.execute(new DefaultRedisScript<>(script, Integer.class), Arrays.asList(lockKey), uuid);
/**
* 原生分布式锁 开始
* 1、原子加锁 设置过期时间,防止宕机死锁
* 2、原子解锁:需要判断是不是自己的锁
*/
@RestController
@RequestMapping("/api/v1/coupon")
public class CouponController {
@Autowired
private StringRedisTemplate redisTemplate;
@GetMapping("add")
public JsonData saveCoupon(@RequestParam(value = "coupon_id",required = true) int couponId){
//防止其他线程误删
String uuid = UUID.randomUUID().toString();
String lockKey = "lock:coupon:"+couponId;
lock(couponId,uuid,lockKey);
return JsonData.buildSuccess();
}
private void lock(int couponId,String uuid,String lockKey){
//lua脚本
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
Boolean nativeLock = redisTemplate.opsForValue().setIfAbsent(lockKey,uuid,Duration.ofSeconds(30));
System.out.println(uuid+"加锁状态:"+nativeLock);
if(nativeLock){
//加锁成功
try{
//TODO 做相关业务逻辑
TimeUnit.SECONDS.sleep(10L);
} catch (InterruptedException e) {
} finally {
//解锁
Long result = redisTemplate.execute( new DefaultRedisScript<>(script,Long.class),Arrays.asList(lockKey),uuid);
System.out.println("解锁状态:"+result);
}
}else {
//自旋操作
try {
System.out.println("加锁失败,睡眠5秒 进行自旋");
TimeUnit.MILLISECONDS.sleep(5000);
} catch (InterruptedException e) { }
//睡眠一会再尝试获取锁
lock(couponId,uuid,lockKey);
}
}
}
原生代码+redis实现分布式锁使用比较复杂,且有些锁续期问题更难处理
-
延伸出框架 官方推荐方式:https://redis.io/topics/distlock
最佳实战
List数据结构设计
在线教育-天热销视频榜单实战-List数据结构设计
-
需求 -
需要一个视频学习榜单,每天更新一次 -
需要支持人工运营替换榜单位置 -
企业中流程 -
定时任务计算昨天最多人学习的视频 -
晚上12点到1点更新到榜单上 -
预留一个接口,支持人工运营 -
类似场景 -
京东:热销手机榜单、电脑榜单等 -
百度:搜索热榜 -
疑惑:为啥不是实时计算,真正高并发下项目,都是预先计算好结果,然后直接返回数据,且存储结构最简单
@RequestMapping("rank")
public JsonData videoRank(){
List<VideoDO> list = redisTemplate.opsForList().range(RANK_KEY,0,-1);
return JsonData.buildSuccess(list);
}
@Test
void saveRank(){
String RANK_KEY = "rank:video";
VideoDO video1 = new VideoDO(3,"微服务","xxxx",1099);
VideoDO video2 = new VideoDO(5,"AlibabaCloud","xxx",59);
VideoDO video3 = new VideoDO(53,"SpringBoot","xxx",49);
VideoDO video4 = new VideoDO(15,"设计模式","xxx",49);
VideoDO video5 = new VideoDO(45,"nginx","xxxx",89);
redisTemplate.opsForList().leftPushAll(RANK_KEY,video4,video5,video3,video2,video1);
}
-
人工运营操作榜单
/**
* 替换榜单第二名
*/
@Test
void replaceRank(){
String RANK_KEY = "rank:video";
VideoDO video = new VideoDO(42,"工程师","xxx",89);
//在集合的指定位置插入元素,如果指定位置已有元素,则覆盖,没有则新增
redisTemplate.opsForList().set(RANK_KEY,1,video);
}
Hash数据结构
-
背景
-
电商购物车实现,支持买多件商品,每个商品可以买不同数量 -
支持高性能处理 -
购物车常见实现方式
-
可以开启AOF持久化防止重启丢失(推荐) -
localstorage在浏览器中存储 key/value 对,没有过期时间。 -
sessionstorage在浏览器中存储 key/value 对,在关闭会话窗口后将会删除这些数据。 -
性能存在瓶颈 -
实现方式一:存储到数据库 -
实现方式二:前端本地存储-localstorage-sessionstorage -
实现方式三:后端存储到缓存如redis -
购物车数据结构介绍
-
Map<String,Map<String,String>> -
第一层Map,Key是用户id -
第二层Map,Key是购物车中商品id,值是购物车数据 -
一个购物车里面,存在多个购物项 -
所以 购物车结构是一个双层Map: -
对应redis里面的存储
-
redis里面有多种数据结构,应该使用哪种? -
答案是 hash结构 -
购物车常见实现方式
-
可以开启AOF持久化防止重启丢失(推荐) -
localstorage在浏览器中存储 key/value 对,没有过期时间。 -
sessionstorage在浏览器中存储 key/value 对,在关闭会话窗口后将会删除这些数据。 -
性能存在瓶颈 -
实现方式一:存储到数据库 -
实现方式二:前端本地存储-localstorage-sessionstorage -
实现方式三:后端存储到缓存如redis -
购物车数据结构介绍
-
Map<String,Map<String,String>> -
第一层Map,Key是用户id -
第二层Map,Key是购物车中商品id,值是购物车数据 -
一个购物车里面,存在多个购物项 -
所以 购物车结构是一个双层Map: -
对应redis里面的存储
-
redis里面有多种数据结构,应该使用哪种? -
答案是 hash结构 -
购物车常见实现方式
-
可以开启AOF持久化防止重启丢失(推荐) -
localstorage在浏览器中存储 key/value 对,没有过期时间。 -
sessionstorage在浏览器中存储 key/value 对,在关闭会话窗口后将会删除这些数据。 -
性能存在瓶颈 -
实现方式一:存储到数据库 -
实现方式二:前端本地存储-localstorage-sessionstorage -
实现方式三:后端存储到缓存如redis -
购物车数据结构介绍
-
Map<String,Map<String,String>> -
第一层Map,Key是用户id -
第二层Map,Key是购物车中商品id,值是购物车数据 -
一个购物车里面,存在多个购物项 -
所以 购物车结构是一个双层Map: -
对应redis里面的存储
-
redis里面有多种数据结构,应该使用哪种? -
答案是 hash结构 -
json工具类
public class JsonUtil {
// 定义jackson对象
private static final ObjectMapper MAPPER = new ObjectMapper();
/**
* 将对象转换成json字符串。
* @return
*/
public static String objectToJson(Object data) {
try {
String string = MAPPER.writeValueAsString(data);
return string;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 将json结果集转化为对象
*
* @param jsonData json数据
* @param clazz 对象中的object类型
* @return
*/
public static <T> T jsonToPojo(String jsonData, Class<T> beanType) {
try {
T t = MAPPER.readValue(jsonData, beanType);
return t;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
-
添加购物车接口
@RequestMapping("addCart")
public JsonData addCart(int videoId,int buyNum){
//获取购物车
BoundHashOperations<String, Object, Object> myCart = getMyCartOps();
Object cacheObj = myCart.get(videoId+"");
String result = "";
if (cacheObj != null) {
result = (String) cacheObj;
}
if (cacheObj == null) {
//不存在则新建一个购物项
CartItemVO cartItem = new CartItemVO();
//从数据库查询详情,我们这边直接随机写个
VideoDO videoDO = videoDao.findDetailById(videoId);
videoDO.setId(videoId);
cartItem.setPrice(videoDO.getPrice());
cartItem.setBuyNum(buyNum);
cartItem.setProductId(videoId);
cartItem.setProductImg(videoDO.getImg());
cartItem.setProductTitle(videoDO.getTitle());
myCart.put(videoId+"", JsonUtil.objectToJson(cartItem));
} else {
//存在则新增数量
CartItemVO cartItem = JsonUtil.jsonToPojo(result, CartItemVO.class);
cartItem.setBuyNum(cartItem.getBuyNum() + buyNum);
myCart.put(videoId+"", JsonUtil.objectToJson(cartItem));
}
return JsonData.buildSuccess();
}
-
购物车方法抽取
/**
* 抽取我的购物车通用方法
*
* @return
*/
private BoundHashOperations<String, Object, Object> getMyCartOps() {
String cartKey = getCartKey();
return redisTemplate.boundHashOps(cartKey);
}
/**
* 获取购物车的key
*
* @return
*/
private String getCartKey() {
//从拦截器获取 ,这里写死即可,每个用户不一样
int userId = 88;
String cartKey = String.format("product:cart:%s", userId);
return cartKey;
}
-
查看我的购物车
@GetMapping("/mycart")
public JsonData findMyCart(){
BoundHashOperations<String,Object,Object> myCart = getMyCartOps();
List<Object> itemList = myCart.values();
List<CartItemVO> cartItemVOList = new ArrayList<>();
for(Object item: itemList){
CartItemVO cartItemVO = JsonUtil.jsonToPojo((String)item,CartItemVO.class);
cartItemVOList.add(cartItemVO);
}
//封装成cartvo
CartVO cartVO = new CartVO();
cartVO.setCartItems(cartItemVOList);
return JsonData.buildSuccess(cartVO);
}
-
清空购物车
@GetMapping("/clear")
public JsonData clear() {
String cartKey = getCartKey();
redisTemplate.delete(cartKey);
return JsonData.buildSuccess();
}
Set集合数据结构
简介:案例实战需求之大数据下的用户画像标签去重
-
介绍
-
用户画像 英文为User Profile,是根据用户基本属性、社会属性、行为属性、心理属性等真实信息而抽象出的一个标签化的、虚拟的用户模型。“用户画像”的实质是对 “人”的数字化。 -
应用场景有很多,比如个性化推荐、精准营销、金融风控、精细化运营等等, 举个例子来理解用户画像的实际应用价值,我们经常用手机网购,淘宝里面的千人千面 -
通过“标签 tag”来对用户的多维度特征进行提炼和标识,那每个人的用户画像就需要存储,set集合就适合去重 -
用户画像不止针对某个人,也可以某一人群或行业的画像 -
利用redis可以很好的去重 -
案例
BoundSetOperations operations = redisTemplate.boundSetOps("user:tags:1");
operations.add("car","student","rich","guangdong","dog","rich");
Set<String> set1 = operations.members();
System.out.println(set1);
operations.remove("dog");
Set<String> set2 = operations.members();
System.out.println(set2);
return JsonData.buildSuccess();
-
背景 -
社交应用里面的知识,关注、粉丝、共同好友案例
public void testSet(){
BoundSetOperations operationLW = redisTemplate.boundSetOps("user:lw");
operationLW.add("A","B","C","D","E");
System.out.println("老王的粉丝:"+operationLW.members());
BoundSetOperations operationXD = redisTemplate.boundSetOps("user:xd");
operationXD.add("A","B","F","G","H","J");
System.out.println("小D的粉丝:"+operationXD.members());
//差集
Set lwSet = operationLW.diff("user:xd");
System.out.println("老王的优势:"+lwSet);
//差集
Set xdSet = operationXD.diff("user:lw");
System.out.println("小滴的优势:"+xdSet);
//交集
Set interSet = operationLW.intersect("user:xd");
System.out.println("共同好友:"+interSet);
//并集
Set unionSet = operationLW.union("user:xd");
System.out.println("两个人的并集:"+unionSet);
//用户A是否是 老王 的粉丝
boolean flag = operationLW.isMember("A");
System.out.println(flag);
}
SortedSet开发用户积分
-
背景 -
用户玩游戏-积分实时榜单 -
IT视频热销实时榜单 -
电商商品热销实时榜单 -
一般的排行榜读多写少,可以对 master 进行写入操作,然后多个 slave 进行读取操作。 -
如果是对象记得重写HashCode与Equals方法
/**
* 非实时榜单
*
* @return
*/
@RequestMapping("daily_rank")
public JsonData videoDailyRank() {
List<VideoDO> list = redisTemplate.opsForList().range(DAILY_RANK_KEY, 0, -1);
return JsonData.buildSuccess(list);
}
/**
* 返回全部榜单,从大到小
* @return
*/
@RequestMapping("real_rank1")
public JsonData realRank1() {
BoundZSetOperations<String, UserPointVO> operations = redisTemplate.boundZSetOps("point:rank:real");
Set<UserPointVO> set = operations.reverseRange(0, -1);
return JsonData.buildSuccess(set);
}
/**
* 返回全部榜单,从小到大
* @return
*/
@RequestMapping("real_rank2")
public JsonData realRank2() {
BoundZSetOperations<String, UserPointVO> operations = redisTemplate.boundZSetOps("point:rank:real");
Set<UserPointVO> set = operations.range(0, -1);
return JsonData.buildSuccess(set);
}
/**
* 返回全部榜单,从大到小,指定长度
* @return
*/
@RequestMapping("real_rank3")
public JsonData realRank3() {
BoundZSetOperations<String, UserPointVO> operations = redisTemplate.boundZSetOps("point:rank:real");
Set<UserPointVO> set = operations.reverseRange(0, 3);
return JsonData.buildSuccess(set);
}
/**
* 查看某个用户的排名
* @param phone
* @param name
* @return
*/
@RequestMapping("find_myrank")
public JsonData realMyRank(String phone,String name) {
BoundZSetOperations<String, UserPointVO> operations = redisTemplate.boundZSetOps("point:rank:real");
UserPointVO userPointVO = new UserPointVO(name,phone);
long rank = operations.reverseRank(userPointVO);
return JsonData.buildSuccess(++rank);
}
/**
* 加积分
* @param phone
* @param name
* @return
*/
@RequestMapping("uprank")
public JsonData uprank(String phone,String name,int point) {
BoundZSetOperations<String, UserPointVO> operations = redisTemplate.boundZSetOps("point:rank:real");
UserPointVO userPointVO = new UserPointVO(name,phone);
operations.incrementScore(userPointVO,point);
Set<UserPointVO> set = operations.range(0, -1);
return JsonData.buildSuccess(set);
}
/**
* 查看个人的积分
* @param phone
* @param name
* @return
*/
@RequestMapping("mypoint")
public JsonData mypoint(String phone,String name) {
BoundZSetOperations<String, UserPointVO> operations = redisTemplate.boundZSetOps("point:rank:real");
UserPointVO userPointVO = new UserPointVO(name,phone);
double score = operations.score(userPointVO);
return JsonData.buildSuccess(score);
}
SpringCache
-
SpringCache简介 -
一个是Cache接口,缓存操作的API; -
一个是CacheManager管理各类缓存,有多个缓存框架的实现 -
文档:https://spring.io/guides/gs/caching/ -
自Spring 3.1起,提供了类似于@Transactional注解事务的注解Cache支持,且提供了Cache抽象 -
提供基本的Cache抽象,方便切换各种底层Cache -
只需要更少的代码就可以完成业务数据的缓存 -
提供事务回滚时也自动回滚缓存,支持比较复杂的缓存逻辑 -
核心
使用
-
项目中引入starter
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
-
配置文件指定缓存类型
spring:
cache:
type: redis
-
启动类开启缓存注解
@EnableCaching
-
Cacheable注解
-
标记在一个方法上,也可以标记在一个类上 -
缓存标注对象的返回结果,标注在方法上缓存该方法的返回值,标注在类上缓存该类所有的方法返回值 -
value 缓存名称,可以有多个 -
key 缓存的key规则,可以用springEL表达式,默认是方法参数组合 -
condition 缓存条件,使用springEL编写,返回true才缓存 -
案例
//对象
@Cacheable(value = {"product"}, key="#root.methodName")
//分页
@Cacheable(value = {"product_page"},key="#root.methodName + #page+'_'+#size")
-
spEL表达式 -
result -
root.args[0] -
root.methodname -
methodName 当前被调用的方法名 -
args 当前被调用的方法的参数列表 -
result 方法执行后的返回值
SpringCache框架自定义CacheManager配置和过期时间
-
修改redis缓存序列化器和配置manager过期时间
@Bean
@Primary
public RedisCacheManager cacheManager1Hour(RedisConnectionFactory connectionFactory) {
RedisCacheConfiguration config = instanceConfig(3600L);
return RedisCacheManager.builder(connectionFactory)
.cacheDefaults(config)
.transactionAware()
.build();
}
@Bean
public RedisCacheManager cacheManager1Day(RedisConnectionFactory connectionFactory) {
RedisCacheConfiguration config = instanceConfig(3600 * 24L);
return RedisCacheManager.builder(connectionFactory)
.cacheDefaults(config)
.transactionAware()
.build();
}
private RedisCacheConfiguration instanceConfig(Long ttl) {
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.registerModule(new JavaTimeModule());
// 去掉各种@JsonSerialize注解的解析
objectMapper.configure(MapperFeature.USE_ANNOTATIONS, false);
// 只针对非空的值进行序列化
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
// 将类型序列化到属性json字符串中
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance ,
ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
return RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(ttl))
.disableCachingNullValues()
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer));
}
SpringCache框架自定义缓存KeyGenerator
-
Key规则定义麻烦,支持自定规则
@Bean
public KeyGenerator springCacheDefaultKeyGenerator(){
return new KeyGenerator() {
@Override
public Object generate(Object o, Method method, Object... objects) {
return o.getClass().getSimpleName() + "_"
+ method.getName() + "_"
+ StringUtils.arrayToDelimitedString(objects, "_");
}
};
}
-
使用 -
key 属性和keyGenerator属性只能二选一
@Cacheable(value = {"product"},keyGenerator = "springCacheCustomKeyGenerator", cacheManager = "cacheManager1Minute")
@CachePut
-
CachePut -
根据方法的请求参数对其结果进行缓存,每次都会触发真实方法的调用 -
value 缓存名称,可以有多个 -
key 缓存的key规则,可以用springEL表达式,默认是方法参数组合 -
condition 缓存条件,使用springEL编写,返回true才缓存
@CachePut(value = {"product"},key = "#productDO.id")
@CacheEvict
-
CacheEvict -
代表清除缓存操作是在方法运行之前执行,无论方法是否出现异常,缓存都清除 -
缓存的清除是否在方法之前执行 ,默认代表缓存清除操作是在方法执行之后执行; -
如果出现异常缓存就不会清除 -
从缓存中移除相应数据, 触发缓存删除的操作 -
value 缓存名称,可以有多个 -
key 缓存的key规则,可以用springEL表达式,默认是方法参数组合 -
beforeInvocation = false -
beforeInvocation = true
@CacheEvict(value = {"product"},key = "#root.args[0]")
@Caching
-
Caching -
组合多个Cache注解使用 -
允许在同一方法上使用多个嵌套的@Cacheable、@CachePut和@CacheEvict注释
@Caching(
cacheable = {
@Cacheable(value = "product",keyGenerator = "xdclassKeyGenerator")
},
put = {
@CachePut(value = "product",key = "#id"),
@CachePut(value = "product",key = "'stock:'+#id")
}
)
以上是关于Redis的主要内容,如果未能解决你的问题,请参考以下文章