多线程(二十一阻塞队列-DelayQueue)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程(二十一阻塞队列-DelayQueue)相关的知识,希望对你有一定的参考价值。

DelayQueue简介

DelayQueue中的所有元素必须实现Delayed接口,还必须实现还实现了Comparable接口。

技术图片

1、DelayQueue是×××阻塞队列
2、队列中的元素必须实现Delayed接口,只有当该对象的getDalay方法返回的剩余时间≤0时才会出队。
3、剩余时间最小的元素就在堆顶,每次出队其实就是删除剩余时间≤0的最小元素。

DelayQueue构造

基于PriorityQueue的堆实现队列,保证最先失效的在堆顶。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> 

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    /**
     * leader线程是首个尝试出队元素(队列不为空)但被阻塞的线程.
     * 该线程会限时等待(队首元素的剩余有效时间),用于唤醒其它等待线程
     * 这个很关键,当队列为空或者堆顶元素还没达到出队时间时,所有线程都会阻塞的,要保证有线程可以唤醒,而leader线程就是这个首先自己唤醒自己的线程,然后才能继续唤醒其他线程。
     */
    private Thread leader = null;

    /**
     * 出队线程条件队列, 当有多个线程, 会在此条件队列上等待.
     */
    private final Condition available = lock.newCondition();

leader字段很关键,DelayQueue每次只会出队一个过期的元素,如果队首元素没有过期,就会阻塞出队线程,让线程在available这个条件队列上无限等待。

为了提升性能,DelayQueue并不会让所有出队线程都无限等待,而是用leader保存了第一个尝试出队的线程,该线程的等待时间是队首元素的剩余有效期。这样,一旦leader线程自己唤醒(此时队首元素也失效了),就可以出队成功,然后唤醒一个其它在available条件队列上等待的线程。之后,会重复上一步,新唤醒的线程可能取代成为新的leader线程。

入队-put

/**
 * 入队一个指定元素e.
 * 由于是×××队列, 所以该方法并不会阻塞线程.
 */
public void put(E e) 
    offer(e);


public boolean offer(E e) 
    final ReentrantLock lock = this.lock;
    lock.lock();
    try 
        q.offer(e);                  // 调用PriorityQueue的offer方法
                // 如果入队元素在队首, 则唤醒一个出队线程
                // 当首次入队元素时,需要唤醒一个出队线程
                // 因为此时可能已有出队线程在空队列上等待了
                //如果不唤醒,会导致出队线程永远无法执行。
        if (q.peek() == e)     
            leader = null;
            available.signal();
        
        return true;
     finally 
        lock.unlock();
    

出队-take

1.队列为空,直接阻塞出队线程,在available条件队列等待
2.队列非空,还要看队首元素的有效期,如果队首元素过期了,那直接出队就行了;如果队首元素未过期,就要看leader是否为空,如果不是,就无限等待,如果是,则自己成为leader,限时等待。

/**
 * 队首出队元素.
 * 如果队首元素(堆顶)未到期或队列为空, 则阻塞线程.
 */
public E take() throws InterruptedException 
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try 
        for (; ; )  //出队是一个自旋操作
            E first = q.peek();     // 读取队首元素
            if (first == null)      // CASE1: 队列为空, 直接阻塞
                available.await();
            else                   // CASE2: 队列非空
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)       // CASE2.0: 队首元素已过期,直接出队
                    return q.poll();

                // 执行到此处说明队列非空, 且队首元素未过期
                first = null;
                if (leader != null)           // CASE2.1: 已存在leader线程
                    available.await();      // 无限期阻塞当前线程
                else                             // CASE2.2: 不存在leader线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;    // 将当前线程置为leader线程
                    try 
                        available.awaitNanos(delay);  // 阻塞当前线程(限时等待剩余有效时间)
                     finally 
                        if (leader == thisThread)
                            leader = null;
                    
                
            
        
     finally 
        if (leader == null && q.peek() != null)  // 不存在leader线程,而且队列不空, 则唤醒一个其它出队线程,防止任务无法执行。
            available.signal();
        lock.unlock();
    

总结

1、DelayQueue是阻塞队列中非常有用的一种队列,经常被用于缓存或定时任务等的设计。
2、ScheduledThreadPoolExecutor.DelayedWorkQueue就是一种延时阻塞队列。

以上是关于多线程(二十一阻塞队列-DelayQueue)的主要内容,如果未能解决你的问题,请参考以下文章

多线程-BlockingQueue,Array[Linked]BlockingQueue,DelayQueue,PriorityBlockingQueue,SynchronousQueue

多线程案例 --- 单例模式(饿汉懒汉)阻塞式队列

多线程(二十阻塞队列-PriorityBlockingQueue)

每日一博 - DelayQueue阻塞队列源码解读

Java并发编程从入门到精通 - 第5章:多线程之间的交互:线程阀

Queue 阻塞队列 DelayQueue