JUC源码分析-集合篇PriorityBlockingQueue

Posted binarylei

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC源码分析-集合篇PriorityBlockingQueue相关的知识,希望对你有一定的参考价值。

JUC源码分析-集合篇(七)PriorityBlockingQueue

PriorityBlockingQueue 是带优先级的无界阻塞队列,每次出队都返回优先级最高的元素,是二叉树最小堆的实现。 PriorityBlockingQueue 数据结构和 PriorityQueue 一致,而线程安全性使用的是 ReentrantLock。

1. 基本属性

// 最大可分配队列容量 Integer.MAX_VALUE - 8,减 8 是因为有的 VM 实现在数组头有些内容
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 默认队列容量11,这里不是 HashMap,不需要 hash 取余,因此不必是 2^n
private static final int DEFAULT_INITIAL_CAPACITY = 11;

// 数组结构,是二叉树最小堆的实现
private transient Object[] queue;
private transient int size;

PriorityBlockingQueue 使用 ReentrantLock 保证数据安全性,数据结构使用的是数组。PriorityBlockingQueue 数组的结构和 PriorityQueue 一致,是基于平衡二叉堆实现,父节点下标是 n,左节点则是 2n + 1,右节点是 2n + 2。queue[0] 永远都是最小的元素。

public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

2. 入队 offer

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // 1. 扩容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator<? super E> cmp = comparator;
        // 2. 将节点 e 插入数据 array 的第 n 个位置
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}

代码很简单,添加元素时 offer 做了两件事:一是判断是否需要扩容(tryGrow),二是将元素 e 插入到数组中(siftUpComparable)。先看一下如何进行扩容的,至于元素添加在 poll 时再一起分析。

// 集合中元素个数 size>=queue.length 则进行扩容
while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);

// tryGrow 最终只有一个线程能扩容成功,其它线程通过 while 自旋检查当前扩容是否完毕
private void tryGrow(Object[] array, int oldCap) {
    // 1. 释放锁,这样在数组扩容期间其它线程可以正常出队
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    // 2. allocationSpinLock 是数组扩容的独占锁
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            // oldGap<64则扩容新增oldcap+2,否者扩容50%,并且最大为MAX_ARRAY_SIZE
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            // 3.1 如果 oldCap=MAX_ARRAY_SIZE 则 newCap 就会变成负数
            // 3.2 如果 queue 已经改变,则有其它线程已经完成扩容 ok
            //     线程1已经完成扩容,线程2执行到这里时 queue=newArray
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    // 4. 线程1 cas 成功后,线程2会进入这个地方,然后线程2让出 cpu
    //    尽量让线程1执行下面代码获取锁,但是这得不到肯定的保证。
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();

    // 5. 重新获取锁,只有一个线程可以最终完成数组的扩容。
    //    cas 只进行了数组的初始化,即 newArray=new Object[newCap],可能有多个线程都成功了
    lock.lock();
    // 6. 数组元素拷贝到新数组中,完成扩容。可能有多个线程都初始化了 newArray
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

tryGrow 目的是扩容,这里要思考下为啥在扩容前要先释放锁,然后使用 cas 控制只有一个线程可以扩容成功呢?

其实这里不先释放锁也是可以的,也就是在整个扩容期间一直持有锁,但是扩容是需要花时间的,如果扩容的时候还占用锁,那么其他线程在这个时候是不能进行出队和入队操作的,这大大降低了并发性。

所以在扩容前释放锁,这允许其他出队线程可以进行出队操作,但是由于释放了锁,所以也允许在扩容时候进行入队操作,这就会导致多个线程进行扩容会出现问题,所以这里使用了一个 spinlock 用 cas 控 制只有一个线程可以进行扩容,失败的线程调用 Thread.yield() 让出 cpu,目的意在让扩容线程扩容后优先调用 lock.lock 重新获取锁,但是这得不到一定的保证,有可能调用 Thread.yield() 的线程先获取了锁。如果这时候扩容线程还没扩容完毕,其他线程是通过自旋检查当前扩容是否完毕。

那 copy 元素数据到新数组为啥放到获取锁后面那?原因应该是因为可见性问题,因为 queue 并没有被 volatile 修饰。另外有可能在扩容时候进行了出队操作,如果直接拷贝可能看到的数组元素不是最新的。而通过调用 Lock 后,获取的数组则是最新的,并且在释放锁前 数组内容不会变化。

3. 出队 poll

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}
// 出队
private E dequeue() {
    int n = size - 1;
    // 1. 没有元素直接返回 null
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        // 2. array[0] 永远都是最小的元素
        E result = (E) array[0];
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        // 3. 因为 array[0] 已经出队,现在需要将元素 array[n] 插入到 0 这个位置
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        // 4. 返回 array[0]
        return result;
    }
}

4. 数据结构

// 数组结构,是二叉树最小堆的实现,array[0] 永远是优先级最高的元素
private transient Object[] queue;

// offer 时将元素 e 插入到节点 n 位置
siftUpComparable(n, e, array);
// poll 时将最后一个元素 array[n] 插入到 0 位置
siftDownComparable(0, x, array, n);

(1) siftUpComparable

// 将元素 x 插入数据 array 的第 k 个位置
private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

这个排序看过去有些奇怪,怎么有 parent,并且下标是 (k-1)>>>1 呢?其实这里的操作就反应了优先队列的真正数据结构,其实际上是一个二叉树,将二叉树存储在数组之中而已。根节点就是数组的 0 位。下图给出其具体结构:

技术图片

PriorityQueue 是一个完全二叉树,且不允许出现 null 节点,其父节点都比叶子节点小,这个是堆排序中的小顶堆。二叉树存入数组的方式很简单,就是从上到下,从左到右。完全二叉树可以和数组中的位置一一对应:

  • 左叶子节点 = 父节点下标 * 2 + 1
  • 右叶子节点 = 父节点下标 * 2 + 2
  • 父节点 = (叶子节点 - 1) / 2

现在在看 siftUpComparable 代码就轻松多了,实际上就是将要插入的元素 x 和它的父节点做对比,如果比父节点大就一直向上移动。因为比较后元素是在向上移动,所以叫 siftUpComparable

(2) siftDownComparable

// 将元素 x 插入数据 array 的第 k 个位置,n 表示当前数组的最后一个位置
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child];
            int right = child + 1;
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                c = array[child = right];
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

siftDownComparable(0, x, array, n) 将元素 x 和第 0 位的左右子节点进行比较,如果 x 大于这两个子节点则向下移动,小的子节点则上移。这样 array[0] 又变成最小的值了。

参考:

  1. 并发队列-无界阻塞优先级队列PriorityBlockingQueue原理探究
  2. Java之集合(六)PriorityQueue

每天用心记录一点点。内容也许不重要,但习惯很重要!

以上是关于JUC源码分析-集合篇PriorityBlockingQueue的主要内容,如果未能解决你的问题,请参考以下文章

JUC源码分析-集合篇ConcurrentLinkedQueue

JUC源码分析16-集合-ConcurrentSkipListMapConcurrentSkipListSet

JUC学习之线程安全集合类

Java多线程系列--“JUC集合”02之 CopyOnWriteArrayList

Java多线程系列--“JUC集合”03之 CopyOnWriteArraySet

Java多线程系列--“JUC集合”05之 ConcurrentSkipListMap