ArrayBlockingQueue核心源码解读

Posted gocode

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ArrayBlockingQueue核心源码解读相关的知识,希望对你有一定的参考价值。

 

1 前言

队列是一种在尾部添加元素、从头部删除元素的数据结构,而阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

  • ①支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。

  • ②支持阻塞的移除方法:在队列为空时,获取元素的线程会等待队列变为非空。

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

ArrayBlockingQueue继承于抽象类AbstractQueue,并实现了BlockingQueue接口,它内部使用数组来储存元素。它内部主要使用两个”条件“来实现”阻塞插入“、”阻塞移出“,这两个条件分别是”未满“、”非空“。ArrayBlockingQueue是一个有界的阻塞队列,它需要在构造方法中指定队列的容量。

技术图片

 

 

BlockingQueue接口方法说明

技术图片
public interface BlockingQueue<E> extends Queue<E> {
    //若能立即成功插入元素,则返回true,若超出容量不能插入元素,则抛出IllegalStateException
    boolean add(E e);
    //若能立即成功插入元素,则返回true,若容量已满不能插入元素,则返回false
    boolean offer(E e);
    //向队列中插入元素,若容量已满就阻塞等待
    void put(E e) throws InterruptedException;
    //向队列中插入元素,若等待给定的时长后还不能插入元素就返回false
    boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;
    //不限时的等待元素出队
    E take() throws InterruptedException;
    //超时等待元素出队,若等待超时就返回null
    E poll(long timeout, TimeUnit unit)
            throws InterruptedException;
    //返回当前的剩余容量
    int remainingCapacity();
    //从队列中删除这元素,若存在这个元素就返回true
    boolean remove(Object o);
    //当前队列中是否存在此元素
    public boolean contains(Object o);
    //一次性将所有元素出队
    int drainTo(Collection<? super E> c);
    //一次性出队指定个数元素
    int drainTo(Collection<? super E> c, int maxElements);
}
BlockingQueue

 

 注:此处的源码分析基于JDK1.8

2 成员变量与构造方法

1) 成员变量

/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
?
transient Itrs itrs = null;
  • items:保存元素的数组

  • takeIndex : 下次出队或删除元素的索引

  • putIndex:下次入队(插入)元素的索引

  • count:当前队列中的元素个数

  • lock:可重入锁,保证多线程访问时数据的一致性、可见性。

  • notEmpty: 队列”非空“的条件,出队时用到的条件(此条件与lock锁关联)

  • notFull: 队列”未满“的条件,入队时用到的条件(此条件与lock锁关联)

  • itrs:共享迭代器的状态,是迭代器Itr的辅助工具

2) 构造方法

可以看出构造方法主要涉及对items 、lock、notEmpty、notFull这四个实例变量的实例化。

构造方法中必须要指定容量,因为在数组items初始化时必须知其长度;另外还可以选择锁的公平性,默认情况使用非公平锁。

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity]; 
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();//与lock锁关联的条件
    notFull =  lock.newCondition();//与lock锁关联的条件
}
public ArrayBlockingQueue(int capacity, boolean fair,  Collection<? extends E> c) {
    this(capacity, fair);
    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) { //逐个放入
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

 

3 主要方法

1) 添加元素

add 、offer 、put 、offer这几个方法都逻辑很相似:在方法最外层使用lock锁来保证线程安全,然后判断队列是否已满,若队列未满,执行enqueue,将元素入队,而若队列已满,根据方法自身的定义决定是立即返回、或是抛出异常或阻塞等待。

public boolean add(E e) {
    return super.add(e);//父类add方法调用offer方法实现
}
//队列已满,就不添加元素,直接返回false
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
//若队列已满,就阻塞等待到队列不满时再添加元素
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 不限时长地休眠,直到某些元素被删除时被唤醒(notFull.signal())
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
//若队列已满,就超时等待.若在超时等待返回后,队列仍是已满状态就返回false
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    checkNotNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            //休眠给定的时长nanos,但有可能因队列中某元素被删除而被提前唤醒(notFull.signal())
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

 

上面的几个方法者调用了enqueue(e)方法将元素添加到队列的尾部。

enqueue主要逻辑:先将元素e放入对数组items的对应索引处,然后唤醒一个等待”非空“条件的线程

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;//
    if (++putIndex == items.length)//防止下标越界,将putIndex重置为0
        putIndex = 0;
    count++;//元素个数加1
    //唤醒一个等待非空条件的线程(因为在添加一个元素后,队列中至少含有一个元素了,队列不再是空的)
      //这里不是signalAll(),signal()只会唤醒一个线程,这样做的目的是减少线程竞争
    notEmpty.signal();
}

 

2) 元素出队

poll、take 这3个方法都逻辑很相似:在方法最外层使用lock锁来保证线程安全,然后判断队列是否”非空“,若队列非空 ,执行dequeue将队列头部元素出队,若队列已空,根据方法自身的定义决定是立即返回还是阻塞等待。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}
?
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
        // 不限时长地休眠,直到有元素入队后被唤醒(notEmpty.signal())
            notEmpty.await();
        return dequeue();//出队
    } finally {
        lock.unlock();
    }
}
//超时版本的poll
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            //休眠给定的时长nanos,但有可能因有元素入队而被提前唤醒(notEmpty.signal())
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();//出队
    } finally {
        lock.unlock();
    }
}

 

dequeue()方法将队列的头部元素从队列中移除并返回此元素

enqueue主要逻辑:先将队列头部位置的元素引用清空,然后唤醒一个等待”未满“条件的线程。

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;//将指定索引处引用清空,元素从队列中移除
    if (++takeIndex == items.length)//防止下标越界,重置takeIndex,回到起点索引0
        takeIndex = 0;
    count--;//无数个数减1
    if (itrs != null)
        itrs.elementDequeued();
  //唤醒一个等待"未满"条件的线程(因为在删除一个元素后,队列中至少剩余一个空位,队列不再是满的)
   //这里不是signalAll(),signal()只会唤醒一个线程,在队列已满的情况下删除一个元素后,最多只能让一个元素入队,
 //所以只唤醒一个线程,能有效减少线程竞争
    notFull.signal();
    return x;
}

 

3) 删除指定元素

remove(Object)方法用于从队列中移除指定元素. 它的逻辑很简单:它循环遍历队列中的所有元素,若存在此元素就将其移除,返回true,若不存在此元素就返回false.

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {//队列中含有元素才能移除元素
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do {//遍历查找队列中是否存在此元素
                if (o.equals(items[i])) {
                        //如果相等,就移除此元素
                    removeAt(i);
                    return true;
                }
                //与当前查找的元素不等,将索引位置加1,准备比较队列中的下个元素
                if (++i == items.length)//防止下标越界,i重置为零
                    i = 0;
            } while (i != putIndex);//直到重新回到队列的头部,才退出循环。
        }
        return false;
    } finally {
        lock.unlock();
    }
}

 

 removeAt() 移除指定下标处的元素。removeAt()的主要逻辑:

①如果移除的元素在队列的头部,则与dequeue的处理方式相同,这种情况下比较简单。 ②但如果移除的元素不在队列的头部,则需要将待移除元素之后的所有元素整体向前移动一个索引位,然后将下次入队位置putIndex也前移一个索引位。

void removeAt(final int removeIndex) {
    // assert lock.getHoldCount() == 1;
    // assert items[removeIndex] != null;
    // assert removeIndex >= 0 && removeIndex < items.length;
    final Object[] items = this.items;
    if (removeIndex == takeIndex) {//要移除的元素在队列的头部,与dequeue方法类似
        // removing front item; just advance
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        // an "interior" remove
?
        // slide over all others up through putIndex.
        final int putIndex = this.putIndex;
        for (int i = removeIndex;;) {
            int next = i + 1;
            if (next == items.length)//防止下标越界,重置为零
                next = 0;
            //在队列的中部删除元素,需要移除位置removeIndex之后的元素向前移动一位
            //第一次向前移动时item[removeIndex]的引用就被后继元素item[removeIndex+1]给覆盖了,
            //因此指定索引处的元素被移除了
            if (next != putIndex) {
                items[i] = items[next];
                i = next;
            } else {
                //next=putIndex, 即i+1=putIndex
                //向前移的元素位置和下次添加元素的位置相同,将items[i]清空,
                //因为items[i]是唯一一个没有后继元素可覆盖它的元素
                items[i] = null;
           //删除了一个元素,所以下次添加元素位置也要前移一位,即this.putIndex要减一,
                //即this.putIndex=oldPutIndex-1=(i+1)-1=i
                this.putIndex = i;
                break;
            }
        }
        count--;//元素个数减1
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    notFull.signal();//唤醒一个等待‘未满‘条件的线程
}

  

4) 批量出队

drainTo系列方法用于批量出队,它相对于多次单元素出队,能提高性能、减少阻塞。

 public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
 }
?
public int drainTo(Collection<? super E> c, int maxElements) {
    checkNotNull(c);
    if (c == this)
        throw new IllegalArgumentException();
    if (maxElements <= 0)
        return 0;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int n = Math.min(maxElements, count);
        int take = takeIndex;
        int i = 0;
        try {
            //从头部开始,将队列中的元素引用清空,并将这些元素添加到集合c中
            while (i < n) {
                @SuppressWarnings("unchecked")
                E x = (E) items[take];
                c.add(x);
                items[take] = null;
                if (++take == items.length)
                    take = 0;
                i++;
            }
            return n;
        } finally {
            // Restore invariants even if c.add() threw
            if (i > 0) {
                count -= i;//更新元素个数
                takeIndex = take;//更新下次出队的位置
                if (itrs != null) {
                    if (count == 0)
                        itrs.queueIsEmpty();
                    else if (i > take)
                        itrs.takeIndexWrapped();
                }
                //唤醒i个等待"未满"条件的线程
                for (; i > 0 && lock.hasWaiters(notFull); i--)
                    notFull.signal();
            }
        }
    } finally {
        lock.unlock();
    }
}

 

5) 其他方法

contains()方法返回 队列中是否存在指定元素的布尔值。它主要是从队列的头部到队列的尾部遍历所有元素,查看是否存在此元素。

public boolean contains(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;//从头部开始
            do {
                if (o.equals(items[i]))
                    return true;
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);//到尾部结束
        }
        return false;
    } finally {
        lock.unlock();
    }
}

 

size()方法返回队列中的元素个数

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return count;
    } finally {
        lock.unlock();
    }
}

 

clear()清空队列中的所有元素

public void clear() {
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int k = count;
        if (k > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do {
                items[i] = null;
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
            takeIndex = putIndex;//入队位置和出队位置相同,也能表示队列中没有元素了
            count = 0;
            if (itrs != null)
                itrs.queueIsEmpty();
            //唤醒count个等待”未满“条件的线程
            for (; k > 0 && lock.hasWaiters(notFull); k--)
                notFull.signal();
        }
    } finally {
        lock.unlock();
    }
}

 

4 总结

技术图片

① 此阻塞队列实现的关键在于两个条件, ‘未满‘和‘非空’。在删除元素后可以唤醒等待“未满”条件的线程,而在添加元素后可以唤醒等待“非空”条件的线程。两者的唤醒条件恰好相反地对应。

②在删除一个元素或添加一个元素之后,只会唤醒等待相应条件的一个线程,在批量删除n个元素后,也只唤醒n个等待”未满“条件的线程.这样做的目的是减少线程竞争,唤醒更多的线程无助于提高入队或出队的效率,相反,多出来的线程不能继续入队或出队,它们只是加剧线程竞争而已。

 

 

 

以上是关于ArrayBlockingQueue核心源码解读的主要内容,如果未能解决你的问题,请参考以下文章

PostgreSQL 源码解读(24)- 查询语句#9(查询重写)

并发阻塞队列BlockingQueue解读

JVM源码分析之堆外内存完全解读

ArrayBlockingQueue 和LinkedBlockingQueue

ArrayBlockingQueue 和LinkedBlockingQueue

SpringBoot 核心源码解读