JDK1.8 ArrayBlockingQueue类说明
Posted zhujm320
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDK1.8 ArrayBlockingQueue类说明相关的知识,希望对你有一定的参考价值。
介绍
ArrayBlockingQueue是一种基于数组实现的阻塞队列,它实现了BlockingQueue的接口,线程安全。为了实现队列的入队和出队更高的效率,采用了环形队列的形式,环形队列在入队和出队时的时间复杂度为O(1)。ArrayBlockingQueue初始化时,需要指定队列的大小,属于有界队列。
关于队列的实现原理请参考 队列实现原理和JDK1.8 BlockingQueue接口说明
功能解读
成员属性
/** 队列元素存放数组 */
final Object[] items;
/** 下一次取元素位置 */
int takeIndex;
/** 下一次放元素位置 */
int putIndex;
/** 元素个数 */
int count;
/** 往队列放元素和取元素用的锁 */
final ReentrantLock lock;
/** Condition for waiting takes 用来实现生产和消费者模型*/
private final Condition notEmpty;
/** Condition for waiting puts 用来实现生产和消费者模型*/
private final Condition notFull;
/**
* 迭代器
*/
transient Itrs itrs = null;
构造函数
/*
* @param capacity 队列的大小
*
*/
public ArrayBlockingQueue(int capacity)
this(capacity, false);
/*
* @param capacity 队列的大小
* @param fair true 表示入队或者出队使用公平锁; false 表示非公平锁 (默认情况一般非公平锁,效率更高)
*/
public ArrayBlockingQueue(int capacity, boolean fair)
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
/*
* @param capacity 队列的大小
* @param fair true 表示入队或者出队使用公平锁; false 表示非公平锁 (默认情况一般非公平锁,效率更高)
* @param c 给定一个集合初始化队列
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c)
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try
int i = 0;
try
for (E e : c)
checkNotNull(e);
items[i++] = e;
catch (ArrayIndexOutOfBoundsException ex)
throw new IllegalArgumentException();
count = i;
putIndex = (i == capacity) ? 0 : i;
finally
lock.unlock();
为什么在构造函数中加了锁呢,为了解决可见性问题,即禁止指令重排。在多线程使用的情况下,由于指令重排很有可能一个线程初始化构造函数还未完成的情况下,另外一个线程进行入队和出队操作,这样在没有加锁的情况下,容易导致数组数据错乱。从作者的设计思路上看,items数组是临界资源,任何对items的操作都需要上锁,确保数据的一致性。
入队操作
public boolean add(E e)
return super.add(e);
实际调用是offer(E e)
public boolean offer(E e)
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try
if (count == items.length)
return false;
else
enqueue(e);
return true;
finally
lock.unlock();
非阻塞入队操作,获取锁,如果队列满则返回false,反之则元素入队
private void enqueue(E x)
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) //环形队列
putIndex = 0; //入队标记指向对头
count++;
notEmpty.signal(); //唤醒出队线程,队列已经不为空了,让它们醒来执行出队操作
具体的入队操作,将元素插入到队尾,插入完毕后,通过notEmpty通知消费者,队列非空,可以拿数据了。这里元素数组臆想为一个环,当putIndex == items.length插入元素索引达到数组最后一个值时,此时将putIndex指向数组头。
public void put(E e) throws InterruptedException
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
while (count == items.length)
notFull.await(); //队列满,该线程阻塞,直到队列非满为止,同时会释放lock,以便其他线程获取锁
enqueue(e);
finally
lock.unlock();
阻塞型入队操作,若队列为满,则阻塞该线程,直到队列不为满才返回
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
while (count == items.length)
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos); //线程休眠nanos毫秒
enqueue(e); //具体入队操作
return true;
finally
lock.unlock();
非阻塞入队,如果队列为满,该线程等待指定的时间,若指定时间内,队列还是满,则返回false
出队操作
private E dequeue()
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0; //环形队列,重置出队标记
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal(); //唤醒入队线程,注意此时入队线程并未获取到锁,直到出队线程释放锁后,才可以进行入队操作
return x;
具体出队操作
public E poll()
final ReentrantLock lock = this.lock;
lock.lock(); //获取锁
try
return (count == 0) ? null : dequeue(); //队列为空, 直接返回null
finally
lock.unlock(); //释放锁
非阻塞出队操作,队列为空,直接返回null,反之进行出队操作
public E take() throws InterruptedException
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
while (count == 0)
notEmpty.await(); //队列为空,队列线程阻塞,同时释放锁
return dequeue();
finally
lock.unlock();
阻塞式出队操作
public E poll(long timeout, TimeUnit unit) throws InterruptedException
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
while (count == 0)
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);//等待nanos毫秒,若队列非空则出队,反之返回null
return dequeue();
finally
lock.unlock();
半阻塞式出队,在指定时间内,如果队列非空则出队,反之直接返回null
查看操作
public E peek()
final ReentrantLock lock = this.lock;
lock.lock();
try
return itemAt(takeIndex); // null when queue is empty
finally
lock.unlock();
查看队头元素,如果队列非空返回具体元素,反之返回null
查看队列容量
public int size()
final ReentrantLock lock = this.lock;
lock.lock(); //获取锁
try
return count;
finally
lock.unlock(); //释放锁
获取队列元素个数
public int remainingCapacity()
final ReentrantLock lock = this.lock;
lock.lock();
try
return items.length - count;
finally
lock.unlock();
获取队列空闲大小
删除操作
/**
* Deletes item at array index removeIndex.
* Utility for remove(Object) and iterator.remove.
* Call only when holding lock.
*/
void removeAt(final int removeIndex)
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
if (removeIndex == takeIndex)
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
else
// an "interior" remove
// slide over all others up through putIndex.
final int putIndex = this.putIndex;
for (int i = removeIndex;;)
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex)
items[i] = items[next];
i = next;
else
items[i] = null;
this.putIndex = i;
break;
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
notFull.signal();
删除队列中指定index的元素,具体操作如下:
-
将index+1的元素赋值给index,以此类推直到putIndex(将putIndex赋值置putIndex-1)
-
将putIndex值置null
-
操作过程中注意队列队列为环形队列,在next == items.length时,需要将next指向队头,即next = 0
-
调用此函数前,需要获取锁
public boolean remove(Object o)
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try
if (count > 0)
final int putIndex = this.putIndex;
int i = takeIndex;
do
if (o.equals(items[i]))
removeAt(i);
return true;
if (++i == items.length)
i = 0;
while (i != putIndex);
return false;
finally
lock.unlock();
删除元素,根据元素值将其从队列中删除,具体调用removeAt
是否包含否元素
public boolean contains(Object o)
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try
if (count > 0)
final int putIndex = this.putIndex;
int i = takeIndex;
do
if (o.equals(items[i]))
return true;
if (++i == items.length)
i = 0;
while (i != putIndex);
return false;
finally
lock.unlock();
获取锁,然后遍历队列items数组,如果元素在队列中返回true,反之返回false
队列转化成数组
public Object[] toArray()
Object[] a;
final ReentrantLock lock = this.lock;
lock.lock();
try
final int count = this.count;
a = new Object[count];
int n = items.length - takeIndex; //从takeIndex到items.length的空间
if (count <= n)
System.arraycopy(items, takeIndex, a, 0, count);
else
System.arraycopy(items, takeIndex, a, 0, n);
System.arraycopy(items, 0, a, n, count - n);
finally
lock.unlock();
return a;
将队列转化为数组,注意:
int n = items.length - takeIndex;
即takeIndex到length的空间,而count是队列的大小,它们的关系如下图:
-
count < n的情况
此时只需要将takeIndex~takeIndex+count的元素拷到新的数组接口
2.count >n的情况,说明putIndex必然小于takeIndex,需要环形队列来考虑
此时需要将count1和count2两部分元素拷贝到新的数组中
count1=items.length - takeIndex
count2=puntIndex-1=count-n
public <T> T[] toArray(T[] a)
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try
final int count = this.count;
final int len = a.length;
if (len < count)
a = (T[])java.lang.reflect.Array.newInstance(
a.getClass().getComponentType(), count);
int n = items.length - takeIndex;
if (count <= n)
System.arraycopy(items, takeIndex, a, 0, count);
else
System.arraycopy(items, takeIndex, a, 0, n);
System.arraycopy(items, 0, a, n, count - n);
if (len > count)
a[count] = null;
finally
lock.unlock();
return a;
将数组拷贝到一个指定数组中,大体操作和toArray类似,不在赘述
清空操作
public void clear()
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try
int k = count;
if (k > 0)
final int putIndex = this.putIndex;
int i = takeIndex;
do
items[i] = null;
if (++i == items.length)
i = 0;
while (i != putIndex);
takeIndex = putIndex;
count = 0;
if (itrs != null)
itrs.queueIsEmpty();
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
finally
lock.unlock();
清空操作,将队列元素逐个清空,同时如果有有入队线程等在notFull上,进行唤醒
使用模型
阻塞队列主要应用在生产者和消费者模型中使用,消费者线程只需要关注于取数据,生产者则只需要关注往队列中存数据。
问题讨论
在ArrayBlockingQueue中存取元素,为什么只采用了单锁,而不是在读取和存入数据时各采用一把锁?
理论上采用双锁时,支持更大的并发吞吐量,比如在LinkedBlockingQueue中就采取了入队锁和出队锁,支持两个线程同时操作,增大了并发量。为什么ArrayBlockingQueue单锁呢,难道作者没有考虑并发性能这块,这显然是不可能的。个人觉得由于数组是连续的,把整个数组items当作一个临界资源,然后对items的任何操作都需要进行上锁和解锁操作,这种情况下虽然性能会差点,但是可以保障items的绝对安全。
以上是本人的理解,如果有错,请大神指点。
以上是关于JDK1.8 ArrayBlockingQueue类说明的主要内容,如果未能解决你的问题,请参考以下文章
源码阅读(34):Java中线程安全的QueueDeque结构——ArrayBlockingQueue
源码阅读(33):Java中线程安全的QueueDeque结构——ArrayBlockingQueue
源码阅读(33):Java中线程安全的QueueDeque结构——ArrayBlockingQueue