源码阅读(34):Java中线程安全的QueueDeque结构——ArrayBlockingQueue
Posted 说好不能打脸
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码阅读(34):Java中线程安全的QueueDeque结构——ArrayBlockingQueue相关的知识,希望对你有一定的参考价值。
(接上文《源码阅读(33):Java中线程安全的Queue、Deque结构——ArrayBlockingQueue(3)》)
2.3.3.3、forEachRemaining() 方法
forEachRemaining(Consumer<? super E> action) 方法是JDK 1.8+之后的版本新增的一个方法,是java.util.Iterator接口中定义的一个新方法,该方法类似于java.lang.Iterable接口定义的forEach(Consumer<? super T> action)方法。但是两者是有区别的:
- forEach(Consumer<? super T> action)方法在java.lang.Iterable接口中被定义,表示一个可迭代的java Collection Framework具体集合类(实际上java.lang.Iterable接口比java.util.Collection接口层次更低)。调用者对具体集合类的forEach方法调用多少次,后者就会执行多少次:
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(20);
queue.add("1");
// ......
queue.add("6");
queue.add("7");
// 第一次执行forEach,并且会执行
queue.forEach(item ->
// ......
);
// 第二次执行forEach,并且会执行
queue.forEach(item ->
// ......
);
- forEachRemaining(Consumer<? super E> action) 方法在java.util.Iterator接口中被定义,后者表示一个具体迭代器的实现。无论调用者对具体迭代的的forEachRemaining(Consumer<? super E> action) 方法调用多少次,后者只会执行一次:
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(20);
queue.add("1");
// ......
queue.add("7");
// 创建一个迭代器
Iterator<String> itr = queue.iterator();
// 第一次执行forEachRemaining,并且会执行
itr.forEachRemaining(item ->
// ......
);
// 第二次执行forEachRemaining,但是会执行
itr.forEachRemaining(item ->
// ......
);
究其原因,是因为forEachRemaining(Consumer<? super E> action) 方法默认基于迭代器的hasNext()方法和next()方法配合进行工作,在迭代器对象完成所有的数据的遍历后,第二次调用同一个迭代器对象的forEachRemaining(Consumer<? super E> action) 方法,当然就不会执行了。以下是forEachRemaining方法的默认实现:
public interface Iterator<E>
// ......
default void forEachRemaining(Consumer<? super E> action)
// Consumer过程必须定义,否则就抛出异常
Objects.requireNonNull(action);
// 这里就是hasNext()方法和next()方法的配合工作
while (hasNext())
action.accept(next());
// ......
由于ArrayBlockingQueue内部的循环数组结构和多线程场景下的工作要求,所以ArrayBlockingQueue队列的迭代器中,对forEachRemaining方法的定义进行了调整,如下所示:
private class Itr implements Iterator<E>
// ......
public void forEachRemaining(Consumer<? super E> action)
Objects.requireNonNull(action);
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try
final E e = nextItem;
if (e == null) return;
if (!isDetached())
incorporateDequeues();
// 以上代码片段是操作方法获取到了ArrayBlockingQueue队列的操作权后的规范化处理过程
// 这里就不再进行赘述。我们主要从action.accept(e);这句代码开始介绍
// 该语句将上次next()方法记录的nextItem数据输出给下一消费者(Consumer)
action.accept(e);
// 这里还要再进行一次是否过期的判定,因为incorporateDequeues()方法运行后,当前Itr迭代器可能已经无法取数
// 具体的场景可参见上一篇文章的描述
if (isDetached() || cursor < 0)
return;
final Object[] items = ArrayBlockingQueue.this.items;
// 接着基于当前有效的cursor游标位置,开始对ArrayBlockingQueue队列中还没有遍历(且可遍历)的数据进行遍历
for (int i = cursor, end = putIndex, to = (i < end) ? end : items.length; ; i = 0, to = end)
for (; i < to; i++)
action.accept(itemAt(items, i));
if (to == end) break;
finally
// 当完成所有数据的遍历后(无论成功还是失败)
// 将当前迭代器设置为“独立/无效”工作模式
cursor = nextIndex = lastRet = NONE;
nextItem = lastItem = null;
detach();
lock.unlock();
// ......
2.3.4、Itrs迭代器组的清理过程
本小节我们来详细讲解一下Itrs迭代器组的清理过程。上文已经提到,ArrayBlockingQueue队列集合中所有的迭代器都在Itrs迭代器组中进行管理,这些迭代器将在Itrs迭代器组中以单向链表的方式进行排列。所以ArrayBlockingQueue队列需要在特定的场景下,对已经失效、甚至已经被垃圾回收的迭代器管理节点进行清理。
例如,当ArrayBlockingQueue队列有新的迭代器被创建时(并为非独立/无效工作模式),Itrs迭代器组就会尝试清理那些无效的迭代器,其工作逻辑主要由Itrs.doSomeSweeping(boolean)方法进行实现,代码片段如下所示:
/**
* 该方法负责对迭代器管理组Itrs进行清理。如果清理过程中发现了某个迭代器管理节点Itrs.Node
* 需要被清理,则扫除过程会更努力的打扫后续的节点——“小扫除”变“大扫除”,当前扫除次数也会被重置
* @param tryHarder 该方法采用“大扫除”还是“小扫除”方式进行清理。为true的时候,表示使用“大扫除”模式
*/
void doSomeSweeping(boolean tryHarder)
// assert lock.isHeldByCurrentThread();
// assert head != null;
// 小扫除只会对单向链表清理4次,大扫除会至少清理16次
int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
Node o, p;
// 从这个变量的单词就可以知道,这是一台扫除车
// 这台扫除车,将顺着迭代器组中的单向队列“向前开”
final Node sweeper = this.sweeper;
// 这是一个标记,最直白的理解就是“扫除车”是否开车
boolean passedGo;
// sweeper(垃圾车)为null的情况,主要的场景是Itrs迭代器组中没有迭代器对象
// 也可能是上次的清理操作已经将单向链表中所有的Node节点扫除完毕
if (sweeper == null)
// 从当前单向链表的第一个Node节点开始,顺着链表向后打扫
o = null;
p = head;
passedGo = true;
else
// 从上次扫除结束的Node节点开始,继续顺着单向链表向后打扫
o = sweeper;
p = o.next;
passedGo = false;
// ============ 以上过程决定“扫除车”从哪个地方开始打扫。以下就开始进行具体的打扫了。
// 首先根据之前已经确认的扫除次数,决定是“大扫除”还是“小扫除”,16次清扫(循环)还是4次清扫(循环)
// 注意,如果在清扫过程中,发现已经被回收或者已经“无效”的迭代器对象,则“小扫除”会变成“大扫除”
for (; probes > 0; probes--)
// 如果条件成立,说明当前迭代器组Itrs集合中没有任何迭代器对象,不需要进行扫除
if (p == null)
if (passedGo)
break;
// 否则当p(开始扫除的Itrs.Node位置)为null时,就从单向链表的头节点,开始扫除
o = null;
p = head;
passedGo = true;
// ========== 每一次扫除处理,都会做以下操作:
// 取得当前这个Itrs.Node位置上所关联的迭代器(注意这里是一个弱引用)
final Itr it = p.get();
// 取得当前Itrs.Node位置的下一个Itrs.Node位置
final Node next = p.next;
// 如果条件成立,说明当前Itrs.Node需要被清理
// 那么就是用if块中的代码,进行清理,并且如果发现了这种场景,扫除车就会努力的进行后续的清理
// 这就是该方法在最开始处提到的“更加努力的做后续清理”。否则,这个节点就不需要被清理
if (it == null || it.isDetached())
// found a discarded/exhausted iterator
// 更加努力的做后续清理,“小扫除”会变成“大扫除”,且扫除次数会被重置
probes = LONG_SWEEP_PROBES; // "try harder"
// unlink p
// 在it.isDetached()成立的场景下,主动清理这个弱引用
p.clear();
// 断开p节点和单向链表后续节点的连接关系
p.next = null;
// 如果条件成立,则说明当前清扫的节点是单向链表的头节点
// 那么需要重新设定头节点为当前被清扫节点的后续结点。
// 如果都没有后续结点了,那么说明迭代器管理组中就没有任何节点了,也就没有必要继续清扫下去(return退出)
if (o == null)
head = next;
if (next == null)
// We've run out of iterators to track; retire
itrs = null;
return;
// 其它场景下,则通过该语句将当前被清扫的p节点彻底断开和单向链表各节点的关系
else
o.next = next;
else
o = p;
p = next;
// 扫除结束后,根据最后p节点的引用情况,决定扫除车是停留在扫除结束节点上
// 还是设置为null。核心思路是,如果当前单向链表的所有Itrs.Node都扫除了一次,则扫除车没有存在的必要了
// 否则让扫除车停留在扫除结束的位置上,以便下一次清扫请求被触发时,继续向后进行打扫
this.sweeper = (p == null) ? null : o;
通过以上方法的详细解读,我们知道了迭代器管理组Itrs对其中迭代器对象的清理,主要包括以下关键点:
- 首先根据当前sweeper扫除车的状态,决定本次扫除的开始位置——在单向链表中扫除的开始位置
- 在确认完扫除开始位置后,再依次进行扫除。扫除模式分为“小扫除”和“大扫除”。
- “小扫除”的定义是从扫除开始的位置,向后扫描最多4个节点。“小扫除”的目的在于节约操作步骤的同时,校验链表中的迭代器部分正确。
- “大扫除”的定义是从扫除开始的位置,至少向后扫描16个节点。“大扫除”的目的是保证链表中依然有效的迭代器的管理准确性。
- “小扫除”模式可以转变为“大扫除”模式,既是“小扫除”过程中,发现其中一个Itrs.Node节点已经无效(为null或者isDetached()为true)
可以用下图来表示doSomeSweeping(boolean)方法的主要清扫场景:
3、ArrayBlockingQueue队列的主要构造函数
ArrayBlockingQueue队列中一共有三个构造函数,如下所示:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable
// ......
/**
* 该构造函数给定一个capacity大小,以便设定ArrayBlockingQueue队列中环形数组的最大容量
* (也就是)ArrayBlockingQueue队列的最大容量
* 注意:如果capacity < 1,则会抛出异常
*/
public ArrayBlockingQueue(int capacity)
this(capacity, false);
/**
* 该构造函数给定两个值,进行ArrayBlockingQueue队列的实例化
* @param capacity 当前ArrayBlockingQueue队列的最大容量
* @param fair 是否启用公平锁方式,默认情况下不启用
* @throws IllegalArgumentException if @code capacity < 1
*/
public ArrayBlockingQueue(int capacity, boolean fair)
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
/**
* 该构造函数给定三个值,进行ArrayBlockingQueue队列的实例化
* @param capacity 当前ArrayBlockingQueue队列的最大容量
* @param fair 是否启用公平锁方式,默认情况下不启用
* @param c 这是一个外部集合,这个集合不能为null否则要报错。
* 这些集合中的数据将会按照特定的顺序被复制到ArrayBlockingQueue队列中
*/
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try
final Object[] items = this.items;
int i = 0;
try
for (E e : c)
items[i++] = Objects.requireNonNull(e);
catch (ArrayIndexOutOfBoundsException ex)
throw new IllegalArgumentException();
count = i;
putIndex = (i == capacity) ? 0 : i;
finally
lock.unlock();
// ......
ArrayBlockingQueue队列自身的方法,相较于其下的Itr迭代器和Itrs迭代器分组而言,就要简单许多了。例如其三个构造函数,不需要进行逐行注释说明,读者就能看懂其中的意义了。
这里只需要特别注意的是构造函数中创建的两个condition对象,关于condition对象的详细介绍已经在讲解AQS的章节中进行了详细说明,这里notEmpty对象负责在ArrayBlockingQueue队列至少有一个数据的场景下,通知可能处于阻塞状态的消费者线程结束阻塞状态;notFull对象作用正好相反,它负责在ArrayBlockingQueue队列至少有一个空余的索引位可以放入新的数据时,通知可能处于阻塞状态的生产者线程结束阻塞状态。(后文讲解ArrayBlockingQueue队列的具体方法时,会涉及这些过程的详细介绍)
4、主要方法
ArrayBlockingQueue队列实现了java.util.concurrent.BlockingQueue接口,总的来说ArrayBlockingQueue队列中的常用方法遵循相同的处理逻辑,区别点主要在于不能正常操作时的处理方式。这里我们选择几个具有代表性的操作方法进行介绍:
4.1、offer(E e) 方法
根据官方的描述,offer(E e) 方法的主要工作过程是将特定的数据添加到队列尾部,这个数据不能为null。如果添加操作成功,则返回true,其他情况(添加失败)则返回false。
public boolean offer(E e)
// 进行添加的数据对象,不能为null
Objects.requireNonNull(e);
// 获取队列集合的操作权限
final ReentrantLock lock = this.lock;
lock.lock();
try
// 如果条件成立,说明ArrayBlockingQueue队列集合
// 已经没有多余的空间进行添加操作,则返回false
if (count == items.length)
return false;
// 这个else多余了
else
// 使用enqueue方法进行新的数据对象添加
// 最后返回true
enqueue(e);
return true;
finally
lock.unlock();
4.2、put(E e) 方法
和offer(E e) 方法类似的,还有put(E e) 方法,两者的区别是:如果ArrayBlockingQueue队列集合不能(已经没有多余的空间)进行添加操作,那么put(E e) 方法将进入阻塞状态,直到被唤醒并能够进行添加操作为止。代码片段如下:
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*/
public void put(E e) throws InterruptedException
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
// 获取到操作权后,才能进行后续操作
lock.lockInterruptibly();
try
// 如果条件成立,说明当前队列中已经没有空间进行添加操作
// 那么进入阻塞状态
while (count == items.length)
notFull.await();
// 通过enqueue方法进行添加操作
enqueue(e);
finally
lock.unlock();
lock.lock()方法和lock.lockInterruptibly()方法的区别,在之前介绍AQS的文章中已经进行了说明。这里再做一次简单说明:lockInterruptibly()方法在获取锁之前会确认线程中断信号(Thread.interrupted()),如果收到线程中断信号,则会抛出InterruptedException 异常;而lock()方法不会考虑线程中断信号的问题。
4.3、E take() 方法
take()方法可以从ArrayBlockingQueue队列头部获取一个数据对象,如果当前ArrayBlockingQueue队列已经没有数据对象可以获取,则进入阻塞状态。该方法中实际获取数据对象的方法,是前文中已经介绍过的dequeue()方法。
public E take() throws InterruptedException
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
while (count == 0)
notEmpty.await();
return dequeue();
finally
lock.unlock();
========
(ArrayBlockingQueue完,接后文《源码阅读(34):Java中线程安全的Queue、Deque结构——ArrayBlockingQueue(4)》)
以上是关于源码阅读(34):Java中线程安全的QueueDeque结构——ArrayBlockingQueue的主要内容,如果未能解决你的问题,请参考以下文章
源码阅读(32):Java中线程安全的QueueDeque结构——ArrayBlockingQueue
源码阅读(32):Java中线程安全的QueueDeque结构——ArrayBlockingQueue
源码阅读(39):Java中线程安全的QueueDeque结构——LinkedTransferQueue
源码阅读(39):Java中线程安全的QueueDeque结构——LinkedTransferQueue