Redis7高级之缓存双写一致性之更新策略探讨
Posted 晓风残月Lx
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis7高级之缓存双写一致性之更新策略探讨相关的知识,希望对你有一定的参考价值。
1.缓存双写一致性
-
如果redis中有数据
- 需要和数据库中的值相同
-
如果redis中无数据
- 数据库中的值是最新值,且准备回写redis
-
缓存按照操作分
- 只读缓存
- 读写缓存
- 同步直写策略
- 写数据库后也同步写 redis 缓存,缓存中的数据和数据中的一致
- 对于读写缓存来说,要想保证缓存和数据库中的数据一致
- 异步缓写策略
- 正常业务运行中,mysql数据变动了,但是可以在业务上容许出现一定时间后才作用于redis,比如仓库、物流系统
- 异常情况出现了,不得不讲失败的动作重新修补,有可能需要借助kafka或者RabbitMQ等消息中间件,实现重写重试
- 同步直写策略
-
-
采用双检加锁策略
- 多个线程同时去查询数据库的这条数据,就在第一个查询数据的请求上使用一个互斥锁来锁住他。
- 其他线程获取不到锁就一直等待,等第一个线程查询到了数据,然后做了缓存
- 后面的线程进来发现已经有了缓存,就直接走缓存
package com.lv.service.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.lv.User; import com.lv.mapper.UserMapper; import com.lv.service.UserService; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.concurrent.TimeUnit; /** * @author 晓风残月Lx * @date 2023/3/27 12:39 */ @Slf4j @Service public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements UserService public static final String CACHE_KEY_USER = "user:"; @Resource private UserMapper userMapper; @Resource private RedisTemplate redisTemplate; /** * 业务逻辑没有写错,对于小厂中厂(QPS《=1000)可以使用,但是大厂不行 * @param id * @return */ public User findUserById1(Long id) User user = null; String key = CACHE_KEY_USER + id; // 1.先从redis中查询,如果有直接返回结果,没有再去查询 mysql user = (User) redisTemplate.opsForValue().get(key); if (user == null) // 2. redis中没有,查询mysql user = userMapper.selectById(id); if (user == null) // 3.1 redis + mysql 都无数据 // 具体细化,防止多次穿透,业务规定,记录下导致穿透的这个key回写redis return user; else // 3.2 mysql有,需要回写到redis,保证下一次的缓存命中率 redisTemplate.opsForValue().set(key,user); return user; /** * 加强补充,避免突然key失效了,打爆mysql,做一下预防,尽量不出现击穿的情况 * @param id * @return */ public User findUserById2(Long id) User user = null; String key = CACHE_KEY_USER + id; // 1.先从redis里面查询,如果有直接返回结果,如果没有再去查询mysql // 第一次查询redis,加锁前 user = (User) redisTemplate.opsForValue().get(key); if (user == null) // 2.对于高QPS的优化,进来就先加锁,保证一个请求操作,让外面的redis等待一下,避免击穿mysql synchronized (UserServiceImpl.class) // 第二次查询redis,加锁后 user = (User) redisTemplate.opsForValue().get(key); // 3. 二次查redis还是null,可以去查mysql了(mysql默认有数据) if (user == null) //4 查询mysql拿数据(mysql默认有数据) user = userMapper.selectById(id); if (user == null) return null; else // 5. mysql里面有数据的,需要回写redis,完成数据一致性的同步工作 redisTemplate.opsForValue().setIfAbsent(key, user, 7L, TimeUnit.DAYS); return user;
-
2.数据库和缓存一致性的几种更新策略
目的
- 达到最终一致性
- 给缓存设置过期时间,定期清理缓存并回写,是保证最终一致性的解决方案。
- 我们可以对存入缓存的数据设置过期时间,所有的写操作以数据库为准,对缓存操作只是尽最大努力即可。也就是说如果数据库写成功,缓存更新失败,那么只要到达过期时间,则后面的读请求自然会从数据库中读取新值然后回填缓存,达到一致性,切记,要以mysql的数据库写入库为准。
四种更新策略
可停机的情况
基本上怎么处理都可以
- 挂牌报错
- 凌晨升级
- 服务降级
- 温馨提示
- 最好单线程操作(对于重量级的数据操作)
不可停机的情况(推荐最后一种,看场景)
-
先更新数据库,在更新缓存
-
异常问题1
-
异常问题2
-
-
先更新缓存,再更新数据库
-
一般业务会将mysq作为底单数据库,有最终解释权
-
异常问题
-
-
先删除缓存,再更新数据库
-
异常问题
-
解决方案(延时双删策略)
- 注意关键点,我更新完数据库的时间 + sleep的时间 大于 读取数据并写入换的时间 即可(多个100ms即可)
-
关于延时双删的细节问题
-
这个线程休眠时间(线程A sleep的时间,就需要大于线程B读取数据再写入缓存的时间)
- 第一种 :在业务程序运行的时候,统计下线程读数据和写缓存的操作时间,自行评估自己的项目的读数据业务逻辑的耗时,以此为基础来进行估算。然后写数据的休眠时间则在读数据业务逻辑的耗时基础上加百毫秒即可。
- 第二种: 新启动一个后台监控程序,比如WatchDog监控程序,会加时
-
这种同步淘汰策略,吞吐量降低怎么办
-
第二次删除缓存使用 异步删除
-
-
-
-
先更新数据库,再删除缓存
-
异常问题
-
订阅binlog程序再mysql中有现成的中间件叫canal,可以完成订阅binlog日志的功能(下一章具体实现)
-
解决方法
分布式的事务问题一定要遵守最终一致性,可以允许短暂的信息滞后
-
3.总结
-
先删除缓存值再更新数据库
- 有可能导致请求因缓存缺失而访问数据库,给数据库带来压力导致打满mysql
- 如果业务应用中读取数据库和写缓存的时间不好估算,那么,延迟双删中的等待时间就不好设置
-
先更新数据库,再删除缓存
- 如果业务层要求必须读取一致性的数据,那么我们就需要在更新数据库时,先在Redis缓存客户端暂停并发读请求,等数据库更新完、缓存值删除后,再读取数据,从而保证数据一致性,这是理论可以达到的效果,但实际,不推荐,因为真实生产环境中,分布式下很难做到实时一致性,一般都是最终一致性。
Redis缓存双写一致性
目录
双写一致性
Redis与Mysql双写一致性
canal
主要是用于MySQL数据库增量日志数据的订阅,消费和解析(由阿里开源的Java项目),canal是通过伪装成MySQL的slave节点来转储master节点的binlog日志的一个中间件,他拿到日志内容以后,就可以把日志的相关数据变更重放到任何地方,可以是其他的MySQL,也可以是消息队列,redis甚至是文件中.
配置流程
- 开启MySQL的binlog写入功能(需要重启MySQL,阿里云的好像默认就开启了)
- 授权canal连接MySQL的账号,其实就是新建一个canal专用的账号便于区分(权限可以稍微高一些)
- 去官网下载并解压canal到自己的目录下,修改instance.properties配置文件
- 换成自己mysql主机所在的ip地址
- 换成自己刚才给MySQL新建的用户及其密码
- 启动canel并查看server和instance实例的日志来确保启动运行成功
代码案例
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class RedisCanalClientExample
public static final int _60SECONDS = 60;
public static void main(String[] args)
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(
"127.0.0.1", 1111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
System.out.println("---------程序启动,开始监听MySQL的变化: ");
try
connector.connect();
//这个就是你要订阅的变化的那个库表
connector.subscribe("db_test.t_user");
connector.rollback();
int totalEmptyCount = 10 * _60SECONDS;
while (emptyCount < totalEmptyCount)
//获取指定数量的数据
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0)
emptyCount++;
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
else
emptyCount = 0;
printEntry(message.getEntries());
System.out.println();
//提交确认
connector.ack(batchId);
//处理失败,回滚数据
//connector.rollback(batchId);
System.out.println("empty too many times,exit");
finally
connector.disconnect();
private static void printEntry(List<CanalEntry.Entry> entries)
for (CanalEntry.Entry entry : entries)
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND)
continue;
CanalEntry.RowChange rowChange = null;
try
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
catch (InvalidProtocolBufferException e)
throw new RuntimeException(e);
CanalEntry.EventType eventType = rowChange.getEventType();
System.out.printf("==========binlog[%s:%s],name[%s,%s],eventType : %s%n",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType);
for (CanalEntry.RowData rowData : rowChange.getRowDatasList())
if (eventType == CanalEntry.EventType.INSERT)
redisInsert(rowData.getAfterColumnsList());
else if (eventType == CanalEntry.EventType.UPDATE)
redisUpdate(rowData.getAfterColumnsList());
else
redisDelete(rowData.getAfterColumnsList());
private static void redisInsert(List<CanalEntry.Column> columns)
//实现省略,往redis插入数据
private static void redisUpdate(List<CanalEntry.Column> columns)
//实现省略,往redis修改数据
private static void redisDelete(List<CanalEntry.Column> columns)
//实现省略,往redis删除数据
双写一致性理解
- redis中有数据,需要和数据库中的值相同
- redis中无数据,需要数据库中的值要是最新值
缓存操作细分
- 只读缓存
- 读写缓存
- 同步直写策略:写数据库时也同步写缓存,缓存和数据库中的数据一致(对于读写缓存来说,要想保证缓存和数据库中的数据一致,就要采用同步直写策略)
缓存一致性多种更新策略
挂牌报错,凌晨升级
让客户稍作等待,然后趁机更新mysql和redis(特别重要级别的数据最好不要多线程)
给缓存设置过期时间,是保证最终一致性的解决方案.所有的写操作以数据库为准,对缓存操作只是尽最大的努力即可.也就是说如果数据库写入成功,缓存更新失败,那么只要到达过期时间.后面的请求自然会从数据库中读取新数据然后回填缓存,达到一致性.切记以mysql的数据库写入为准.
先更新数据库,在更新缓存
在高并发的情境下,这个操作是跨两个不同的系统的,就一定会可能发生数据不一致的问题,导致读到脏数据(比如某方更新失败了)
先删除缓存,在更新数据库
容易出现的异常问题:A线程删除了缓存,去更新mysql. B线程过来又要读取,A还在更新中,这时候有可能发生
- 有可能缓存击穿(看你有没有双端检索加锁来初始化缓存)
- B从mysql获得了旧值
- B会把获得的旧值写回到Redis缓存(被A删除掉的旧数据,又被B给写会了,缓存的更新就失败了)
- 请求A更新完成,MySQL与Redis发生了数据不一致的情况
这种方案尽量不要用
先更新数据库,在删除缓存
还是会出现短时间的数据不一致(可能会从缓存中读取到旧数据)
canal就是类似的思想
延迟双删策略
先删除Redis的缓存,在更新完数据库之后,再删除一次Redis的缓存(延迟删除),这时候能保证数据的最终一致性.
- 这个删除该休眠多久
- 自己根据业务进行一个具体的评估,在此耗时基础上面加个**百毫秒**左右即可
- 如果MySQL是主从分离如何
- 从库更可能导致数据不一致问题(还有个主从复制的延迟时间),所以更加需要采用延迟双删的策略了(延迟时间可能需要再加上百毫秒时间)
- 这种同步淘汰策略,吞吐量降低了怎么办
- 可以新起来一个线程去后台做这个事情(用CompletableFuture等实现)
分布式系统只有最终一致性,很难去做到强一致性
总结
把Redis作为只读缓存的话还好,没有一致性的问题,但是如果把Redis作为读写缓存来用.建议使用先更新数据库,再删除缓存的方案.理由如下:
- 先删除缓存的值在更新数据库,有可能缓存击穿打满MySQL,并且也避免不了数据不一致的问题
- 如果业务应用中读取数据库和写缓存的时间不好估算,那么延迟双删中的等待时间就不好设置
以上是关于Redis7高级之缓存双写一致性之更新策略探讨的主要内容,如果未能解决你的问题,请参考以下文章