java同步阻塞队列之DelayQueue实现原理,PriorityQueue原理
Posted Leo Han
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java同步阻塞队列之DelayQueue实现原理,PriorityQueue原理相关的知识,希望对你有一定的参考价值。
DelayQueue是一种延迟队列,能够在指定的时间之后执行。其底层采用PriorityQueue作为底层数据结构。
在讲解DelayQueue之前,我们需要先讲解一下PriorityQueue。
PriorityQueue是一个优先级队列,在底层使用了一个可动态扩容的数组作为基础数据结构,实现了堆结构,默认是一个小顶堆。
public class PriorityQueue<E> extends AbstractQueue<E>
implements java.io.Serializable
/**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
* lowest value is in queue[0], assuming the queue is nonempty.
*/
transient Object[] queue; // non-private to simplify nested class access
/**
* The number of elements in the priority queue.
*/
private int size = 0;
我们主要看下其入队和出队操作:
public boolean offer(E e)
if (e == null)
throw new NullPointerException();
modCount++;
int i = size;
if (i >= queue.length)
grow(i + 1);
size = i + 1;
if (i == 0)
queue[0] = e;
else
siftUp(i, e);
return true;
private void siftUp(int k, E x)
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
private void siftUpComparable(int k, E x)
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0)
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (key.compareTo((E) e) >= 0)
break;
queue[k] = e;
k = parent;
queue[k] = key;
public E poll()
if (size == 0)
return null;
int s = --size;
modCount++;
E result = (E) queue[0];
E x = (E) queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
return result;
private void siftDown(int k, E x)
if (comparator != null)
siftDownUsingComparator(k, x);
else
siftDownComparable(k, x);
private void siftDownComparable(int k, E x)
Comparable<? super E> key = (Comparable<? super E>)x;
int half = size >>> 1; // loop while a non-leaf
while (k < half)
int child = (k << 1) + 1; // assume left child is least
Object c = queue[child];
int right = child + 1;
if (right < size &&
((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
c = queue[child = right];
if (key.compareTo((E) c) <= 0)
break;
queue[k] = c;
k = child;
queue[k] = key;
DelayQueue
DelayQueue的类声明结构如下
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>
表名其操作的元素必须是实现了Delayed
接口的类,该接口声明如下:
public interface Delayed extends Comparable<Delayed>
long getDelay(TimeUnit unit);
可以看到,实现该接口还必须实现Comparable
接口。
我们看下DelayQueue的出队和入队方法:
private final Condition available = lock.newCondition();
public E take() throws InterruptedException
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
for (;;)
E first = q.peek();
if (first == null)
available.await();
else
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else
Thread thisThread = Thread.currentThread();
leader = thisThread;
try
available.awaitNanos(delay);
finally
if (leader == thisThread)
leader = null;
finally
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
public boolean offer(E e)
final ReentrantLock lock = this.lock;
lock.lock();
try
q.offer(e);
if (q.peek() == e)
leader = null;
available.signal();
return true;
finally
lock.unlock();
DelayQueue内部只有一把锁以及一个该锁的条件等待。
当从队列中获取数据的时候,首先从PriorityQueue队列获取一个元素,如果该元素已经到期,那么直接返回该元素,如果该元素未到期,1. 当leader线程不为空时(这时候leader线程获取到了一个未到期的元素,但是leader线程只阻塞等待到该元素到期,到期之后就会被唤醒),那么当前线程阻塞等待,如果leader为空,表名当前没有线程去获取,那么当前线程先获取。
可以看到DelayQueue中并不是每个线程就一直等待,而是线程在获取元素之后(这时候已经获取到锁),如果元素需要一定时间才能到期,那么只让当前线程等待固定的时间就会唤醒当前线程,避免线程一直等待
以上是关于java同步阻塞队列之DelayQueue实现原理,PriorityQueue原理的主要内容,如果未能解决你的问题,请参考以下文章
java同步阻塞队列之ArrayBlockingQueue实现原理
java同步阻塞队列之LinkedBlockingQueue实现原理,和ArrayBlockingQueue对比