redis实现延时队列(附完整代码)

Posted 张子行的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis实现延时队列(附完整代码)相关的知识,希望对你有一定的参考价值。

最近在复习所学过的队列的知识,像什么LinkedBlockingDeque。ArrayBlockingQueue,还有ribbitmq里的乱七八糟的,其本质我感觉啊这些技术就是一些队列,只不过大体上分为单机队列和分布式队列而已,当然本文的重点在于redis实现延时队列啊,可能有人会说,用ribbitmq这个专门的消息中间件实现延时队列不香么,给消息设置个ttl,失效了放入死信队列进行监听,不就行了吗。可是面试的时候或者有些公司就是不想用消息中间件,你难不成还去劝说老总吗?显然这不现实,我们要做的就是尽量能多学点就多学点吧,保不齐以后就有这么刻薄的需求要我们来实现呢。

思路

先介绍下redis的5大数据类型吧,string hash set sortset list,其中与本文有关的就是sortset,大白话来说就是sortset可以进行排序,可以进行排序就意味这是不是与队列搭边了。其实大体上的思路就是存数据时以时间戳为排序规则存入,取数据时:当前时间戳>存入数据的时间戳则消费。

生产者实现

实际开发中可以根据需求封装一个vo类作为消息存入redis,我这里直接简单写了把i作为value存入QUEUENAME集合中,设置scroe为(当前时间+i分钟)的时间戳

	@Autowired
    RedisTemplate redisTemplate;
    String QUEUENAME = "zzh:queue";
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	@Test
    public void pr() 
        for (int i = 1; i <= 1; i++) 
            Calendar now = Calendar.getInstance();
            now.setTime(new Date());
            now.set(Calendar.MINUTE, now.get(Calendar.MINUTE) + i);
            System.out.println("生产了:" + i + "当前时间为:" + simpleDateFormat.format(System.currentTimeMillis()) + "消费时间为:" + simpleDateFormat.format(now.getTime()));
            //往QUEUENAME这个集合中放入i,设置scorce为排序规则
            redisTemplate.opsForZSet().add(QUEUENAME, i, now.getTime().getTime());
        

    

消费者实现

拿出score范围在0-当前时间戳的每条数据,遍历比较是否时间戳<当前时间戳,如果小于表示没有到达可以消费的时间,直至>了才可以消费

/**
     * @param
     * @method 每间隔1秒执行一次
     */
    @Scheduled(cron = "*/1 * * * * * ")
    public void cs() 
        System.out.println("------------等待消费--------------");
        //取出QUEUENAME集合中的score在0-当前时间戳这个范围的所有值
        Set<Integer> set = redisTemplate.opsForZSet().rangeByScore(QUEUENAME, 0, System.currentTimeMillis());
        Iterator<Integer> iterator = set.iterator();
        while (iterator.hasNext()) 
            Integer value = iterator.next();
            //遍历取出每一个score
            Double score = redisTemplate.opsForZSet().score(QUEUENAME, value);
            //达到了时间就进行消费
            if (System.currentTimeMillis() > score) 
                System.out.println("消费了:" + value + "消费时间为:" + simpleDateFormat.format(System.currentTimeMillis()));
                redisTemplate.opsForZSet().remove(QUEUENAME, value);

            
        


    

测试

可以看到只有达到时间才进行消费

完整代码链接

觉得不错的点个小星星吧
https://github.com/zhangzihang3/DelayQueue.git

以上是关于redis实现延时队列(附完整代码)的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ的死信队列和延时队列

利用rabbitMq的死信队列实现延时消息

RabbitMq 实现延时队列-Springboot版本

RabbitMQ实现延时队列(死信队列)

RabbitMQ实现延时队列(死信队列)

通过RabbitMQ的DIRECT模式以及死信队列实现延时任务