对于缓存在redis中且不会改变的数据并发读为啥还会出现QPS的瓶颈
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了对于缓存在redis中且不会改变的数据并发读为啥还会出现QPS的瓶颈相关的知识,希望对你有一定的参考价值。
参考技术A 相对于熟读官方文档,更重要的是要把框架环境搭起来。零、环境介绍操作系统:centOS数据库:mysql5/installerphpmvcomposer.phar/usr/local/bin/composer安装完成,使用以下命令看是否安装成功composer-V出现版本号,即说明安装成功三、安装Laravel按照Laravel的官方文档说明即可,建议使用【通过Laravel安装工具】,没什么坑,这里略过提示:由于Laravel还依赖于一些的PHP扩展,所以使用yum安装sudoinstallyumphp-mysqlphp-mcryptphp-mbstringphp-tokenizerphp-openssl安装完成后,在nginx的配置文件(一般是/etc/nginx/conf.d/default.conf)最下方添加如下配置location/try_files$uri$uri//index.php?$query_string;来到你的laravel工程目录下,看到storage和vendor文件夹,使用以下命令修改其文件读写权限,让Nginx用户能读写它sudochmod-R766storagesudochmod-R766vendor四、让MVC跑起来!在此之前,你应该读一下官方文档路由、控制器、数据库使用基础、EloquentORM至此,可以开始coding,开发一个MVC的demo了,此demo的功能是将数据库表tbl_item从数据库里读出来,并以json格式响应给浏览器。假设你已经通过laravelnewdemo来初始化你的webapp。数据库里建库(demo)、建表(tbl_item),(字段随意定)配置配置文件config/database.php直接操作数据库,往tbl_item里插入一条数据开始codingdemo/app/http/routes.php底部添加如下代码:Route::get('/item/id','ItemController@showItem');demo/app/http/controllers/目录新添文件ItemController.php,代码如下:model=newItem();publicfunctionshowItem($id)$users=$this->model->fetchAll();echojson_encode($users);Log::info('获取用户列表,通过msyql');demo/app/目录下新增文件Item.php代码如下all()->toJson();return$items;使用浏览器访问yourIp/item/1,即可列出所有的item数据五、Laravel结合Redis直连DB是不够的,很快数据库访问就会成为系统的瓶颈。我们引入缓存Redis。还是一样的思路,先让系统跑起来。1、安装启动Redis安装$wgetdownload.redis.io/releases/redis-3.0.1.tar.gz$tarxzfredis-3.0.1.tar.gz$cdredis-3.0.1$make启动$src/redis-server查看官方下载和安装文档,只需要几个命令即可2、安装PHPPRedisPRedis是PHP访问redis的扩展包,只需要下载原码即可,不需要安装PHP扩展(如php-redis.so)。但在这之前要介绍一个composer,因为laravel通过它来安装第三方程序包(管理依赖关系)。cd到你的App所在路径,修改composer.json,在require字段里,添加"predis/predis":"~1.0.1",然后当前目录下sudocomposerupdate,此时就会自动下载包需要的扩展包,这些扩展包将会被放在vendor目录下。如果出现内存不够这些报错,现在看来是内存分配不够的原因,重启一下服务器即可,彻底解决的法要修改服务器配置,但我不清楚改哪里,后续再补配置相关配置,查看官方文档即可。主要是配置config/database.php'redis'=>array('cluster'=>false,'default'=>array('host'=>'127.0.0.1','port'=>6379))3、codingmodel=newUser();/***Show**@returnResponse*/publicfunctionshowUser($id)$redis=Redis::connection('default');$cacheUsers=$redis->get('userList');if($cacheUsers)$users=$cacheUsers;print_r($users);Log::info('获取用户列表,通过redis');else$users=$this->model->fetchAll();$redis->set('userList',$users);print_r($users);Log::info('获取用户列表,通过msyql');一线大厂高并发Redis缓存架构
文章目录
高并发缓存架构设计
架构设计思路
首先是一个基础的缓存架构,对于新增、修改操作set会对缓存更新,对于查询操作先去查询redis缓存,没有再去查询DB,然后把数据在Redis中也缓存一份
但如果数据量很大,redis中不能存储所有的数据,而且很大一部分数据都是冷门数据,可能就插入时用了一次。对于这种情况我们就可以在新增或修改时对缓存set时加过期时间,在查询时将查询到的数据在缓存中更新过期时间,保证热点数据不过期,这样就简单的实现了冷热分离
考虑解决缓存击穿的情况,大量的数据同时间过期导致大量请求直接落到了DB上。在业务上肯定会有一些批量操作,那么对于批量新增的场景,多个数据设置了同一时间过期,那么就可能会出现缓存击穿的情况,解决方法是再加个随机方法,在原有过期时间上再增加一段随机时间
考虑解决缓存穿透的情况,管理员误删热点数据或恶意攻击,一直访问缓存和数据库中都不存在的数据,进而拖垮数据库。
我们可以选择往缓存中存一个空值+过期时间;或者是布隆过滤器。
这里最好是要区分一下,写一个方法区查询缓存,如果查询缓存没有查询到数据返回null,如果查询到了数据但是这个数据是我们上一步存的空值那么这里返回什么,最好是和前一步返回的null区分开
考虑解决热点数据缓存重建问题,热点数据过期或者是冷门数据忽然变成热点数据,这时缓存中没数据,DB中有数据,一瞬间大量的请求要查询这条数据就都落到了DB上。
解决方案是加锁,对查询DB操作加锁,这里需要加分布式锁,锁的粒度要小,比如电商中使用锁前缀+商品id。之所以要使用分布式锁而不是synchronized也是因为锁的粒度,使用synchronized(this)
方式那么查询其他商品的操作也会被阻塞住,使用synchronized(锁前缀 + product.getId())
这种方式生成的锁对象又不是同一个对象。为了性能,同时还需要参考单例模式的双重检测机制,将DCL(Double-Checked-Locking)机制也加上。
伪代码如下:
// 查询缓存
Object o = queryCache(keyStr);
if(o != null)
return o;
// 热点数据缓存重建 缓存没有查询到就加锁
lock.lock();
try
// 再查一遍
Object o = queryCache(keyStr);
if(o != null)
return o;
// 还没有查询到就查询DB并往Redis中写
queryDatabase(...);
finally
lock.unlock();
考虑解决缓存与DB双写不一致问题
双写不一致问题
读写不一致问题,假如更新操作时我不更新缓存,而是删除缓存嘞?如下所示也是有问题的。
要解决上面这些问题的方法还是加锁,在查DB更新缓存、更新DB更新缓存这两个地方加同一把分布式锁。此时就会有人提问了:
-
这么写一个简单的增删改查逻辑代码会变得很复杂
如果并发不高或者能容忍这些问题的话,那么你可以不加这些校验代码,如果你想要解决这些小概率问题那么就必须要加相应的代码。 -
这么写都是串行执行,效率不会很低吗?
在上一步中我们对查询方法就加了DCL机制,绝大部分请求在两次校验时就从缓存中拿到数据返回了,根本不会进入到加锁的位置
Object o = queryCache(keyStr);
if(o != null)
return o;
// 解决热点数据缓存重建问题 缓存没有查询到就加锁
lock.lock();
try
Object o = queryCache(keyStr);
if(o != null)
return o;
// 这个方法中才是查询DB更新缓存,这里面才会加锁,但是大部分请求在上面两次查缓存中就已经拿到数据并返回了
queryDatabase(...);
finally
lock.unlock();
接下来尝试对上面两种分布式锁做优化
首先是为了解决热点数据缓存重建问题而加的分布式锁,其实现在这种方式也能用了,因为只会有一个请求去查询DB然后写入缓存,其他阻塞的线程就会通过第二次验证从缓存中拿到数据返回。但是可能会有几万个请求都阻塞在了加锁的那一行中,我们可以使用trylock()
方法来优化,指定一个最大等待时间,这样所有等待的线程一到时间就直接去执行第二个从缓存中取数据的代码了。
接下来是为了解决缓存与DB数据不一致问题而加的分布式锁,大部分的情况下我们业务都是读多写少,我们可以在这里使用读写锁来提高性能,更新DB更新缓存的地方使用写锁,缓存无数据 查询DB更新缓存的地方用读锁。
接下来考虑解决缓存雪崩的问题,我们都是请求–>web服务–>redis,如果并发量很大,多个web服务的请求到发到了一个redis节点上,redis处理不过来一秒十几万或更多的请求,那么redis就可能宕机或者是用户线程一直等待redis响应
如果redis宕机,那么业务代码trycatch后一般就去查数据库了,大量的请求就会让数据库挂掉,接着再整个服务都不能访问了。
如果redis没宕机,用户线程一直等待redis数据返回,此时并发很高,web服务中线程得不到释放,又不断有新的请求进来,又导致微服务可能宕机,进而影响到整个系统宕机。
解决方法:
-
限流,但万一中间件部分限流没有拦住,我们在代码层面也要做相应的处理,使用下面这种解决方案
-
多级缓存,我们可以在JVM层面也加一层缓存,在访问Redis前先访问JVM层面的缓存。因为JVM是本机内存,并发访问可以达到每秒百万
我们还需要一个单独的系统去维护JVM层面的缓存。确定什么样的数据能放缓存中;某个微服务节点更新了数据,其他节点的JVM层面的缓存也要更新等等情况。
完整代码
package com.hs.distributlock.service;
import com.alibaba.fastjson.JSON;
import com.hs.distributlock.entity.ProductEntity;
import com.hs.distributlock.mapper.ProductMapper;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RReadWriteLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* @Description: 高并发缓存架构
* @Author 胡尚
* @Date: 2023/3/25 20:10
*/
@Service
public class ProductService
@Autowired
StringRedisTemplate redisTemplate;
@Autowired
Redisson redisson;
@Autowired
ProductMapper productMapper;
/**
* 缓存穿透默认值
*/
public static final String PRODUCT_PENETRATE_DEFAULT = "";
/**
* 突发性热点缓存重建时加的锁
*/
public static final String LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX = "lock:product:hot_cache_create:";
/**
* 突发性热点缓存重建时加的锁
*/
public static final String LOCK_CACHE_DB_UNLIKE_PREFIX = "lock:cache_db_unlike:";
public void updateProduct(ProductEntity entity)
// 解决缓存与数据库双写 数据不一致问题 而加写锁
RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_CACHE_DB_UNLIKE_PREFIX + entity.getId());
RLock wLock = readWriteLock.writeLock();
wLock.lock();
try
// 更新DB 更新缓存
productMapper.update(entity);
String json = JSON.toJSONString(entity);
redisTemplate.opsForValue().set("product:id:" + entity.getId(), json, 12*60*60+getRandomTime(), TimeUnit.SECONDS);
finally
wLock.unlock();
public void insertProduct(ProductEntity entity)
// 解决缓存与数据库双写 数据不一致问题 而加写锁
RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_CACHE_DB_UNLIKE_PREFIX + entity.getId());
RLock wLock = readWriteLock.writeLock();
wLock.lock();
try
// 更新DB 更新缓存
productMapper.insert(entity);
String json = JSON.toJSONString(entity);
redisTemplate.opsForValue().set("product:id:" + entity.getId(), json, 12*60*60+getRandomTime(), TimeUnit.SECONDS);
finally
wLock.unlock();
/**
* 重点方法,这其中使用了双重检测去查询缓存,还加了读锁去解决缓存和数据库双写导致的数据不一致问题
*/
public ProductEntity queryProduct(Long id) throws InterruptedException
String productKey = "product:id:" + id;
// 从缓存取数据
ProductEntity entity = queryCache(productKey);
if (entity != null)
return entity;
// 突发性热点缓存重建问题,避免大量请求直接去请求DB,进而加锁拦截
RLock lock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX + id);
lock.tryLock(3, 30, TimeUnit.SECONDS);
try
// 第二次验证 从缓存取数据
entity = queryCache(productKey);
if (entity != null)
return entity;
// 缓存与DB数据双写不一致问题 加锁
RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_CACHE_DB_UNLIKE_PREFIX + id);
RLock rLock = readWriteLock.readLock();
rLock.lock();
try
// 查询数据库 更新缓存
entity = queryDatabase(productKey, id);
finally
rLock.unlock();
finally
lock.unlock();
return entity;
/**
* 从缓存中查询
*/
private ProductEntity queryCache(String key)
ProductEntity entity = null;
String json = redisTemplate.opsForValue().get(key);
if (StringUtils.hasLength(json))
// 判断是否为解决缓存穿透而手动存储的值,如果是则直接返回一个新对象,并和前端约定好错误提示
if (Objects.equals(PRODUCT_PENETRATE_DEFAULT, json))
return new ProductEntity();
entity = JSON.parseObject(json, ProductEntity.class);
// 延期
redisTemplate.expire(key, 12*60*60+getRandomTime(), TimeUnit.SECONDS);
return entity;
/**
* 从数据库中查询,如果查询到了就将数据在缓存中保存一份,如果没有查询到则往缓存中存一个默认值来解决缓存击穿问题
*/
private ProductEntity queryDatabase(String productKey, long id)
ProductEntity entity = productMapper.get(id);
// 如果数据库中也没有查询到,那么就往缓存中存一个默认值,去解决缓存击穿问题
if (entity == null)
redisTemplate.opsForValue().set(productKey, PRODUCT_PENETRATE_DEFAULT, 60*1000, TimeUnit.SECONDS);
else
redisTemplate.opsForValue().set(productKey, JSON.toJSONString(entity), 12*60*60+getRandomTime(), TimeUnit.SECONDS);
return entity;
private Integer getRandomTime()
return new Random().nextInt(5) * 60 * 60;
开发规范与优化建议
键值设计
Key的设计
-
可读性和可管理性。以业务名为前缀,用冒号分割。避免直接使用id作为key导致数据覆盖的情况。
-
简洁性。保证语义的前提下,控制key的长度,不要太长
-
不要包含空格、换行、单双引号以及其他特殊字符
value的设计
-
避免bigkey
字符串类型,最大512MB,但是一般超过10KB就认为是bigkey
非字符串类型,hash、list、set、zset,一般它们的元素超过5000就任务是bigkey
bigkey的危害:导致redis阻塞、网络拥堵
一般产生bigkey的原因:按天统计某些功能或用户集合、将数据库查询到的数据序列化后存入缓存,我们要判断是否是所有字段都需要缓存
优化bigkey:将一个bigkey拆分为多个数据、如果不可拆分则不要每次取所有的数据,比如
hmget key field [field ...]
只读取其中某些元素 -
选择合适的数据类型
反例:
set user:1:name tom set user:1:age 19 set user:1:favor football
正例:
hmset user:1 name tom age 19 favor football
-
控制key的生命周期,添加过期时间
命令使用
-
O(N)命令关注N的数量
例如hgeall、lrange、smembers、zrange等并非不能用,但需要关注N的数量。
有遍历的需求可以使用hscan、sscan、zscan代替。
-
禁用命令:keys * 、flushall 、flushdb等
-
使用批量操作提高效率,比如pipeline管道等
-
redis的事务不要过多使用,可以使用lua脚本来代替
客户端的使用
-
避免所有业务使用同一个redis实例
可以搭建多个redis集群,不同的微服务调用不同的redis集群,不相干的业务拆分,公共数据做服务化。
-
使用带有连接池,有效控制连接提高效率
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(5); jedisPoolConfig.setMaxIdle(2); jedisPoolConfig.setTestOnBorrow(true); JedisPool jedisPool = new JedisPool(jedisPoolConfig, "127.0.0.1", 6379, 3000, null); Jedis jedis = null; try jedis = jedisPool.getResource(); //具体的命令 jedis.executeCommand() catch (Exception e) logger.error("op key error: " + e.getMessage(), key, e); finally //注意这里不是关闭连接,在JedisPool模式下,Jedis会被归还给资源池。 if (jedis != null) jedis.close();
连接池参数含义:
序号 参数名 含义 默认值 使用建议 1 maxTotal 资源池中最大连接数 8 设置建议见下面 2 maxIdle 资源池允许最大空闲的连接数 8 设置建议见下面 3 minIdle 资源池确保最少空闲的连接数 0 设置建议见下面 4 blockWhenExhausted 当资源池用尽后,调用者是否要等待。只有当为true时,下面的maxWaitMillis才会生效 true 建议使用默认值 5 maxWaitMillis 当资源池连接用尽后,调用者的最大等待时间(单位为毫秒) -1:表示永不超时 不建议使用默认值 6 testOnBorrow 向资源池借用连接时是否做连接有效性检测(ping),无效连接会被移除 false 业务量很大时候建议设置为false(多一次ping的开销)。 7 testOnReturn 向资源池归还连接时是否做连接有效性检测(ping),无效连接会被移除 false 业务量很大时候建议设置为false(多一次ping的开销)。 8 jmxEnabled 是否开启jmx监控,可用于监控 true 建议开启,但应用本身也要开启 一般我们控制最大连接数和资源池允许的最大空闲连接数这两个配置就行了
Redis连接池的连接对象是懒加载的,连接池对象刚创建时不会初始化创建连接对象,是真正使用时才会去创建,我们可以做连接池预热
// 先将创建的连接对象保存起来,然后再统一调用close()方法放回连接池中 List<Jedis> minIdleJedisList = new ArrayList<Jedis>(jedisPoolConfig.getMinIdle()); for (int i = 0; i < jedisPoolConfig.getMinIdle(); i++) Jedis jedis = null; try jedis = pool.getResource(); minIdleJedisList.add(jedis); jedis.ping(); catch (Exception e) logger.error(e.getMessage(), e); finally //注意,这里不能马上close将连接还回连接池,否则最后连接池里只会建立1个连接 //jedis.close(); //统一将预热的连接还回连接池 for (int i = 0; i < jedisPoolConfig.getMinIdle(); i++) Jedis jedis = null; try jedis = minIdleJedisList.get(i); //将连接归还回连接池 jedis.close(); catch (Exception e) logger.error(e.getMessage(), e); finally
扩展
布隆过滤器
当布隆过滤器说某个值存在时,这个值可能不存在;当它说不存在时,那就肯定不存在。
布隆过滤器不能删除数据,如果要删除得重新初始化数据。
底层原理如下图所示:
使用Redisson实现一个简单的案例
package com.redisson;
import org.redisson.Redisson;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
public class RedissonBloomFilter
public static void main(String[] args)
// 构建一个redisson对象
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379");
RedissonClient redisson = Redisson.create(config);
RBloomFilter<String> bloomFilter = redisson.getBloomFilter("nameList");
//初始化布隆过滤器:预计元素为100000000L,误差率为3%,根据这两个参数会计算出底层的bit数组大小
bloomFilter.tryInit(100000000L,0.03);
//将zhuge插入到布隆过滤器中
bloomFilter.add("hushang");
//判断下面号码是否在布隆过滤器中
System.out.println(bloomFilter.contains("aabbcc"));//false
System.out.println(bloomFilter.contains("eeeee"));//false
System.out.println(bloomFilter.contains("hushang"));//true
实际开发中的伪代码如下
//初始化布隆过滤器
RBloomFilter<String> bloomFilter = redisson.getBloomFilter("nameList");
//初始化布隆过滤器:预计元素为100000000L,误差率为3%
bloomFilter.tryInit(100000000L,0.03);
//把所有数据存入布隆过滤器
void init()
for (String key: keysredis读写分离支撑读请求超过百万