Redis发布订阅功能介绍,生产场景使用及性能测试

Posted 坚持是一种态度

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis发布订阅功能介绍,生产场景使用及性能测试相关的知识,希望对你有一定的参考价值。

Redis发布订阅功能介绍

  • Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息
  • Redis 客户端可以订阅(subscribe)任意数量的频道(channel),一个频道也可以被多个客户端订阅
  • 当有新消息通过PUBLISH命令发送出去(例PUBLISH channel1 message1)后,订阅该channel的客户端就可以收到消息

使用redis-cli客户端测试

  • 先打开一个客户端,使用subscribe命令,订阅channel,这时进程被挂起,持续接收消息

  • 再打开一个客户端,使用publish命令,发送消息

  • 这时,订阅channel的客户端,可以接收到消息,发送方则得到1或0的返回,1成功,0失败

Redis发布订阅生产使用场景

  • 网站前台,会员访问轨迹记录,需要同步到另一个服务会员中心展示
  • 新的访问轨迹insert插入数据库,之前访问过的链接访问次数加一并更新最后一次访问时间。需要先查询一次,再新增或更新数据
  • 为了减少数据库的查询和写入压力,之前是定时任务每半小时处理一次,批量查询,再批量新增或更新
  • 这种写入的延迟,导致一段时间内,访问记录不完整,现在需要能更加实时的反馈会员访问记录
  • 摈弃半小时定时任务,改为实时调用接口写入,可以解决,但如果遇到瞬时大规模访问,或者有人暴力压测网站前台,瞬时压力可能使会员中心数据库服务异常
  • 为了应对瞬时峰值,需要削峰处理,考虑使用消息队列
  • 考虑rabbitMQ和redis二选一,但由于引入MQ代码改动量较大,还需要部署rabbitMQ服务,对我们现有的生产场景不友好
  • 另一方面,访问轨迹不是特别重要的数据,大并发量下偶尔的丢失可以接受,故而放弃使用功能更齐全的rabbitMQ,决定使用redis的发布订阅功能

java代码demo

消息订阅者

访问记录 redis 消息订阅者,继承JedisPubSub抽象类。我们这次主要业务是接收消息,只需要重写onMessage(String channel, String message)方法,在这里面写我们具体要处理的业务细节。
主要将之前的业务实现的代码拷贝过来,放到onMessage方法里面。由于业务处理报错抛出异常时,会杀死该线程,从而导致后面的访问记录写入失败,所以这儿要做好异常捕获,对于偶尔的失败捕获不处理

/**
 * 访问记录 订阅者
 *
 * @author yanyulin
 * @date 2021-8-19 10:57:18
 */
public class VisitSubscriber extends JedisPubSub {
    private static final Logger logger = LoggerFactory.getLogger(SubThread.class);
    // 访问记录保存
    private static final IVisitRecordService visitRecordService =  MemberCenterSpringContextHolder
            .getBean(IVisitRecordService.class);;

    /**
     * 接收到消息,调用此方法
     * @param channel
     * @param message
     */
    @Override
    public void onMessage(String channel, String message) {
        message = URLDecoder.decode(message);
        try {
            VisitRecordMSParamVO paramVO = JSONObject.parseObject(message,VisitRecordMSParamVO.class);
            visitRecordService.saveVisitRecord(paramVO);
        }catch (JSONException e){
            logger.error("保存访问记录数据报错,不是标准格式,",e);
        }catch (Exception e){
            logger.error("保存访问记录报错,该条数据舍弃:{}",message);
            e.printStackTrace();
        }
    }
}

消息订阅 线程类

消息订阅,需要有一条挂起线程,持续接收消息。需要继承Thread类,并将前面定义的订阅者类拿来使用,使该订阅者持续接收消息

/**
 * 访问记录 消息订阅 线程
 *
 * @author yanyulin
 * @date 2021-8-19 14:43:08
 */
public class SubThread extends Thread {

    private static final Logger logger = LoggerFactory.getLogger(SubThread.class);
    // redis客户端连接池
    private static final JedisPool jedisPool =  MemberCenterSpringContextHolder.getBean(JedisPool.class);

    // 订阅者
    private final VisitSubscriber subscriber = new VisitSubscriber();

    // 默认频道名称
    public static final String CHANNEL = "saveVisitRecord";

    @Override
    public void run() {
        logger.info("subscribe redis, channel {}, thread will be blocked", CHANNEL);
        Jedis jedis = null;
        try {
            //取出一个连接
            jedis = jedisPool.getResource();
            //通过subscribe 的api去订阅,入参是订阅者和频道名
            jedis.subscribe(subscriber, CHANNEL);
        } catch (Exception e) {
            logger.error("run subsrcibe channel error, ", e);
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}

消息发布类

在原有的业务代码里,将原有的保存业务,改为发布redis消息,将数据保存需求,写入到redis消息队列里,等待订阅者接收处理,从而达到削峰作用。为了防止数据格式和特殊字符影响,直接将数据编码了,使用时再解码。

        //保存会员访问记录
//        visitRecordService.saveVisitRecord(paramVO);
        // 改为使用redis 发布/订阅
        Jedis jedis = null;
        try {
            //取出一个连接
            jedis = jedisPool.getResource();
            String message = URLEncoder.encode(JSON.toJSONString(paramVO));
            // 向CHANNEL发布消息
            Long status = jedis.publish(CHANNEL, message);
            if(status == 0){
                logger.info("try again!");
                jedis.publish(CHANNEL, message);
            }
        } catch (Exception e) {
            logger.error("doing, subsrcibe channel error, ", e);
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }

Spring Boot启动类

在启动类里,把redis消息线程挂起,持续接收消息

    public static void main(String[] args) {
        SpringApplication.run(MemberCenterSpringbootApplication.class, args);
        // 新增,访问记录 消息订阅 线程
        new SubThread().start();
    }

功能和性能测试

功能测试

  • 选择程序连接的redis的ip端口,打开一个redis-cli客户端,选中程序使用的db,直接发送消息(publish),程序可以接收,功能没问题
  • 将会员服务和前台服务代码部署,前台发送消息测试,会员中心消息接收、数据处理正常,功能正常

性能测试

  • 模拟正常业务,持续写入。前台服务,将取到的数据,循环访问,调用会员中心微服务接口,持续25分钟左右,功能正常,无数据丢失,满足业务需求
    /**
     * 测试会员中心访问轨迹记录接口,单线程
     * @return
     */
    @RequestMapping("/saveVisitRecord")
    @ResponseBody
    public Object saveVisitRecord(){
        List<SiteChartMainEO> siteChartMainEOList = siteChartMainService.getEntities(SiteChartMainEO.class,new HashMap<>());
        logger.info("数据量为:{}", siteChartMainEOList.size());
        // 循环写入100次,4万多条数据
        for (int i = 0; i< 100; i++){
            siteChartMainEOList.forEach(mainEO -> {
                long num = Math.round(Math.random() * 100);
                SiteChartMainEO newEO = new SiteChartMainEO();
                BeanUtils.copyProperties(mainEO,newEO);
                newEO.setMemberId(mainEO.getMemberId() + num);
                newEO.setUrl(mainEO.getUrl()+num);
                // 调用微服务接口
                saveMemberCenterVisitRecord(newEO);
            });
        }

        return getObject();
    }
  • 模拟高并发,10条线程。前台服务,将同样的数据,使用10条线程,循环访问,调用会员中心接口。经测试,前台服务90秒左右执行完成,会员中心则执行了25分钟左右,无数据丢失,满足业务需求,达到了削峰需求
    /**
     * 测试会员中心访问轨迹记录接口,10条线程
     * @return
     */
    @RequestMapping("/saveVisitRecord")
    @ResponseBody
    public Object saveVisitRecord(){
        List<SiteChartMainEO> siteChartMainEOList = siteChartMainService.getEntities(SiteChartMainEO.class,new HashMap<>());
        logger.info("一批次数据量为:{}", siteChartMainEOList.size());
        // 循环写入100次,4万多条数据
        for (int i = 0; i< 10; i++){
            new Thread(() -> {
                long t1 = System.currentTimeMillis();
                for (int j = 0; j< 10; j++){
                    siteChartMainEOList.forEach(mainEO -> {
                        long num = Math.round(Math.random() * 199);
                        SiteChartMainEO newEO = new SiteChartMainEO();
                        BeanUtils.copyProperties(mainEO,newEO);
                        newEO.setMemberId(mainEO.getMemberId() + num);
                        newEO.setUrl(mainEO.getUrl()+num);
                        // 调用微服务接口
                        saveMemberCenterVisitRecord(newEO);
                    });
                }
                logger.info("循环一次完成,耗时:{}", (System.currentTimeMillis() - t1)/1000);
            }).start();

        }

        return getObject();
    }
  • 备注:以上代码只是测试用,所以在for循环里重复调用微服务接口,如果实际业务场景这么写,同样的业务重复调用微服务接口,一般都是不合理的。对于这样的场景,要优化接口,修改传参,改为一次调用处理一批数据,减少网络开销、接口调用开销

以上是关于Redis发布订阅功能介绍,生产场景使用及性能测试的主要内容,如果未能解决你的问题,请参考以下文章

laravel扩展使用redis发布与订阅

我在生产项目里是如何使用Redis发布订阅的?Java版代码实现(含源码)

Redis常见场景解析

利用Redis作消息队列,实现生产消费和发布订阅

Redis实现消息队列(生产者/消费者发布订阅模式)

阿里云RocketMQ的性能测试(本地测试)