Java并发编程19DelayQueue源码分析

Posted Leon

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程19DelayQueue源码分析相关的知识,希望对你有一定的参考价值。

DelayQueue,带有延迟元素的线程安全队列,当非阻塞从队列中获取元素时,返回最早达到延迟时间的元素,或空(没有元素达到延迟时间)。DelayQueue的泛型参数需要实现Delayed接口,Delayed接口继承了Comparable接口,DelayQueue内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间。DelayQueue不允许包含null元素。

领导者/追随者模式是多个工作线程轮流获得事件源集合,轮流监听、分发并处理事件的一种模式。在任意时间点,程序都仅有一个领导者线程,它负责监听IO事件。而其他线程都是追随者,它们休眠在线程池中等待成为新的领导者。当前的领导者如果检测到IO事件,首先要从线程池中推选出新的领导者线程,然后处理IO事件。此时,新的领导者等待新的IO事件,而原来的领导者则处理IO事件,二者实现了并发。

 简单理解,就是最多只有一个线程在处理,其他线程在睡眠。在DelayQueue的实现中,Leader/Followers模式用于等待队首的第一个元素。

类定义及参数:

复制代码
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    /** 重入锁,实现线程安全 */
    private final transient ReentrantLock lock = new ReentrantLock();
    /** 使用优先队列实现 */
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /** Leader/Followers模式 */
    private Thread leader = null;

    /** 条件对象,当新元素到达,或新线程可能需要成为leader时被通知 */
    private final Condition available = lock.newCondition();
复制代码

  构造函数:

复制代码
    /**
     * 默认构造,得到空的延迟队列
     */
    public DelayQueue() {}

    /**
     * 构造延迟队列,初始包含c中的元素
     *
     * @param c 初始包含的元素集合
     * @throws NullPointerException 当集合或集合任一元素为空时抛出空指针错误
     */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }
复制代码

  add方法:

复制代码
    /**
     * 向延迟队列插入元素
     *
     * @param e 要插入的元素
     * @return true
     * @throws NullPointerException 元素为空,抛出空指针错误
     */
    public boolean add(E e) {
        // 直接调用offer并返回
        return offer(e);
    }
复制代码

  offer方法:

复制代码
    /**
     * 向延迟队列插入元素
     *
     * @param e 要插入的元素
     * @return true
     * @throws NullPointerException 元素为空,抛出空指针错误
     */
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        // 获得锁
        lock.lock();
        try {
            // 向优先队列插入元素
            q.offer(e);
            // 若在此之前队列为空,则置空leader,并通知条件对象,需要结合take方法看
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
复制代码

  put方法:

复制代码
    /**
     * 向延迟队列插入元素. 因为队列是无界的,所以不会阻塞。
     *
     * @param e 要插入的元素
     * @throws NullPointerException 元素为空,抛出空指针错误
     */
    public void put(E e) {
        offer(e);
    }
复制代码

  带超时的offer方法:

复制代码
    /**
     * 向延迟队列插入元素. 因为队列是无界的,所以不会阻塞,因此,直接调用offer方法并返回
     *
     * @param e 要插入的元素
     * @param timeout 不会阻塞,忽略
     * @param unit 不会阻塞,忽略
     * @return true
     * @throws NullPointerException 元素为空,抛出空指针错误
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        // 直接调用offer方法并返回
        return offer(e);
    }
复制代码

  poll方法:

复制代码
    /**
     * 获取并移除队首的元素, 或者返回null(如果队列不包含到达延迟时间的元素)
     *
     * @return 队首的元素, 或者null(如果队列不包含到达延迟时间的元素)
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        // 获得锁
        lock.lock();
        try {
            // 获取优先队列队首元素
            E first = q.peek();
            // 若优先队列队首元素为空,或者还没达到延迟时间,返回null
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            // 否则,返回并移除队首元素
            else
                return q.poll();
        } finally {
            // 释放锁
            lock.unlock();
        }
    }
复制代码

  take方法(重要):

复制代码
    /**
     * 获取并移除队首元素,该方法将阻塞,直到队列中包含达到延迟时间的元素
     *
     * @return 队首元素
     * @throws InterruptedException 阻塞时被打断,抛出打断异常
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 获得锁,该锁可被打断
        lock.lockInterruptibly();
        try {
            // 循环处理
            for (;;) {
                // 获取队首元素
                E first = q.peek();
                // 若元素为空,等待条件,在offer方法中会调用条件对象的通知方法
                // 并重新进入循环
                if (first == null)
                    available.await();
                // 若元素不为空
                else {
                    // 获取延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 若达到延迟时间,返回并移除队首元素
                    if (delay <= 0)
                        return q.poll();
                    // 否则,需要进入等待
                    first = null; // 在等待时,不持有引用
                    // 若leader不为空,等待条件
                    if (leader != null)
                        available.await();
                    // 否则,设置leader为当前线程,并超时等待延迟时间
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 通知其他线程条件得到满足
            if (leader == null && q.peek() != null)
                available.signal();
             // 释放锁
            lock.unlock();
        }
    }
复制代码

  带超时的poll方法(重要):

复制代码
    /**
     * 获取并移除队首元素,该方法将阻塞,直到队列中包含达到延迟时间的元素或超时
     *
     * @return 队首元素,或者null
     * @throws InterruptedException 阻塞等待时被打断,抛出打断异常*/
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0)
                        return null;
                    else
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    if (nanos <= 0)
                        return null;
                    first = null; // don\'t retain ref while waiting
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
复制代码

  peek方法:

复制代码
    /**
     * 获取但不移除队首元素,或返回null(如果队列为空)。和poll方法不同,
     * 若队列不为空,该方法换回队首元素,不论是否达到延迟时间
     *
     * @return 队首元素,或null(如果队列为空)
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }
复制代码

出处:

https://www.cnblogs.com/enumhack/p/7472873.html

https://www.cnblogs.com/wanly3643/p/3944661.html

jdk源码

        

以上是关于Java并发编程19DelayQueue源码分析的主要内容,如果未能解决你的问题,请参考以下文章

Java Review - 并发编程_DelayQueue原理&源码剖析

J.U.C并发框架源码阅读DelayQueue

死磕 java集合之DelayQueue源码分析

java中DelayQueue的一个使用陷阱分析

DelayQueue之源码分析

并发编程-concurrent指南-阻塞队列-延迟队列DelayQueue