DelayQueue源码解析

Posted yaowen

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了DelayQueue源码解析相关的知识,希望对你有一定的参考价值。

DelayQueue是一个支持延时获取元素的无界阻塞队列。里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行。也就是说只有在延迟期到时才能够从队列中取元素。

DelayQueue主要用于两个方面:
- 缓存:清掉缓存中超时的缓存数据
- 任务超时处理 

class DelayedEle implements Delayed {

    private final long delayTime; //延迟时间
    private final long expire;  //到期时间
    private String data;   //数据

    public DelayedEle(long delay, String data) {
        delayTime = delay;
        this.data = data;
        expire = System.currentTimeMillis() + delay; 
    }

    /**
     * 剩余时间=到期时间-当前时间
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }

    /**
     * 优先队列里面优先级规则
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("DelayedElement{");
        sb.append("delay=").append(delayTime);
        sb.append(", expire=").append(expire);
        sb.append(", data=‘").append(data).append(‘‘‘);
        sb.append(‘}‘);
        return sb.toString();
    }
}

看了DelayQueue的内部结构就对上面几个关键点一目了然了,但是这里有一点需要注意,DelayQueue的元素都必须继承Delayed接口。同时也可以从这里初步理清楚DelayQueue内部实现的机制了:以支持优先级无界队列的PriorityQueue作为一个容器,容器里面的元素都应该实现Delayed接口,在每次往优先级队列中添加元素时以元素的过期时间作为排序条件,最先过期的元素放在优先级最高


 基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。

DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据为空的操作(消费者)才会被阻塞
放的时候不用因为满了而阻塞,取的时候空了会阻塞等待有时间没到也会阻塞等到,自己也可以指定超时时间。


 只有一个getDelay(TimeUnit)方法,该方法返回与此对象相关的的剩余时间。同时我们看到Delayed接口继承自Comperable接口,所以实现Delayed接口的类还必须要定义一个compareTo方法,该方法提供与此接口的getDelay方法一致的排序。

DelayQueue队列中每个元素都有个过期时间,并且队列是个优先级队列,当从队列获取元素时候,只有过期元素才会出队列。

q:优先队列,用于存储元素,并按优先级排序
leader:用于优化内部阻塞通知的线程,第一个阻塞等待的线程。
以支持优先级的PriorityQueue无界队列作为一个容器,因为元素都必须实现Delayed接口,可以根据元素的过期时间来对元素进行排列,因此,先过期的元素会在队首,每次从队列里取出来都是最先要过期的元素。

leader是等待获取队列元素的线程,应用主从式设计减少不必要的等待。如果leader不等于空,表示已经有线程在等待获取队列的元素。所以,通过await()方法让出当前线程等待信号。如果leader等于空,则把当前线程设置为leader,当一个线程为leader,它会使用awaitNanos()方法让当前线程等待接收信号或等待delay时间。 

private final PriorityQueue<E> q = new PriorityQueue<E>();
private final transient ReentrantLock lock = new ReentrantLock();
private Thread leader = null;
private final Condition available = lock.newCondition();

public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);  //放一个元素
            if (q.peek() == e) {  //队列的第0个元素是这个元素,放进去的元素成为第0个元素。最大堆第0和元素是最大的,依次是第23大,第4567大,
                leader = null;    //  leader 线程为空
                available.signal();   //其他线程唤醒,如果队首元素是刚插入的元素,则设置leader为null,并唤醒阻塞在available上的线程
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)   //时间不到不走
            return null;
        else
            return q.poll();   //  取出优先级队列里面第0个元素,优先级队列是根据时间排序了的
    } finally {
        lock.unlock();
    }
}

 技术图片

技术图片

推论1:有leader线程等待的时候,新的线程要取,必然加入await队列排队。所有已经await的线程,唤醒了非leader线程就会继续await。每次唤醒一个处于await的线程。同时之能一个线程取就是leader去取。
await释放锁别的线程可以执行,singal不会释放锁别的线程不执行。
推论2:take(),put()方法全部锁住,只能一个线程调用take或put。里面有await就不行了,await后别的线程就可以进来take,put方法,只要不是执行take里面的await方法,那么就只能一个线程执行take,put方法。
推论3:如果都不设置等待时间,那么就没有优先级,只有等到放的时候  或者  成功取到的线程唤醒,都设置等待时间,时间到了一起抢。
推论4:取的时候为空等待,时间到了直接返回,有元素但是时间没到,就设置leader
推论5:等待取的线程,有一个是leader,就是由第一优先权的线程也是第一个尝试获取的线程。所有的等待取的线程中,下一次不一定是ledaer去取,如果唤醒了非leader线程,那也不行,要继续等到,一定要leader线程唤醒后去取,其他线程才能去取,然后成为新的leader。

 

以上是关于DelayQueue源码解析的主要内容,如果未能解决你的问题,请参考以下文章

# Java 常用代码片段

# Java 常用代码片段

JDK源码分析-DelayQueue

J.U.C并发框架源码阅读DelayQueue

死磕 java集合之DelayQueue源码分析

DelayQueue之源码分析