Redis发布订阅功能介绍,生产场景使用及性能测试
Posted 坚持是一种态度
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis发布订阅功能介绍,生产场景使用及性能测试相关的知识,希望对你有一定的参考价值。
文章目录
Redis发布订阅功能介绍
- Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息
- Redis 客户端可以订阅(
subscribe
)任意数量的频道(channel
),一个频道也可以被多个客户端订阅 - 当有新消息通过
PUBLISH
命令发送出去(例PUBLISH channel1 message1
)后,订阅该channel的客户端就可以收到消息
使用redis-cli客户端测试
-
先打开一个客户端,使用
subscribe
命令,订阅channel,这时进程被挂起,持续接收消息
-
再打开一个客户端,使用
publish
命令,发送消息
-
这时,订阅channel的客户端,可以接收到消息,发送方则得到1或0的返回,1成功,0失败
Redis发布订阅生产使用场景
- 网站前台,会员访问轨迹记录,需要同步到另一个服务会员中心展示
- 新的访问轨迹insert插入数据库,之前访问过的链接访问次数加一并更新最后一次访问时间。需要先查询一次,再新增或更新数据
- 为了减少数据库的查询和写入压力,之前是定时任务每半小时处理一次,批量查询,再批量新增或更新
- 这种写入的延迟,导致一段时间内,访问记录不完整,现在需要能更加实时的反馈会员访问记录
- 摈弃半小时定时任务,改为实时调用接口写入,可以解决,但如果遇到瞬时大规模访问,或者有人暴力压测网站前台,瞬时压力可能使会员中心数据库服务异常
- 为了应对瞬时峰值,需要削峰处理,考虑使用消息队列
- 考虑rabbitMQ和redis二选一,但由于引入MQ代码改动量较大,还需要部署rabbitMQ服务,对我们现有的生产场景不友好
- 另一方面,访问轨迹不是特别重要的数据,大并发量下偶尔的丢失可以接受,故而放弃使用功能更齐全的rabbitMQ,决定使用redis的发布订阅功能
java代码demo
消息订阅者
访问记录 redis 消息订阅者,继承JedisPubSub
抽象类。我们这次主要业务是接收消息,只需要重写onMessage(String channel, String message)
方法,在这里面写我们具体要处理的业务细节。
主要将之前的业务实现的代码拷贝过来,放到onMessage
方法里面。由于业务处理报错抛出异常时,会杀死该线程,从而导致后面的访问记录写入失败,所以这儿要做好异常捕获,对于偶尔的失败捕获不处理
/**
* 访问记录 订阅者
*
* @author yanyulin
* @date 2021-8-19 10:57:18
*/
public class VisitSubscriber extends JedisPubSub {
private static final Logger logger = LoggerFactory.getLogger(SubThread.class);
// 访问记录保存
private static final IVisitRecordService visitRecordService = MemberCenterSpringContextHolder
.getBean(IVisitRecordService.class);;
/**
* 接收到消息,调用此方法
* @param channel
* @param message
*/
@Override
public void onMessage(String channel, String message) {
message = URLDecoder.decode(message);
try {
VisitRecordMSParamVO paramVO = JSONObject.parseObject(message,VisitRecordMSParamVO.class);
visitRecordService.saveVisitRecord(paramVO);
}catch (JSONException e){
logger.error("保存访问记录数据报错,不是标准格式,",e);
}catch (Exception e){
logger.error("保存访问记录报错,该条数据舍弃:{}",message);
e.printStackTrace();
}
}
}
消息订阅 线程类
消息订阅,需要有一条挂起线程,持续接收消息。需要继承Thread
类,并将前面定义的订阅者类拿来使用,使该订阅者持续接收消息
/**
* 访问记录 消息订阅 线程
*
* @author yanyulin
* @date 2021-8-19 14:43:08
*/
public class SubThread extends Thread {
private static final Logger logger = LoggerFactory.getLogger(SubThread.class);
// redis客户端连接池
private static final JedisPool jedisPool = MemberCenterSpringContextHolder.getBean(JedisPool.class);
// 订阅者
private final VisitSubscriber subscriber = new VisitSubscriber();
// 默认频道名称
public static final String CHANNEL = "saveVisitRecord";
@Override
public void run() {
logger.info("subscribe redis, channel {}, thread will be blocked", CHANNEL);
Jedis jedis = null;
try {
//取出一个连接
jedis = jedisPool.getResource();
//通过subscribe 的api去订阅,入参是订阅者和频道名
jedis.subscribe(subscriber, CHANNEL);
} catch (Exception e) {
logger.error("run subsrcibe channel error, ", e);
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
消息发布类
在原有的业务代码里,将原有的保存业务,改为发布redis消息,将数据保存需求,写入到redis消息队列里,等待订阅者接收处理,从而达到削峰作用。为了防止数据格式和特殊字符影响,直接将数据编码了,使用时再解码。
//保存会员访问记录
// visitRecordService.saveVisitRecord(paramVO);
// 改为使用redis 发布/订阅
Jedis jedis = null;
try {
//取出一个连接
jedis = jedisPool.getResource();
String message = URLEncoder.encode(JSON.toJSONString(paramVO));
// 向CHANNEL发布消息
Long status = jedis.publish(CHANNEL, message);
if(status == 0){
logger.info("try again!");
jedis.publish(CHANNEL, message);
}
} catch (Exception e) {
logger.error("doing, subsrcibe channel error, ", e);
} finally {
if (jedis != null) {
jedis.close();
}
}
Spring Boot启动类
在启动类里,把redis消息线程挂起,持续接收消息
public static void main(String[] args) {
SpringApplication.run(MemberCenterSpringbootApplication.class, args);
// 新增,访问记录 消息订阅 线程
new SubThread().start();
}
功能和性能测试
功能测试
- 选择程序连接的redis的ip端口,打开一个redis-cli客户端,选中程序使用的db,直接发送消息(
publish
),程序可以接收,功能没问题 - 将会员服务和前台服务代码部署,前台发送消息测试,会员中心消息接收、数据处理正常,功能正常
性能测试
- 模拟正常业务,持续写入。前台服务,将取到的数据,循环访问,调用会员中心微服务接口,持续25分钟左右,功能正常,无数据丢失,满足业务需求
/**
* 测试会员中心访问轨迹记录接口,单线程
* @return
*/
@RequestMapping("/saveVisitRecord")
@ResponseBody
public Object saveVisitRecord(){
List<SiteChartMainEO> siteChartMainEOList = siteChartMainService.getEntities(SiteChartMainEO.class,new HashMap<>());
logger.info("数据量为:{}", siteChartMainEOList.size());
// 循环写入100次,4万多条数据
for (int i = 0; i< 100; i++){
siteChartMainEOList.forEach(mainEO -> {
long num = Math.round(Math.random() * 100);
SiteChartMainEO newEO = new SiteChartMainEO();
BeanUtils.copyProperties(mainEO,newEO);
newEO.setMemberId(mainEO.getMemberId() + num);
newEO.setUrl(mainEO.getUrl()+num);
// 调用微服务接口
saveMemberCenterVisitRecord(newEO);
});
}
return getObject();
}
- 模拟高并发,10条线程。前台服务,将同样的数据,使用10条线程,循环访问,调用会员中心接口。经测试,前台服务90秒左右执行完成,会员中心则执行了25分钟左右,无数据丢失,满足业务需求,达到了削峰需求
/**
* 测试会员中心访问轨迹记录接口,10条线程
* @return
*/
@RequestMapping("/saveVisitRecord")
@ResponseBody
public Object saveVisitRecord(){
List<SiteChartMainEO> siteChartMainEOList = siteChartMainService.getEntities(SiteChartMainEO.class,new HashMap<>());
logger.info("一批次数据量为:{}", siteChartMainEOList.size());
// 循环写入100次,4万多条数据
for (int i = 0; i< 10; i++){
new Thread(() -> {
long t1 = System.currentTimeMillis();
for (int j = 0; j< 10; j++){
siteChartMainEOList.forEach(mainEO -> {
long num = Math.round(Math.random() * 199);
SiteChartMainEO newEO = new SiteChartMainEO();
BeanUtils.copyProperties(mainEO,newEO);
newEO.setMemberId(mainEO.getMemberId() + num);
newEO.setUrl(mainEO.getUrl()+num);
// 调用微服务接口
saveMemberCenterVisitRecord(newEO);
});
}
logger.info("循环一次完成,耗时:{}", (System.currentTimeMillis() - t1)/1000);
}).start();
}
return getObject();
}
- 备注:以上代码只是测试用,所以在for循环里重复调用微服务接口,如果实际业务场景这么写,同样的业务重复调用微服务接口,一般都是不合理的。对于这样的场景,要优化接口,修改传参,改为一次调用处理一批数据,减少网络开销、接口调用开销
以上是关于Redis发布订阅功能介绍,生产场景使用及性能测试的主要内容,如果未能解决你的问题,请参考以下文章