redis——基于redission的延时队列
Posted 水田如雅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis——基于redission的延时队列相关的知识,希望对你有一定的参考价值。
private RBlockingQueue<TeacherAuthProgressContext> blockingQueue;
private RDelayedQueue<TeacherAuthProgressContext> delayedQueue;
初始化:
blockingQueue = client.getNativeClient().getBlockingQueue(queueName);
if (blockingQueue == null)
throw new RuntimeException(queueName + " initialize failed");
delayedQueue = client.getNativeClient().getDelayedQueue(blockingQueue);
if (delayedQueue == null)
throw new RuntimeException(queueName + " initialize failed");
AlpsThreadPool.getInstance().submit(this::takeOutDelayedQueue);
放入取出:
public void putInDelayedQueue(TeacherAuthProgressContext data, long delay, TimeUnit timeUnit)
// 超过14天的任务,直接处理为结束啦,不再往延时队列放了
if (data.getTotalDay() > 14)
TeacherAuthProgress teacherAuthProgress = data.getTeacherAuthProgress();
teacherAuthProgress.setEndAt(new Date());
teacherAuthProgressPersistence.upsert(teacherAuthProgress);
else
delayedQueue.offer(data, delay, timeUnit);
private void takeOutDelayedQueue()
while (true)
try
TeacherAuthProgressContext data = blockingQueue.take();
AlpsThreadPool.getInstance().submit(() -> process(data));
catch (InterruptedException e)
logger.error("teacher_auth_progress_delayed_queue process error:", e);
底层lua脚本采用zrangebyscore实现。
以上是关于redis——基于redission的延时队列的主要内容,如果未能解决你的问题,请参考以下文章