ArrayBlockingQueue图解源码分析
Posted 野生java研究僧
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ArrayBlockingQueue图解源码分析相关的知识,希望对你有一定的参考价值。
ArrayBlockingQueue目录结构
ArrayBlockingQueue简介:
-
基于数组的阻塞队列实现,在 ArrayBlockingQueue 内部,维护了一个定长数 组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数 组外,ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
-
ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个 锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于 LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue 完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea 之 所以没这样去做,也许是因为 ArrayBlockingQueue 的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其 在性能上完全占不到任何便宜。
-
ArrayBlockingQueue 和 LinkedBlockingQueue 间还有一个明显的不同之处在于,前者在插入或删除 元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的 Node 对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于 GC 的影响还是存在一定的区别。而在创建 ArrayBlockingQueue 时,我们还 可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
ArrayBlockingQueue继承关系图
我们先来大概瞄一眼这个数组是怎么工作的,已经他的一个出队和入队操作
ArrayBlock的核心方法:
// BlockingQueue的核心方法:
// 向队列中添加元素,如果队列已经满了就抛出:IllegalStateException
boolean add(E e)
// 向队列中添加元素,如果队列已经满了,就直接返回false,并不会让当前添加元素的线程进行阻塞,成功返回true
boolean offer(E e)
// 在指定的时间内把把元素添加到队列中,如果队列满了则会进行阻塞,直到队列有空位置或等待超时结束该方法
boolean offer(E e, long timeout, TimeUnit unit)
// 往队列中添加元素。如果队列已满就进行阻塞,直到队列中有空位置,别的线程将其唤醒为止
void put(E e)
// 获取并移除队列头的第一个元素,如果队列中没有元素直接返回null,并不会阻塞
E poll()
// 在指定时间内获取并移除掉队列头的元素,如果超时队列中还没有元素就直接返回null
E poll(long timeout, TimeUnit unit)
// 从队列的头部获取一个元素,如果队列为空就一直进行阻塞,直到别的线程添加完元素,有机会将其唤醒,在阻塞过程中可打断
E take()
// 一次性从队列中获取指定个数可用的元素,放到一个新的集合中,该方法可以提升效率,避免分批获取多次加锁
int drainTo(Collection<? super E> c, int maxElements)
ArrayBlockingQueue的构造方法:
// 传递一个初始化容量,构建一个新的ArrayBlockingQueue队列
public ArrayBlockingQueue(int capacity)
// 根据传递过来的容量大小创建一个非公平的ArrayBlockIngQueue队列
this(capacity, false);
//指定初始化容量,指定是公平锁还是非公平锁来构建一个新的ArrayBlockingQueue队列
public ArrayBlockingQueue(int capacity, boolean fair)
// 如果初始化容量小于等于0直接抛出异常
if (capacity <= 0)throw new IllegalArgumentException();
// 根据传递过来的容量初始队列数组
this.items = new Object[capacity];
// 创建一个指定锁,具体是公平还是非公平根据传递过来的参数决定,false表示非公平
lock = new ReentrantLock(fair);
// 创建一个队列不为空的等待条件变量,如take()的时候队列中没有数据,那么take()的线程就进入该条件变量上进行等待
notEmpty = lock.newCondition();
// 创建一个队列未满的一个等待条件变量,如put()的时候队列的数据已满,那么put的线程就进入该条件变量上进行等待
notFull = lock.newCondition();
/**
* 传递一个集合过来,将集合中的元素添加到队列中
* @param capacity 初始容量
* @param fair 是否采用公平锁
* @param c 装有元素的集合,不能为空,如果为空抛出NullPointerException
*/
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
// 调用构造方法根据传递过来的参数初始队列
this(capacity, fair);
final ReentrantLock lock = this.lock;
// 进行加锁操作
lock.lock(); // Lock only for visibility, not mutual exclusion
try
// 定义一个临时变量i,用于遍历集合
int i = 0;
try
// 遍历集合中的元素
for (E e : c)
// 如果遍历到集合中有null的元素直接抛出异常
checkNotNull(e);
// 将遍历出来的元素放入到队列内部的数组中去
items[i++] = e;
// 如果集合中的元素个数超过队列的初始容量直接抛出参数异常
catch (ArrayIndexOutOfBoundsException ex)
throw new IllegalArgumentException();
// 传递过来的集合中元素有效个数赋值给队列的计数器
count = i;
// 根据集合中的元素个数记录putIndex,也就是下次要添加在数组中那个下标的位置,
//如果i==capacity则下次添加的时候就会抛出异常添加失败,因为下次添加的时候会用count和capacity进行比较
putIndex = (i == capacity) ? 0 : i;
finally
// 释放锁
lock.unlock();
重要核心属性和方法
接下来我们看下ArrayBlockingQueue内部的重要属性和一些方法:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
private static final long serialVersionUID = -817911632652898426L;
// 存放具体元素的itmes
final Object[] items;
// takeIndex在 take, poll, peek or remove的下一个索引
int takeIndex;
// putIndex put, offer, or add 的时候用到记录下一个元素需要插入在数组中的那个下标位置
int putIndex;
// 队列中元素的个数
int count;
// lock锁,对添加元素和获取元素进行上锁,用的是同一把锁
final ReentrantLock lock;
// 在获取元素的时候,如果队列为空。此次获取元素的线程就进入该条件变量等待
private final Condition notEmpty;
// 在添加元素时队列已满,添加的线程就进入到该条件变量上进行等待
private final Condition notFull;
// 继承自Itorator的一个迭代器
transient Itrs itrs = null;
final int dec(int i)
return ((i == 0) ? items.length : i) - 1;
// 根据下标返回itmes数组中的元素
@SuppressWarnings("unchecked")
final E itemAt(int i)
return (E) items[i];
// 检测空指针异常
private static void checkNotNull(Object v)
if (v == null)
throw new NullPointerException();
// 添加元素到items数组中去,调用该方法的时候会进行上锁
private void enqueue(E x)
// 先将itmes数组赋值给一个临时变量,在自己的线程中进行操作
final Object[] items = this.items;
// 将要添加的元素添加到items数组中去
items[putIndex] = x;
// 如果putIndex达到数组的最大容量,将putIndex置为0
if (++putIndex == items.length)
putIndex = 0;
// 将队列中记录元素个数的计数器+1
count++;
// 添加完后唤醒获取元素阻塞的线程
notEmpty.signal();
// 从items数组中移除掉元素,调用该方法之前会进行上锁
private E dequeue()
// 先将itmes数组赋值给一个临时变量,在自己的线程中进行操作
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// 从队列的头部根据takeIndex获取到该元素
E x = (E) items[takeIndex];
// 将本次要移除的元素所在下标为止置空,可以帮助GC
items[takeIndex] = null;
// 如果takeIndex==itmes的数组长度,说明已经达到数组尾部,将takeIndex置为0
if (++takeIndex == items.length)
takeIndex = 0;
// 队列中的元素个数-1
count--;
if (itrs != null)
itrs.elementDequeued();
// 移除完后唤醒想要添加元素但正在阻塞的线程
notFull.signal();
return x;
// 指定删除索引上的元素,删除完后唤醒想要添加但是正在阻塞的线程,在执行该方法时会进行一个上锁
void removeAt(final int removeIndex)
// 先将itmes数组赋值给一个临时变量,在自己的线程中进行操作
final Object[] items = this.items;
if (removeIndex == takeIndex)
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
else
final int putIndex = this.putIndex;
for (int i = removeIndex;;)
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex)
items[i] = items[next];
i = next;
else
items[i] = null;
this.putIndex = i;
break;
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
notFull.signal();
入队操作
add(E e): 调用 offer(e)方法向队列中添加元素,如果队列已经满了就抛出:IllegalStateException
public boolean add(E e)
// 直接调用offer(e)方法将该元素添加到队列的尾部,如果成功返回ture,失败说明对面已满,抛出异常
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
offer(E e): 向队列中添加元素,如果队列已经满了,就直接返回false,并不会让当前添加元素的线程进行阻塞,成功返回true
public boolean offer(E e)
// 先检查传递过来的元素是否为null,如果为null直接抛出异常
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 进行上锁操作,具体是公平还是非公平由构造方法传递,默认非公平
lock.lock();
try
// 如果队列已满直接返回false
if (count == items.length)
return false;
else
// 如果队列没有满直接调用enqueue(e)方法添加到队列尾部并返回true
enqueue(e);
return true;
finally
// 释放锁
lock.unlock();
offer(E e, long timeout, TimeUnit unit):在指定的时间内把把元素添加到队列中,如果队列满了则会进行阻塞,直到队列有空位置或等待超时结束该方法
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
// 检测传递过来的元素是否null,为null直接抛出异常
checkNotNull(e);
// 将传递过来的时间统一转化为纳秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 添加可打断锁
lock.lockInterruptibly();
try
// 如果队列已满,进入notFull条件变量等够时间
while (count == items.length)
// 如果等的诗句到了,队列还是满的,直接返回false
if (nanos <= 0)
return false;
// 进入notFull条件变量等够时间,如果等的过程中有别的线程从队列获取了元素,将其唤醒,他就可以往队列中添加元素
nanos = notFull.awaitNanos(nanos);
// 如果队列未满,直接调用enqueue(e)方法将元素添加到队列的尾部
enqueue(e);
return true;
finally
// 释放锁
lock.unlock();
put(E e) : 往队列中添加元素。如果队列已满就进行阻塞,直到队列中有空位置,别的线程将其唤醒为止,如果添加的元素为null会抛出异常
public void put(E e) throws InterruptedException
// 先检查传递过的元素是不是为null,如果为null就直接抛出异常了
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 添加上一个可打断的lock锁
lock.lockInterruptibly();
try
// 判断队列中是否还有空位置,如果没有就进入notFull条件变量上进行等待
while (count == items.length)
notFull.await();
// 队列未满,直接添加到队列尾部,添加完后会唤醒想要获取但正在阻塞的线程
enqueue(e);
finally
// 释放锁
lock.unlock();
出队操作
remove(): 从队列的头部移除掉一个元素
public E remove()
// 直接调用poll()方法从队列的头部移除掉一个元素,并返回该元素
E x = poll();
// 如果移除的时候成功返回该元素说明移除成功返回该元素,如果没有找到抛出异常
if (x != null)
return x;
else
throw new NoSuchElementException();
remove(Object o):
public boolean remove(Object o)
// 如果传递过来的元素为null直接returnfalse
if (o == null) return false;
// 先将itmes数组赋值给一个临时变量,在自己的线程中进行操作
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
// 移除的时候进行上锁
lock.lock();
try
// 判断队列中是否还有元素
if (count > 0)
// 获取到putIndex
final int putIndex = this.putIndex;
// 获取到 takeIndex,循环遍历items找到需要删除的元素,takeIndex之前的元素都是null所以直接跳过
int i = takeIndex;
do
// 判断当前元素是否在items中,如果找到直接删除。返回 ture
if (o.equals(items[i]))
// 根据指定下标删除items中的元素
removeAt(i);
return true;
// 如果达到最大容量将i=0,因为达到最大容量的时候,putIndex会置为0
if (++i == items.length)
i = 0;
// 如果takeIndex==putIndex说明已经遍历到数组的尾部(items数组中最后一个有效元素)
while (i != putIndex);
// 如果在items数组未找到要删除的元素直接返回false
return false;
finally
// 释放锁
lock.unlock();
E poll(): 获取并移除队列头的第一个元素,如果队列中没有元素直接返回null,并不会阻塞
public E poll()
final ReentrantLock lock = this.lock;
// 进行上锁
lock.lock();
try
// 如果队列中还有元素,直接从对头部获取一个元素返回并从队列中移除掉该元素
return (count == 0) ? null : dequeue();
finally
// 释放锁
lock.unlock();
**E poll(long timeout, TimeUnit unit):**在指定时间内获取并移除掉队列头的元素,如果超时队列中还没有元素就直接返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException
// 将传递过来的时间统一转化为纳秒
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 加上一个可中断锁
lock.lockInterruptibly();
try // 如果队列还没有元素,就进入notEmpty条件变量上进行等待
while (count == 0)
// 等待时间到了,队列中还没有元素返回null
if (nanos <= 0)
return null;
// 进入notEmpty条件变量等待,直到等待的时间超过我们设置的超时时间
nanos = notEmpty.awaitNanos(nanos);
// while循环不成立,说明队列中有元素,直接返回队列头部的第一个元素并移除
return dequeue();
finally
lock.unlock();
// 具体等待的方法(因为时间有限就不一一分析下去了,有空在深入分析)
public final long awaitNanos(long nanosTimeout) throws InterruptedException
// 如果在等待的过程中被打断直接抛出异常
if (Thread.interrupted()) throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node))
if (nanosTimeout <= 0L)
transferAfterCancelledWait(node);
break;
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
ArrayBlockingQueue源码分析