前文「」分析了优先队列 PriorityQueue,它既不是阻塞队列,而且线程不安全。本文分析线程安全的阻塞优先队列 PriorityBlockingQueue。它的继承结构如下:

PriorityBlockingQueue 与 PriorityQueue 的内部结构类似,也是物理上由数组、逻辑上由堆结构实现的,并且使用 ReentrantLock 实现线程安全。除此之外,二者大部分操作都是类似的。




// 内部数组的默认初始化容量private static final int DEFAULT_INITIAL_CAPACITY = 11;
// 内部数组的最大容量private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 保存元素的内部数组private transient Object[] queue;
// 队列中元素的数量private transient int size;
// 队列中元素的比较器private transient Comparator<? super E> comparator;
// 互斥锁(保证线程安全)private final ReentrantLock lock;
// 表示队列非空的条件private final Condition notEmpty;
// 扩容时使用的自旋锁,通过 CAS 获取(后面分析)private transient volatile int allocationSpinLock;
// 一个普通的优先队列,主要用于序列化和反序列化private PriorityQueue<E> q;


// 构造器 1:使用默认的初始化容量创建一个对象public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null);}
// 构造器 2:使用给定的容量创建一个对象public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null);}
// 构造器 3:使用给定的容量和比较器创建一个对象public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1)        throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity];}


public PriorityBlockingQueue(Collection<? extends E> c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition();    // 是否需要堆化 boolean heapify = true; // true if not known to be in heap order    // 是否需要筛选空值 boolean screen = true; // true if must screen for nulls    // 给定集合为 SortedSet if (c instanceof SortedSet<?>) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); heapify = false; // 已经有序,不需要再堆化 } // 给定集合为 PriorityBlockingQueue else if (c instanceof PriorityBlockingQueue<?>) { PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); screen = false; // 不需要筛选判空 if (pq.getClass() == PriorityBlockingQueue.class) // exact match            heapify = false// 不需要堆化 }    // 集合转为数组 Object[] a = c.toArray(); int n = a.length; // If c.toArray incorrectly doesn't return Object[], copy it. if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class);    // 集合内所有元素都不能为空 if (screen && (n == 1 || this.comparator != null)) { for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a;    this.size = n; if (heapify) heapify(); // 堆化}

堆化操作 heapify 代码如下:

private void heapify() { Object[] array = queue; int n = size; int half = (n >>> 1) - 1; Comparator<? super E> cmp = comparator;    // 根据比较器(Comparator)是否为空,采用不同的策略    // PS: 二者操作基本一样,只是 Comparator 和 Comparable 的区别 if (cmp == null) { for (int i = half; i >= 0; i--) siftDownComparable(i, (E) array[i], array, n); } else { for (int i = half; i >= 0; i--) siftDownUsingComparator(i, (E) array[i], array, n, cmp); }}

siftDownUsingComparator 代码如下:

private static <T> void siftDownUsingComparator(int k, T x, Object[] array, int n, Comparator<? super T> cmp) { if (n > 0) { // 数组的中间位置 int half = n >>> 1; while (k < half) { // 获取索引为 k 的节点左子节点索引 int child = (k << 1) + 1; // 获取 child 的值 Object c = array[child]; // 获取索引为 k 的节点右子节点索引 int right = child + 1; // 比较左右子节点的值,取较小的一个 if (right < n && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; // 给定的元素 x 与其较小的子节点的值比较,若 x 不大于子节点的值,停止交换 if (cmp.compare(x, (T) c) <= 0) break; // 将 x 与其较小的子节点互换位置 array[k] = c; k = child; } array[k] = x; }}

该方法与 PriorityQueue 中的 siftDownUsingComparator 方法操作几乎完全一致,可参考前文的分析,这里不再赘述(siftDownComparable 方法亦是如此)。

入队方法:add(E), put(E), offer(E, timeout, TimeUnit), offer(E)

public boolean add(E e) { return offer(e);}
public void put(E e) { offer(e); // never need to block}
public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); // never need to block}

上述三个方法内部都是通过 offer(e) 方法实现的,因此只需分析 offer(e) 方法即可:

public boolean offer(E e) { // 插入元素不能为空 if (e == null) throw new NullPointerException();    // 获取锁,保证线程安全 final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array;    // 如果容量不够,则进行扩容(注意这里是一个循环)    while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); // 尝试扩容 try { Comparator<? super E> cmp = comparator;        // 根据 Comparator 是否为空采用不同的堆化策略 if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); size = n + 1;        // 有新元素插入了,唤醒 notEmpty 条件下等待的线程(消费者) notEmpty.signal(); } finally {        // 释放锁 lock.unlock(); } return true;}

下面分析一下扩容操作 tryGrow:

private void tryGrow(Object[] array, int oldCap) {   // 释放锁 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null;    // 尝试以 CAS 方式修改 allocationSpinLock 的值(将 0 改为 1) if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try {            // 若旧容量 n 较小(小于 64),则扩容为 2 * n + 2,否则扩容为 1.5 * n int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } // 创建一个新数组 if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { // 将 allocationSpinLock 重置为 0 allocationSpinLock = 0; } } // newArray 为空表示未进行上述扩容操作,则当前线程让出 CPU 时间 if (newArray == null) // back off if another thread is allocating Thread.yield();    // 尝试获取锁 lock.lock();    // 到这里表示扩容成功    // queue == array 保证老数据复制一次 if (newArray != null && queue == array) {        // 扩容后的新数组        queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); }}


1. 为什么刚开始要释放锁?


2. 释放锁之后如何保证线程安全?

这就用到了成员变量 allocationSpinLock,使用了 Unsafe 类的 CAS 操作。它尝试将 allocationSpinLock 的值设置为 1,而一旦操作成功,其他线程就无法进入,直到该线程将它重置为 0. 这就保证了同一时间内只能有一个线程在扩容。

3. 在释放锁后的扩容操作中,先后可能会有多个线程扩容,也即会产生多个新容量的空数组此时它们都未指向原先的数组 queue),如何避免老数据多次复制到新数组呢?

代码里用到了 queue == array 这个判断。

比如线程 T1 和 T2 都对原数组进行了扩容,得到了两个 newArray,在后面复制老数据时,若其中一个线程已经对 queue 重新赋值并复制后,由于 queue 已经改变,后面的线程就不会再复制一次了。

出队方法:poll(), take(), peek()

// 出队public E poll() { final ReentrantLock lock = this.lock; lock.lock();    try { return dequeue(); } finally { lock.unlock(); }}// 出队(队列为空时阻塞)public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result;}
// 有超时等待的出队public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null && nanos > 0) nanos = notEmpty.awaitNanos(nanos); } finally { lock.unlock(); } return result;}

可以看到这几个出队的操作都加了锁,内部都调用了 dequeue 方法:

private E dequeue() {    int n = size - 1; if (n < 0) return null; else { Object[] array = queue; // 取数据中的第一个元素 E result = (E) array[0]; // 获取最后一个元素 E x = (E) array[n];        // 将最后一个元素置空,并恢复堆结构 array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; }}

该方法与 PriorityQueue 的出队操作 poll() 类似,也不再赘述。


1. PriorityBlockingQueue 是优先队列的阻塞方式实现,它与 PriorityQueue 内部结构类似,即物理结构是可变数组、逻辑结构是堆;

2. PriorityBlockingQueue 内部元素不能为空,且可比较,使用 ReentrantLock 保证线程安全。





