并发编程之ArrayBlockingQueue
Posted microhex
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程之ArrayBlockingQueue相关的知识,希望对你有一定的参考价值。
最近在学习并发编程,在看队列,今天先上一篇 ArrayBlockingArray。
文章目录
队列的概念:
是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,在表的尾端(end)进行插入操作。
和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称之为队尾,进行删除操作的端成为队头。
插入元素称之为入队,删除元素成为出队.
在列队中只允许一端插入,在另一端删除,所有只有最早进入队列的元素才能最先从队列中删除,所以队列中又称之为 先进先出(FIFO → First In First Out)
阻塞队列:
a. 支持阻塞的插入方法:意思就是当队列满时,队列会阻塞插入元素的线程,直到队列不满;
b. 支持阻塞的移除方法:意思在队列为空时,获取元素的线程会等待队列变为非空。
ArrayBlockingQueue
数组实现的有界阻塞队列,遵循FIFO原则,对元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞的队列先访问。非公平性是对先等待的线程是非公平性的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的队列后访问队列。
ArrayBlockingQueue 构造方法
存在三个构造方法:
需要一个初始化容量,还可以定制可重入锁是否公平,最后还有一种是直接初始化数组内容的构造方式
// 默认需要一个容量
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
// 默认需要一个容量
// fair 规定可重入锁是公平还是不公平
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
// 默认初始化数组内容
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
当然,还有其它的一些操作变量:
/** items index for next take, poll, peek or remove */
// 取出元素的位置
int takeIndex;
/** items index for next put, offer, or add */
// 放入元素的位置
int putIndex;
/** Number of elements in the queue */
// 元素的个数
int count;
/** Main lock guarding all access */
// 可重入锁对象
final ReentrantLock lock;
/** Condition for waiting takes */
// 不为空的信号量
private final Condition notEmpty;
/** Condition for waiting puts */
// 队列不满的信号量
private final Condition notFull;
队列操作方法
1. add
方法
public boolean add(E e) {
return super.add(e);
}
直接看其父类:
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
offer
方法是抽象的,可以直接查看 ArrayBlockingQueue
的实现:
可以看到, offer
方法是不能插入空值的
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
其中 enqueue
方法为,直接往数组添加数据:
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
2. peek
方法
使用了可重入锁,取出数组中 takeIndex处的元素:
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
3. put
/ take
方法
是一对互斥操作,put
是向队列中放入数据,take
是向队列中取出数据, 但是和 add
/peek
的实现方式不一样,其中 put
方法为:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
然后 take
方法为:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
可以看出,put/take 方法是阻塞方法,put()
会等待队列不满仓,take()
会等待队列不为空.
4. offer
/ poll
方法
它们之间也是一对互斥操作,offer()
是放入元素,poll()
是取出元素.
offer()
上面已经贴图看过,现在主要来看一下poll()
方法:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
5. offer(E e, long timeout, TimeUnit unit)
/ poll (long timeout, TimeUnit unit)
是 offer
和 poll
的加强版. 在队列满时指定了重试的时间,如果超过指定的时间后还是无法添加或取出则返回false。
直接看 offer(E e, long timeout, TimeUnit unit)
方法:
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
对于 poll (long timeout, TimeUnit unit)
方法:
在规定时间内重试,超过固定时间则直接返回null.
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
队列方法的比较
add系列方法比较:
peek系列方法比较:
其中,只有peek()
方法只返回数组指定位置的元素,但是并没有移除该元素;
poll()
、take()
和 poll()
如果在队列不为空的情况下,获取到元素时,同时也会从队列中移除该元素。
以上是关于并发编程之ArrayBlockingQueue的主要内容,如果未能解决你的问题,请参考以下文章
深入剖析java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue
20.并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解