JDK源码分析-DelayQueue

Posted WriteOnRead

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDK源码分析-DelayQueue相关的知识,希望对你有一定的参考价值。

概述

DelayQueue 也是一种队列,它内部的元素有“延迟”,也就是当从队列中获取元素时,如果它的延迟时间未到,则无法取出。

DelayQueue 的类签名和 承结构如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {}

下面分析其代码实现。

代码分析

相关接口

DelayQueue 中的元素要实现 Delayed 接口,该接口 定义如下:
public interface Delayed extends Comparable<Delayed> { /** * 以给定的时间单位,返回该对象的剩余延迟 * 若为零或者负数表示延时已经过去 */ long getDelay(TimeUnit unit);}
Delayed 接口继承自 Comparable 接口,而它本身只定义了一个 getDelay 方法,该方法的作用是获取对象的剩余延迟时间。

Comparable 接口也只有一个 compareTo 方法:
public interface Comparable<T> { public int compareTo(T o);}
这里不再详述。

构造器

DelayQueue 有两个构造器,如下:
// 无参构造器public DelayQueue() {}
// 指定集合的构造器public DelayQueue(Collection<? extends E> c) {    // 该方法最后是通过 add 方法实现的,后文进行分析 this.addAll(c);}

成员变量

// 锁,用于保证线程安全private final transient ReentrantLock lock = new ReentrantLock();
// 优先队列,实际存储元素的地方private final PriorityQueue<E> q = new PriorityQueue<E>();
// 线程等待的标识private Thread leader = null;
// 触发条件,表示是否可以从队列中读取元素private final Condition available = lock.newCondition();
关于优先队列可参考前文「」的分析。

入队方法

DelayQueue 也是一个队列,它的入队方法有:add(E), offer(E), put(E) 等,它们的定义如下:
public boolean add(E e) { return offer(e);}
public void put(E e) { offer(e);}
public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e);}
这几个方法都是通过 offer(E) 方法实现的,它的代码如下:
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try {        // 入队 q.offer(e);        // 若该元素为队列头部元素,唤醒等待的线程        // (表示可以从队列中读取数据了)        if (q.peek() == e) {            leader = null;            available.signal(); } return true; } finally { lock.unlock(); }}

出队方法

有入队自然也有出队,主要方法有:poll(), take(), poll(timeout, unit), 如下:
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try {        // 获取队列头部元素 E first = q.peek();        // 头部元素为空,或者延时未到,则返回空 if (first == null || first.getDelay(NANOSECONDS) > 0) return null;        // 否则返回头部元素 else return q.poll(); } finally { lock.unlock(); }}
poll 方法是非阻塞的,即调用之后无论元素是否存在都会立即返回。下面看下阻塞的 take 方法:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 以可中断方式获取锁 lock.lockInterruptibly(); try {        // 无限循环 for (;;) { // 获取队列头部元素 E first = q.peek(); // 若为空,则等待 if (first == null) available.await();            // 若不为空 else {                // 获取延迟的纳秒数,若小于等于零(即过期),则获取并删除头部元素 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll();                // 执行到这里,表示 delay>0,也就是延时未过期 first = null; // don't retain ref while waiting                // leader 不为空表示有其他线程在读取数据,当前线程等待 if (leader != null) available.await(); else {                    // 将当前线程设置为 leader Thread thisThread = Thread.currentThread(); leader = thisThread; try {                        // 等待延迟时间过期 available.awaitNanos(delay);                    } finally { if (leader == thisThread) leader = null; } } } } } finally { // 唤醒该条件下的其他线程 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); }}
该方法看起来稍复杂,主要逻辑如下:
1. 获取队列头部元素;
    1.1 若该元素为空(队列为空),则当前线程等待;
    1.2 若该元素不为空,且已经过期,则取出该元素(并移除);
        1.2.1 若未过期,且有其他线程在操作(leader 不为空),当前线程等待;
        1.2.2 若未过期,且没有其他线程操作,则占有“操作权”(将 leader 设置为当前线程),并等待延迟过期。
以上操作循环执行。

take 方法是阻塞操作,当条件不满足时会一直等待。另一个  poll(timeout, unit) 方法和它有些类似,只不过带有延时,如下:
public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; // 以可中断方式获取锁 lock.lockInterruptibly(); try { // 无限循环 for (;;) { // 获取队列的头部元素 E first = q.peek();                // 若头部元素为空(即队列为空),当超时时间大于零则等待相应的时间;                //   否则(即超时时间小于等于零)返回空 if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { // 执行到这里表示队列头部元素不为空 // 获取剩余延时 long delay = first.getDelay(NANOSECONDS);                    // 延时已过期,返回队列头部元素 if (delay <= 0) return q.poll();                    // 延时未过期且等待超时,返回空 if (nanos <= 0)                        return null; first = null; // don't retain ref while waiting                    // 延时未过期且等待未超时,且等待超时<延迟时间                    // 表示有其他线程在取数据,则当前线程进入等待 if (nanos < delay || leader != null)                        nanos = available.awaitNanos(nanos); else {                        // 没有其他线程等待,将当前线程设置为 leader,类似于“独占”操作 Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); // 计算剩余延迟时间 nanos -= delay - timeLeft; } finally {                            // 该线程操作完毕,把 leader 置空 if (leader == thisThread) leader = null; } } } } } finally {            // 唤醒 available 条件下的一个其他线程 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
take 和 poll 方法还有一个区别: 当延迟未过期时,take 方法会一直等待,而 poll 方法则会返回空。

此外还有一个 peek 方法,该方法虽然也能获取队列头部的元素,但与以上出队方法不同的是,peek 方法只是读取队列头部元素,并不会将其删除:
public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { // 返回队列的头部元素(不删除) return q.peek(); } finally { lock.unlock(); }}
以上就是 DelayQueue 的主要方法的代码分析,为便于理解,下面简要举例分析。

用法举例

示例代码:
自定义一个实现了 Delayed 接口的 Task 类,并将它的几个对象添加到 一个延迟队列中,代码如下:
public class TestDelayedQueue { public static void main(String[] args) throws Exception { BlockingQueue<Task> delayQueue = new DelayQueue<>(); long now = System.currentTimeMillis(); delayQueue.put(new Task("c", now + 6000));  delayQueue.put(new Task("d", now + 10000));        delayQueue.put(new Task("a", now + 3000));        delayQueue.put(new Task("b", now + 4000));         while (true) { System.out.println(delayQueue.take()); TimeUnit.SECONDS.sleep(1); } }
private static class Task implements Delayed { private String taskName; private long endTime;
public Task(String taskName, long endTime) { this.taskName = taskName; this.endTime = endTime; }
@Override public long getDelay(TimeUnit unit) { return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); }
@Override public int compareTo(Delayed o) { return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); }
@Override public String toString() { return "taskName-->" + taskName; } }}
结果会以延迟时间的顺序取出各个元素。

小结

1. DelayQueue 是一种队列,同时实现了 BlockingQueue 接口;
2. 它内部的元素有延迟时间的概念,出队时,若延时未到,则无法读取到队列头部的元素;
3. 它是线程安全的。

相关阅读:



以上是关于JDK源码分析-DelayQueue的主要内容,如果未能解决你的问题,请参考以下文章

# Java 常用代码片段

jdk源码解析--DelayQueue类

J.U.C并发框架源码阅读DelayQueue

死磕 java集合之DelayQueue源码分析

DelayQueue之源码分析

Java并发编程19DelayQueue源码分析