Java数据结构及算法实战系列009:Java队列03——数组实现的阻塞队列ArrayBlockingQueue
Posted _waylau
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java数据结构及算法实战系列009:Java队列03——数组实现的阻塞队列ArrayBlockingQueue相关的知识,希望对你有一定的参考价值。
顾名思义,ArrayBlockingQueue是基于数组实现的有界阻塞队列。该队列对元素进行FIFO排序。队列的首元素是在该队列中驻留时间最长的元素。队列的尾部是在该队列中停留时间最短的元素。新的元素被插入到队列的尾部,队列检索操作获取队列头部的元素。
ArrayBlockingQueue是一个经典的“有界缓冲区(bounded buffer)”,其中内部包含了一个固定大小的数组,用于承载包含生产者插入的和消费者提取的元素。ArrayBlockingQueue的容量一旦创建,不可更改。试图将一个元素放入一个满队列将导致操作阻塞;试图从空队列中取出一个元素也同样会阻塞。
ArrayBlockingQueue支持排序的可选公平策略,用于等待生产者和消费者线程。默认情况下,不保证此顺序。然而,一个由公平性设置为true构造的队列允许线程以FIFO顺序访问。公平性一般会降低吞吐量,但可以减少可变性,避免线程饿死。
ArrayBlockingQueue类及其迭代器实现了Collection和Iterator接口的所有可选方法。ArrayBlockingQueue是Java Collections Framework的一个成员。
1. ArrayBlockingQueue的声明
ArrayBlockingQueue的接口和继承关系如下
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
…
完整的接口继承关系如下图所示。
从上述代码可以看出,ArrayBlockingQueue既实现了BlockingQueue<E>和java.io.Serializable接口,又继承了java.util.AbstractQueue<E>。其中,AbstractQueue是Queue接口的抽象类,核心代码如下。
package java.util;
public abstract class AbstractQueue<E>
extends AbstractCollection<E>
implements Queue<E>
protected AbstractQueue()
public boolean add(E e)
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
public E remove()
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
public E element()
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
public void clear()
while (poll() != null)
;
public boolean addAll(Collection<? extends E> c)
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
boolean modified = false;
for (E e : c)
if (add(e))
modified = true;
return modified;
2. ArrayBlockingQueue的成员变量和构造函数
以下是ArrayBlockingQueue的构造函数和成员变量。
// 元素数组
final Object[] items;
// 消费索引,用于take、poll、peek或remove操作
int takeIndex;
// 生产索引,用于put、offer或add操作
int putIndex;
// 队列中的元素个数
int count;
/*
* 使用经典的双条件算法(two-condition algorithm)实现并发控制
*/
// 操作数组确保原子性的锁
final ReentrantLock lock;
// 数组非空,唤醒消费者
private final Condition notEmpty;
// 数组非满,唤醒生产者
private final Condition notFull;
// 迭代器状态
transient Itrs itrs;
public ArrayBlockingQueue(int capacity)
this(capacity, false);
public ArrayBlockingQueue(int capacity, boolean fair)
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c)
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // 只锁可见,不互斥
try
final Object[] items = this.items;
int i = 0;
try
for (E e : c)
items[i++] = Objects.requireNonNull(e);
catch (ArrayIndexOutOfBoundsException ex)
throw new IllegalArgumentException();
count = i;
putIndex = (i == capacity) ? 0 : i;
finally
lock.unlock(); // 解锁
从上述代码可以看出,构造函数有三种。构造函数中的参数含义如下
l capacity用于设置队列容量
l fair用于设置访问策略,如果为true,则对线程的访问在插入或移除时被阻塞,则按FIFO顺序处理;如果为false,则访问顺序未指定
l c用于设置最初包含给定集合的元素,按集合迭代器的遍历顺序添加
类成员items是一个数组,用于存储队列中的元素。关键字final指明了,当ArrayBlockingQueue构造完成之后,通过new Object[capacity]的方式初始化items数组完成后,则后续items的容量将不再变化。
访问策略是通过ReentrantLock来实现的。通过两个加锁条件notEmpty、notFull来实现并发控制。这是典型的双条件算法(two-condition algorithm)。
ArrayBlockingQueue生产则增加putIndex,消费则增加takeIndex。
Itrs用于记录当前活动迭代器的共享状态,如果已知不存在任何迭代器,则为null。 允许队列操作更新迭代器状态。迭代器状态不是本节的重点,不再深入探讨。
3. ArrayBlockingQueue的核心方法
以下对ArrayBlockingQueue常用核心方法的实现原理进行解释。
3.1. offer(e)
执行offer(e)方法后有两种结果
l 队列未满时,返回 true
l 队列满时,返回 false
ArrayBlockingQueue的offer (e)方法源码如下:
public boolean offer(E e)
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁
try
if (count == items.length)
return false;
else
enqueue(e); // 入队
return true;
finally
lock.unlock(); // 解锁
从上面代码可以看出,执行offer(e)方法时,分为以下几个步骤:
l 为了确保并发操作的安全先做了加锁处理。
l 而后判断count是否与数组items的长度一致,如果一致则证明队列已经满了,直接返回false;否则执行enqueue(e)方法做元素的入队,并返回true。
l 最后解锁。
enqueue(e)方法源码如下:
private void enqueue(E e)
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal(); // 唤醒等待中的线程
上面代码比较简单,在当前索引(putIndex)位置放置待入队的元素,而后putIndex和count分别递增,并通过signal()方法唤醒等待中的线程。其中一个注意点是,当putIndex 等于数组items长度时,putIndex置为0。
思考:当putIndex 等于数组items长度时,putIndex为什么置为0呢?
3.2. put(e)
执行put(e)方法后有两种结果:
•
l 队列未满时,直接插入没有返回值
l 队列满时,会阻塞等待,一直等到队列未满时再插入
ArrayBlockingQueue的put (e)方法源码如下:
public void put(E e) throws InterruptedException
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 获取锁
try
while (count == items.length)
notFull.await(); // 使线程等待
enqueue(e); // 入队
Java数据结构及算法实战系列009:Java队列03——数组实现的阻塞队列ArrayBlockingQueue
Java数据结构及算法实战系列008:Java队列02——阻塞队列BlockingQueue
Java数据结构及算法实战系列008:Java队列02——阻塞队列BlockingQueue
Java 数据结构及算法实战系列 013:Java队列07——双端队列Deque