Java数据结构及算法实战系列009:Java队列03——数组实现的阻塞队列ArrayBlockingQueue

Posted _waylau

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java数据结构及算法实战系列009:Java队列03——数组实现的阻塞队列ArrayBlockingQueue相关的知识,希望对你有一定的参考价值。

顾名思义,ArrayBlockingQueue是基于数组实现的有界阻塞队列。该队列对元素进行FIFO排序。队列的首元素是在该队列中驻留时间最长的元素。队列的尾部是在该队列中停留时间最短的元素。新的元素被插入到队列的尾部,队列检索操作获取队列头部的元素。

 

ArrayBlockingQueue是一个经典的“有界缓冲区(bounded buffer)”,其中内部包含了一个固定大小的数组,用于承载包含生产者插入的和消费者提取的元素。ArrayBlockingQueue的容量一旦创建,不可更改。试图将一个元素放入一个满队列将导致操作阻塞;试图从空队列中取出一个元素也同样会阻塞。

 

ArrayBlockingQueue支持排序的可选公平策略,用于等待生产者和消费者线程。默认情况下,不保证此顺序。然而,一个由公平性设置为true构造的队列允许线程以FIFO顺序访问。公平性一般会降低吞吐量,但可以减少可变性,避免线程饿死。

 

ArrayBlockingQueue类及其迭代器实现了Collection和Iterator接口的所有可选方法。ArrayBlockingQueue是Java Collections Framework的一个成员。

 

 

1.   ArrayBlockingQueue的声明

ArrayBlockingQueue的接口和继承关系如下

 

public class ArrayBlockingQueue<E> extends AbstractQueue<E>

        implements BlockingQueue<E>, java.io.Serializable

 

   …

 

 

完整的接口继承关系如下图所示。

 

 

 

 

 

从上述代码可以看出,ArrayBlockingQueue既实现了BlockingQueue<E>和java.io.Serializable接口,又继承了java.util.AbstractQueue<E>。其中,AbstractQueue是Queue接口的抽象类,核心代码如下。

 

 

package java.util;

 

public abstract class AbstractQueue<E>

    extends AbstractCollection<E>

    implements Queue<E>

 

    protected AbstractQueue()

   

 

    public boolean add(E e)

        if (offer(e))

            return true;

        else

            throw new IllegalStateException("Queue full");

   

 

    public E remove()

        E x = poll();

        if (x != null)

            return x;

        else

            throw new NoSuchElementException();

   

 

    public E element()

        E x = peek();

        if (x != null)

            return x;

        else

            throw new NoSuchElementException();

   

 

    public void clear()

        while (poll() != null)

            ;

   

 

    public boolean addAll(Collection<? extends E> c)

        if (c == null)

            throw new NullPointerException();

        if (c == this)

            throw new IllegalArgumentException();

        boolean modified = false;

        for (E e : c)

            if (add(e))

                modified = true;

        return modified;

   

 

 

 

2.   ArrayBlockingQueue的成员变量和构造函数

 

 

以下是ArrayBlockingQueue的构造函数和成员变量。

 

    // 元素数组

    final Object[] items;

 

    // 消费索引,用于takepollpeekremove操作

    int takeIndex;

 

    // 生产索引,用于putofferadd操作

    int putIndex;

 

    // 队列中的元素个数

    int count;

 

    /*

     * 使用经典的双条件算法(two-condition algorithm)实现并发控制

     */

 

    // 操作数组确保原子性的锁

    final ReentrantLock lock;

 

    // 数组非空,唤醒消费者

    private final Condition notEmpty;

 

    // 数组非满,唤醒生产者

private final Condition notFull;

 

// 迭代器状态

transient Itrs itrs;

 

public ArrayBlockingQueue(int capacity)

        this(capacityfalse);

   

 

    public ArrayBlockingQueue(int capacityboolean fair)

        if (capacity <= 0)

            throw new IllegalArgumentException();

        this.items = new Object[capacity];

        lock = new ReentrantLock(fair);

        notEmpty = lock.newCondition();

        notFull =  lock.newCondition();

   

 

    public ArrayBlockingQueue(int capacityboolean fair,

                              Collection<? extends E> c)

        this(capacityfair);

 

        final ReentrantLock lock = this.lock;

        lock.lock(); // 只锁可见,不互斥

        try 

            final Object[] items = this.items;

            int i = 0;

            try 

                for (E e : c)

                    items[i++] = Objects.requireNonNull(e);

             catch (ArrayIndexOutOfBoundsException ex)

                throw new IllegalArgumentException();

           

            count = i;

            putIndex = (i == capacity) ? 0 : i;

         finally 

            lock.unlock();  // 解锁

       

 

 

从上述代码可以看出,构造函数有三种。构造函数中的参数含义如下

 

l  capacity用于设置队列容量

l  fair用于设置访问策略,如果为true,则对线程的访问在插入或移除时被阻塞,则按FIFO顺序处理;如果为false,则访问顺序未指定

l  c用于设置最初包含给定集合的元素,按集合迭代器的遍历顺序添加

 

类成员items是一个数组,用于存储队列中的元素。关键字final指明了,当ArrayBlockingQueue构造完成之后,通过new Object[capacity]的方式初始化items数组完成后,则后续items的容量将不再变化。

 

访问策略是通过ReentrantLock来实现的。通过两个加锁条件notEmpty、notFull来实现并发控制。这是典型的双条件算法(two-condition algorithm)。

 

ArrayBlockingQueue生产则增加putIndex,消费则增加takeIndex。

 

Itrs用于记录当前活动迭代器的共享状态,如果已知不存在任何迭代器,则为null。 允许队列操作更新迭代器状态。迭代器状态不是本节的重点,不再深入探讨。

3.   ArrayBlockingQueue的核心方法

以下对ArrayBlockingQueue常用核心方法的实现原理进行解释。

 

 

3.1.     offer(e)

执行offer(e)方法后有两种结果

 

l  队列未满时,返回 true

l  队列满时,返回 false

 

ArrayBlockingQueue的offer (e)方法源码如下:

 

public boolean offer(E e)

        Objects.requireNonNull(e);

        final ReentrantLock lock = this.lock;

        lock.lock(); // 加锁

        try 

            if (count == items.length)

                return false;

            else 

                enqueue(e); // 入队

                return true;

           

         finally 

            lock.unlock();  // 解锁

       

 

 

从上面代码可以看出,执行offer(e)方法时,分为以下几个步骤:

 

l  为了确保并发操作的安全先做了加锁处理。

l  而后判断count是否与数组items的长度一致,如果一致则证明队列已经满了,直接返回false;否则执行enqueue(e)方法做元素的入队,并返回true。

l  最后解锁。

 

enqueue(e)方法源码如下:

 

 

private void enqueue(E e)

        final Object[] items = this.items;

        items[putIndex] = e;

        if (++putIndex == items.lengthputIndex = 0;

        count++;

        notEmpty.signal(); // 唤醒等待中的线程

 

上面代码比较简单,在当前索引(putIndex)位置放置待入队的元素,而后putIndex和count分别递增,并通过signal()方法唤醒等待中的线程。其中一个注意点是,当putIndex 等于数组items长度时,putIndex置为0

 

思考:当putIndex 等于数组items长度时,putIndex为什么置为0呢?

 

3.2.     put(e)

执行put(e)方法后有两种结果:

•      

l  队列未满时,直接插入没有返回值

l  队列满时,会阻塞等待,一直等到队列未满时再插入

 

ArrayBlockingQueue的put (e)方法源码如下:

 

public void put(E ethrows InterruptedException

        Objects.requireNonNull(e);

        final ReentrantLock lock = this.lock;

        lock.lockInterruptibly();  // 获取锁

        try 

            while (count == items.length)

                notFull.await();  // 使线程等待

            enqueue(e);  // 入队

Java数据结构及算法实战系列009:Java队列03——数组实现的阻塞队列ArrayBlockingQueue

Java数据结构及算法实战系列008:Java队列02——阻塞队列BlockingQueue

Java数据结构及算法实战系列008:Java队列02——阻塞队列BlockingQueue

Java 数据结构及算法实战系列 013:Java队列07——双端队列Deque

Java 数据结构及算法实战系列 013:Java队列07——双端队列Deque

Java 数据结构及算法实战系列 013:Java队列07——双端队列Deque