源码分析-PriorityBlockingQueue

Posted 千念飞羽

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码分析-PriorityBlockingQueue相关的知识,希望对你有一定的参考价值。

PriorityBLockingQueue-文档部分

doc文档

PriorityBlockingQueue是无界的阻塞队列。当然如果资源耗尽的看情况下也是会出现添加失败的情况。PriorityBlockingQueue提供的迭代器并不保证按照某一顺序顺序迭代所有元素(和有限队列一致,可以见另一篇博客源码分析-PriorityQueue)(它会先按数组迭代,然后再迭代漏掉的元素)。当两个元素相等的时候并不保证陷入先出顺序。所以如果需要保证先入先出顺序必须要保证

源码实现文档

这里使用了一个给予数组的堆实现,所有公共方法都知识用一个锁实现,但是在调整大小的阶段使用了一个自旋锁,以避免并发申请调整大小。这样可以避免等待的消费者的推迟,以及随之而来的元素重建问题。由于需要在申请新内存的过程中退出锁,所以不可以将任务简单的委托给一个PriorityQueue来完成。为了保证兼容性,在序列话的过程中使用了锁。为了增加保证兼容性。在序列化的过程中需要付出双倍的代价(先转换成一个PriorityQueue然后在序列化)

概述

大部分的内容都和之前的PriorityQueue类似。与PriorityQueue的区别主要体现在两点上,一个是在出队和入队的过程中增加了BlockingQueue的操作。另外一个在resize 的过程中使用了自旋锁。

    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    private transient Object[] queue;
    private transient int size;
    private transient Comparator<? super E> comparator;
    private final ReentrantLock lock;
    private final Condition notEmpty;
    private transient volatile int allocationSpinLock;
    private PriorityQueue q;

域的内容比较清楚,只做几点说明

首先一个MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;为什么要限制位Integer.MAX_VALUE-8,为什么要限制为-8。网上有人解释说这是因为有一部分虚拟机的实现是需要使用8位来储存size。在很多地方都出现了,这里做一下解释。
lock锁及其绑定的Condition是用来做阻塞操作的。因为这里是一个无界的队列所以这里只有一个状态变量。用于阻塞take。
而allocationSpinkLock是一个volatile的变量,用这个结合cas来进行自旋。

主要方法

构造器

构造器有很多,这里只看一个使用collection进行构造的构造器

    public PriorityBlockingQueue(Collection<? extends E> c) 
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; // true if not known to be in heap order
        boolean screen = true;  // true if must screen for nulls
        if (c instanceof SortedSet<?>) 
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;
        
        else if (c instanceof PriorityBlockingQueue<?>) 
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;
        
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);
        if (screen && (n == 1 || this.comparator != null)) 
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        
        this.queue = a;
        this.size = n;
        if (heapify)
            heapify();
    

这里使用两个boolean值来判断是否满足条件。
heapify表示是否需要进行构造堆。而screen表示是否需要排除null。
然后对这依照这两个变量分情况讨论。如果是sortSet类型,说明有序,则提取其comparator。并使heapify置位(后续不需要构造堆,因为递增序列是堆的一种特殊情况),如果是Priority自身拷贝当然两者都置为false表示都不需要进行。
然后使用Collection的toArray方法转换成Object[]并进行类型检查。并对各项参数进行复制。
heapify方法和之前PriorityQueue的是一样的,对数组的前半部分元素进行下滤操作。

阻塞方法的实现

非阻塞的put方法

BlockingQueue方法的实现都大同小异这里看一下put和take的实现:

  public void put(E e) 
        offer(e); // never need to block
    

首先由于这是无界队列,所以不需要阻塞,只是讲任务委托给了offer来实现。

    public boolean offer(E e) 
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try 
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
         finally 
            lock.unlock();
        
        return true;
    

因为这里没有阻塞,所以这里的锁完全是来进行同步的,不需要条件变量。
加锁后进行大小检查,如果需要增长则调用tryGrow。
否则则将新进入的元素放入队尾,然后进行上移操作。并唤醒notEmpty条件变量。

膨胀及自旋锁

        private void tryGrow(Object[] array, int oldCap) 
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) 
            try 
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0)     // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
             finally 
                allocationSpinLock = 0;
            
        
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        lock.lock();
        if (newArray != null && queue == array) 
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        
    

首先退出锁。
首先这里申请了一个新的引用newArray。然后使用CAS自旋操作使得volatile的allocationSplinLock变成1。
对于自旋成功的线程,会计算出一个新的堆的尺寸,然后建立一个新的数组,并且在最后使得allocationSplinLock变回0.
对于自旋失败的数组,会让步,这里很好理解,因为下一步就是阻塞,所以即使这里争取到了线程也没有太大的意义。因为其newArray是空,而后退出tryGrow,但是该线程的增长并没有成功所以在offer的循环检查中(n = size) >= (cap = (array = queue).length)这里会失败所以需要重新进入tryGrow。这样做没有意义。
而yield会使得成功创建新数组的线程更容易获得锁,然后会使得queue指向新建的数组,这样对于自旋失败的线程通常来说会在自旋成功的线程之后获得锁,这样在offer中的自旋检测中成功通过,这样就避免了一些重复的操作。以提供更高的效率。
这里还需要说明的是yield,并不是100%会让出线程的,这里的yield只是让这个概率更大一些。

阻塞的take方法

    public E take() throws InterruptedException 
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try 
            while ( (result = dequeue()) == null)
                notEmpty.await();
         finally 
            lock.unlock();
        
        return result;
    

take方法则是常规的阻塞操作会重复检查notEmpty变量。
deque是典型的出队操作,类似PriorityQueue。

以上是关于源码分析-PriorityBlockingQueue的主要内容,如果未能解决你的问题,请参考以下文章

Mybatis源码分析

Spring源码分析专题——目录

ARouter源码分析

Handler源码分析

Eureka源码分析(六) TimedSupervisorTask

[Netty源码分析]ByteBuf(一)