java延迟队列

Posted kankankankankan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java延迟队列相关的知识,希望对你有一定的参考价值。

大多数用到定时执行的功能都是用任务调度来做的,单身当碰到类似订餐业务/购物等这种业务就不好处理了,比如购物的订单功能,在你的订单管理中

有N个订单,当订单超过十分钟未支付的时候自动释放购物车中的商品,订单失效。这种高频率的延迟任务再用任务调度(定时)实现就得不偿失了。推荐用Java延迟队列来实现,DelayQueue是java.util.concurrent中提供的一个类DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,

其中的对象只能在其到期时才能从队列中中取走。这种队列是有序的,即对头对象的延迟到期时间最长。注意:不能讲null元素放置到这种队列中。

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

//任务线程 实现delayed接口
public class DelayItem<T extends Runnable> implements Delayed {
//到期时间
private final long time;
//任务对象
private final T task;
//原子类
private static final AtomicLong atomic = new AtomicLong(0);
private final long n;

public DelayItem(long timeout, T t) {
    this.time = Systen,nanoTime() + timeout;
    this.task = t;
    this.n = atomic.getAndIncrement();
}
//返回与此对象相关的剩余延迟时间,以给定的时间单位表示
public long getDelay(TimeUnit unit) {
    return unit.convert(this.time - System.nanoTime(),TimeUnit.NANOSECONDS);
}

public int compareTo(Delayed other) {
    if(other == this) {
    return 0;
    }
    if(other instanceof DelayItem) {
        DelayItem<?> x = (DelayItem<?>)other;
        long diff = tiem - x.time;
        if(diff < 0) {
            return -1;
        }else if(diff > 0) {
            return 1;
        }else if( n < x.n){
            return -1;
        }else {
            return 1;
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) -           other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }
    public T getTask(){
        return this.task;
    }
    @Override
    public int hashCode(){
        return task.hashCode();
    }
    @Override
    public boolean equals(Object object){
        if(object instanceof DelayItem){
            return object.hashCode() == hashCode() ? true : false;
    }
    return false;
}
}

//管理延迟任务的类
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
//延迟队列存放有效期对象
public class ItemQueueThread {
private static final Logger logger = Logger.getLogger(this.class);
private ItemQueueThread(){}
//延迟加载(线程安全)
private static class LazyHolder{
    private static ItemQueueThread itemQueueThread = new ItemQueueThread();
}
public static ItemQueueThread getInstance(){
    return LazyHolder.itemQueueThread;
}
//缓存线程池
ExecutorService executor = Executors.newCacheThreadPool();
//线程
private Thread daemonThead;
//初始化线程
public void init() {
    daemonThread = new Thread(() -> {
        try{
            execute();
        }cathc(InterruptedException e){
            e.printStackTrace();
            logger.info(e.getMessage());
        }
    });
    System.out.println("init......start");
    daemonThread.start();
}

private void execute() throws InterrupedException {
    while(true) {
        Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
        System.out.println("线程数...." + map.size());
        System.out.println(System.currentTimeMills());
        System.out.println(item.size());
        System.out.println("线程状态----" + Thread.currentThread().getState());
        try{
            //从延迟队列中取值,如果没有对象过期责队列一直等待
            DelayItem<?> t1 = item.take();
            if(t1 != null){
                Runnable task = t1.getTask();
                    if(task == null){
                        continue;
                }
                executor.execute(task);
            }
        }catch(Exception e) {
            e.printStackTrace();
            logger.info(e.getMessage());
        }
    }
}
//创建空的延迟队列
private DelayQueue<DelayItem<?>> item = new DelayQueue<>();
//往队列中添加任务
public void put(long time, Runnable task, TimeUnit timeUnit){
    //转换成ns
    long nanoTime = TimeUnit.NANOSECONDS.convert(time,timeUnit);
    DelayItem<?> k = new DelayItem(nanoTime,task);
    item.put(k);_:
}
//结束任务
public boolean endTask(DelayItem<Runnable> task){
    return item.remove(task);
}
}

//把需要延迟的功能代码单独抽取出来作为一个类,继承Runnable实现run方法
public class DataDemo implements Runnable {
    int a = -1;
    public DataDemo(int i){
        this.a = i;
}
@Override
public void run(){
    System.out.println("超时,要撤销订单...." + a);
}
}

//test class
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class DelayTest{
public static void main(String[] args){
    ItemQueueThread  ith = ItemQueueThread.getInstance();
    ith.init();
    Random r = new Random();
    for(int i = 0; i < 5; i++){
        int a = r.nextInt(20);
        System.out.println("预先知道等待时间:" + a);
        DataDemo dd = new DataDemo(a);//创建一个任务对象
        ith.put(a,dd,TimeUnit.SECONDS);//将任务添加到队列中
    }
}
}
//注意ItemQueueThread的init方法,要在容器初始化的时候就要执行,或在第一次put延迟对象任务之前就要初始化完成,当设定的延迟时间到期时会执行任务对象中的run
}            

  

以上是关于java延迟队列的主要内容,如果未能解决你的问题,请参考以下文章

java DelayedQueue延迟队列

java延迟队列DelayQueue使用及原理

java延迟队列

Android中切换标签片段之间的延迟

Java延迟队列——DelayQueue

RabbitMQ:延迟队列