源码阅读(33):Java中线程安全的QueueDeque结构——ArrayBlockingQueue

Posted 说好不能打脸

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码阅读(33):Java中线程安全的QueueDeque结构——ArrayBlockingQueue相关的知识,希望对你有一定的参考价值。

(接上文《源码阅读(32):Java中线程安全的Queue、Deque结构——ArrayBlockingQueue(2)》)

2.3.3、迭代器中主要方法分析

一旦Itr迭代器完成初始化,就可以开始使用了。而使用迭代器最常见的方法就是使用hasNext()方法和next()方法进行配合。另外从JDK 1.8+开始,还可以使用Lambda表达式进行表达,最后ArrayBlockingQueue队列集合的迭代器还支持remove()方法的使用。

2.3.3.1、hasNext()方法和next()方法

我们先行来介绍hasNext()方法和next()方法,这也是最常见的迭代器使用方法,从JDK 1.8开始,java.util.Iterator定义了另一种方法void forEachRemaining(Consumer<? super E> action),其默认实现也是基于hasNext()方法和next()方法的配合使用,如下所示:

public interface Iterator<E> 
  // ......
  default void forEachRemaining(Consumer<? super E> action) 
    Objects.requireNonNull(action);
    // hasNext()方法和next()方法进行配合,构成了默认的处理方式
    while (hasNext()) 
      action.accept(next());
    
  
  // ......

当然由于ArrayBlockingQueue队列特殊的内部结构,其对forEachRemaining()方法进行了重写(本文的后续部分会进行介绍),下面我们再来看看最常见的hasNext()方法和next()方法配合使用的方式,代码片段如下所示:

// ......
Iterator<String> itr = queue.iterator();
while(itr.hasNext()) 
  System.out.println(itr.next());

// ......
2.3.3.1.1、最简单的运行情况

当然,hasNext()方法和next()方法在配合使用的过程中有许多处理分支,所以,为了方便读者理解,我们介绍这组方法时会按照这组方法最常见的运行方式去进行讲解,首先我们来看一下hasNext()方法的部分代码,如下所示:

// hashNext方法,如果迭代器还可以遍历下一个数据,则返回true;其它情况返回false
public boolean hasNext() 
  if (nextItem != null)
    return true;
  // ...... hasNext并没有全部展示,我们先将注意力集中在以上条件成立的情况

为了便于理解以下的代码,特别是Itr迭代器中nextItem变量的引用情况,本文这里放置一张上文已经提到的ArrayBlockingQueue队列初始化后的状态图:

通常情况下,hasNext()方法尽可能保证自己不会在获得操作锁以后才能进行工作,这样做是为了尽可能提高并发操作性能。为了达到这个目的hasNext方法通常以nextItem属性的值状态作为判定依据。并且只要迭代器还没有完成遍历,nextItem属性中的一定都会有值。如果hasNext()方法返回true,一般情况下使用者就可以使用next()方法拿取这个遍历点上的数据,代码片段如下所示:

public E next() 
  final E e = nextItem;
  // 注意,如果nextItem中没有数据,则直接抛出异常,这就是为什么在执行next()方法前,
  // 一定要先使用hasNext()方法检查迭代器的有效性
  if (e == null)
    throw new NoSuchElementException();
  final ReentrantLock lock = ArrayBlockingQueue.this.lock;
  // 只有在获得锁的情况下才能执行next遍历操作
  lock.lock();
  try 
    // 如果当前迭代器不是“独立”模式(也就是说没有失效)
    // 则通过incorporateDequeues方法对lastRet、nextIndex、cursor、prevCycles、prevTakeIndex属性进行修正
    // 保证以上这些属性的状态值,和当前ArrayBlockingQueue队列集合的状态一致。
    // incorporateDequeues方法很重要,下文中立即会进行介绍
    if (!isDetached()) 
      incorporateDequeues();
    
    // assert nextIndex != NONE;
    // assert lastItem == null;
    // 将nextIndex索引位置赋值给lastRet,表示之前取next元素的索引位已经变成了“上一个”取出数据的索引位
    lastRet = nextIndex;
    final int cursor = this.cursor;
    // 如果当前游标有效(不为NONE)
    if (cursor >= 0) 
      // 那么游标的索引位置就成为下一个取数的位置
      nextItem = itemAt(nextIndex = cursor);
      // assert nextItem != null;
      // 接着游标索引位+1,注意:这是游标索引位可能为None
      // 代表取出下一个数后,就再无数可取,遍历结束
      this.cursor = incCursor(cursor);
     
    // 否则就认为已无数可取,迭代器工作结束
    else 
      // 这时设定nextIndex为NONE,,设定nextItem为Null
      nextIndex = NONE;
      nextItem = null;
      // 如果条件成立,则标识当前迭代器为“独立”(无效)工作状态
      if (lastRet == REMOVED) 
        detach();
      
    
   finally 
    lock.unlock();
  
  return e;


// 该方法负责将当前Itr迭代器置为“独立/失效”工作状态,既将prevTakeIndex设置为DETACHED
// 这个动作可能发生在以下多种场景下:
// 1、当Itrs迭代器组要停止对某个Itr迭代器进行状态跟踪时。
// 2、当迭代器中已经没有更多的索引位可以遍历时。
// 3、当迭代器发生了一些处理异常时,
// 4、当incorporateDequeues()方法中判定三个关键索引位全部失效时(cursor < 0 && nextIndex < 0 && lastRet < 0)
// 5、.....
private void detach() 
  // Switch to detached mode
  // assert lock.isHeldByCurrentThread();
  // assert cursor == NONE;
  // assert nextIndex < 0;
  // assert lastRet < 0 || nextItem == null;
  // assert lastRet < 0 ^ lastItem != null;
  if (prevTakeIndex >= 0) 
    // assert itrs != null;
    // 设定一个Itr迭代器失效,就是设定prevTakeIndex属性为DETACHED常量
    prevTakeIndex = DETACHED;
    // try to unlink from itrs (but not too hard)
    // 一旦该迭代器被标识为“独立”(无效)工作模式,则试图清理该迭代器对象在Itrs迭代器组中的监控信息
    itrs.doSomeSweeping(true);
  

以上代码片段,如果在ArrayBlockingQueue队列集合有数据的场景下,在迭代器刚完成初始化后第一次运行时,通常来说运行结果类似如下所示:

2.3.3.1.2、考虑多线程介入操作的情况

但实际情况可能要复杂得多,因为ArrayBlockingQueue队列被设计成能在多线程同时操作的场景下正确工作,所以有可能在两次next()方法操作之间很短的时间内,ArrayBlockingQueue队列已经被其它线程进行了多次读写操作。甚至ArrayBlockingQueue队列中关键的takeIndex参数已在环形数组中循环多次(跨越0号索引位多次),如下图所示:

如上图所示,在完成迭代器的next()操作后,ArrayBlockingQueue队列的操作权就被释放(lock.unlock()),这时其它线程操作ArrayBlockingQueue队列做了很多读/写操作,导致ArrayBlockingQueue队列的putIndex索引位置跨过0号索引位1次(iters.cycles值为1),并且停留在1号索引位的位置。而且由于这些读操作,目前1号索引位、2号索引位、3号索引位上已经没有数据,Itr迭代器还没有开始遍历到的2号索引位之后的数据也已经全部更换了一批。

需要注意的是,以上场景中我们不考虑(iters.cycles - prevCycles > 1)的情况,因为这种情况迭代器内部已经做了安全限定。以上情况看似很极端,但实则在多线程场景下是比较常见的,在这样的情况下,Itr迭代的的hasNext()/next()方法又开始被调用了(连续进行了多次调用):

public boolean hasNext() 
  // 由于nextItem已经在上次next()方法中,提前引用了最初存储在2号索引位上的数据
  // 所以这时的判定条件返回的还是true
  if (nextItem != null)
    return true;
  noNext();
  return false;

由于以上hasNext()方法返回了true,所以调用者依然可以合法调用next()方法,代码片段如下所示(一些重复且不会涉及的代码片段就进行省略了):

//......
public E next()  
  // ......
  try  
    if (!isDetached())  
      // 进行有出队数据存在情况下的索引位置判定和调整
      // 我们主要就来分析,该方法如何进行索引位置修正
      incorporateDequeues(); 
     
    // ......
   finally  
    lock.unlock();
  
  return e;

//......

// 该方法用于判定当前迭代器是否采用“独立”模式在运行
boolean isDetached() 
  // assert lock.isHeldByCurrentThread();
  return prevTakeIndex < 0;


// 该方法在用于在Itr迭代器多次操作的间歇间,ArrayBlockingQueue队列状态发生变化的情况下
// 对Itr的重要索引位置进行修正(甚至是让Itr在极端情况下无效)
private void incorporateDequeues() 
  // assert lock.isHeldByCurrentThread();
  // assert itrs != null;
  // assert !isDetached();
  // assert count > 0;
  
  // 这是ArrayBlockingQueue目前记录的takeIndex索引位回到0号索引位的次数
  final int cycles = itrs.cycles;
  // 这是ArrayBlockingQueue目前记录的takeIndex索引位的值
  final int takeIndex = ArrayBlockingQueue.this.takeIndex;
  // 这是本迭代器中上一次获取到的takeIndex索引位回到0号索引位的次数(值为0)
  final int prevCycles = this.prevCycles;
  // 这是本迭代器中上一次获取到的takeIndex索引位的值
  final int prevTakeIndex = this.prevTakeIndex;

  // 如果发现cycles和prevCycles存在差异,或者takeIndex和prevTakeIndex存在差异
  // 则说明在迭代器的两次操作间隔中,ArrayBlockingQueue中的数据发生了变化,那么需要进行修正
  if (cycles != prevCycles || takeIndex != prevTakeIndex) 
    // ArrayBlockingQueue队列中循环数组的容量长度
    // 和代码配套的示意图中,该值为X+1
    final int len = items.length;
    // how far takeIndex has advanced since the previous
    // operation of this iterator
    // 这句计算非常重要,就是计算在所有读取操作后,两次takeIndex索引产生的索引距离(已出队的数据量)
    long dequeues = (long) (cycles - prevCycles) * len + (takeIndex - prevTakeIndex);

    // Check indices for invalidation
    // 判定lastRet索引位置是否失效,如果失效则赋值为-2
    if (invalidated(lastRet, prevTakeIndex, dequeues, len)) 
      lastRet = REMOVED;
    
    // 判定nextIndex索引位是否失效,如果失效则赋值为-2
    if (invalidated(nextIndex, prevTakeIndex, dequeues, len)) 
      nextIndex = REMOVED;
    
    // 判定nextIndex索引位是否失效,如果失效则将ArrayBlockingQueue目前记录的takeIndex索引位的值赋给它
    // 让cursor游标索引位,指向当前ArrayBlockingQueue队列的head位置
    if (invalidated(cursor, prevTakeIndex, dequeues, len)) 
      cursor = takeIndex;
    
    
    // 如果cursor索引、nextIndex索引、lastRet索引,则表示当前Itr游标失效
    // 调用detach()方法将当前Itr迭代器标记为失效,并清理Itrs迭代器组中的Node信息
    if (cursor < 0 && nextIndex < 0 && lastRet < 0) 
      detach();
     
    // 否则(大部分情况)修正Itr迭代器中的状态,以便其能从修正的位置开始进行遍历
    else 
      this.prevCycles = cycles;
      this.prevTakeIndex = takeIndex;
    
  


/**
 * Returns true if index is invalidated by the given number of
 * dequeues, starting from prevTakeIndex.
 * 该方法依据prevTakeIndex索引的位置、两次takeIndex索引移动的距离(已出队的数据量),以便判定给定的index索引位置是否已经失效
 * 如果失效,则返回true,其它情况返回false。
 */
private boolean invalidated(int index, int prevTakeIndex, long dequeues, int length) 
  // 如果需要判定的索引位本来就已经失效了(NONE、REMOVED、DETACHED这些常量都为负数)
  if (index < 0) 
    return false;
  
  // 计算index索引位置和prevTakeIndex索引位置的距离
  // 最简单的就是当前index的索引位减去prevTakeIndex的索引位值
  int distance = index - prevTakeIndex;
  // 如果以上计算出来是一个负值,说明index的索引位已经“绕场一周”
  // 这时在distance的基础上面,增加一个队列长度值,
  if (distance < 0) 
    distance += length;
  
  return dequeues > distance;

以上代码片段中incorporateDequeues()方法非常重要,该方法中有一句代码又是最关键:

long dequeues = (long) (cycles - prevCycles) * len + (takeIndex - prevTakeIndex);

这句代码主要是为了检测在Itr迭代器两次操作的间隔中,ArrayBlockingQueue被其它线程进行读操作,使得ArrayBlockingQueue队列中takeIndex索引位置移动的真实距离,如下图所示:

上图中将ArrayBlockingQueue队列的环形数组结构进行了平展,示意了ArrayBlockingQueue队列进行若干读写操作后,takeIndex索引位置移动的距离,这个距离将作为后续invalidated(int, int , long , int)方法中判定指定的索引位置是否已失效的重要依据。我们将示例场景带入以上代码中的各个计算公式——来看看ArrayBlockingQueue队列被操作后的各种属性的变化:

  • cycles的值为1 ——因为takeIndex索引回到0号索引位的次数总共为1,prevCycles的值为0——因为Itr迭代器上一次获取到的cycles值为0;len长度为X + 1;takeIndex索引位的值为3;prevTakeIndex索引位的值为1,所以最终得到的dequeues变量的值为(X+1)+ 2。

  • 当进行“invalidated(lastRet, prevTakeIndex, dequeues, len)”调用时:lastRet索引位的值为1,prevTakeIndex的值为1,所以计算出来的distance的值为0;那么 “dequeues > distance”的判定将会返回true。所以lastRet索引被标识为-2(已移除)。

  • 当进行“invalidated(nextIndex, prevTakeIndex, dequeues, len)”调用时:nextIndex索引位的值为2,prevTakeIndex的值也为1,所以计算出来的distance的值为1,那么 “dequeues > distance”的判定将会返回true。所以nextIndex索引被标识为-2(已移除)。

  • 当进行“invalidated(cursor, prevTakeIndex, dequeues, len)”调用时:cursor索引位的值为3,prevTakeIndex的值还是为1,所以计算出来的distance的值为2,那么 “dequeues > distance”的判定将会返回true。所以cursor 索引被标识为ArrayBlockingQueue目前记录的takeIndex索引位的值。

  • 请注意:incorporateDequeues()方法的目标是在Itr迭代器两次操作间隙ArrayBlockingQueue队列发生读写操作的情况下,尽可能修正Itr迭代器的索引位值,使它能从下一个正确的索引位置重新开始遍历数据,而不是“尽可能让Itr迭代器作废”。这从incorporateDequeues()方法中确认Itr迭代器过期所使用的相对苛刻的判定条件就可以看出来 “cursor < 0 && nextIndex < 0 && lastRet < 0”。

  • 请特别注意以上条件中的“cursor < 0” 这个条件因子,只要ArrayBlockingQueue队列中增加了之前没有遍历过的元素,且cursor索引值本身并不为NONE;或者ArrayBlockingQueue队列中还有遗留的没有遍历过,没有出队的数据,则cursor索引值都会被修正成一个非负数

为了再为读者进行更多处理情况的说明,这里我们给出更多的场景示意:

incorporateDequeues()方法非常重要还有一个原因,是Itr迭代器内部隐含的一个处理逻辑:既获得ArrayBlockingQueue队列操作权的Itr迭代器,在进行正式操作前,都必须使用incorporateDequeues()方法修正Itr中关键的索引信息(lastRet、nextIndex、cursor)以保证Itr迭代器在多线程并发读/写ArrayBlockingQueue队列的操作场景下索引位置的正确性

2.3.3.1.3、当hasNext()方法发现没有更多数据可遍历时

根据以上介绍的hasNext()方法/next()方法在一般工作场景下的配合使用,以及next()方法中对遍历索引位的修正工作原理,存储在ArrayBlockingQueue队列中的所有数据就可以被遍历了,直到hasNext()方法返回false,告诉调用者再无数据可进行遍历未知,如下所示:

public boolean hasNext() 
  if (nextItem != null)
    return true;
  // 如果再没有数据可以遍历,则调用noNext()方法后,返回false
  noNext();
  return false;


// 该方法主要在再没有数据可以遍历的情况下,修正和设定当前Itr迭代器的状态。
private void noNext() 
  // 首先该方法需要获得ArrayBlockingQueue队列的操作权限
  final ReentrantLock lock = ArrayBlockingQueue.this.lock;
  lock.lock();
  try 
    // assert cursor == NONE;
    // assert nextIndex == NONE;
    // 如果当前Itr迭代器并没有被设定为“独立”(失效)工作模式,则需要进行状态设定
    if (!isDetached()) 
      // assert lastRet >= 0;
      // 首先修正lastRet、nextIndex、cursor三个关键索引值
      // incorporateDequeues()方法非常重要,上文已经进行了说明
      incorporateDequeues(); // might update lastRet
      // 如果代表最后一次(上一次)next()方法返回数据所在索引位的lastRet索引值有效
      // 则还要视图取出这个索引位上的数据。
      if (lastRet >= 0) 
        lastItem = itemAt(lastRet);
        // assert lastItem != null;
        // 设定当前Itr迭代器失效,并清理Itrs迭代器组中的Node信息
        detach();
      
    
    // assert isDetached();
    // assert lastRet < 0 ^ lastItem != null;
   finally 
    lock.unlock();
  

那么有的读者会有这个疑问,为什么当迭代器没有任何数据可以遍历的时候,还要通过incorporateDequeues()方法修正各索引位的值,并且还要视图在取出lastRet索引位上的数据后,才设定迭代器失效呢?为什么不是直接设定Itr迭代器失效就可以了呢?这个原因和remove()方法的处理逻辑有关系。

2.3.3.2、remove()方法

remove()方法的作用是删除Itr迭代器上一次从next()方法获取数据时,其索引位上的数据(lastRet索引位上的数据)——真正的从ArrayBlockingQueue队列中删除。一定要注意不是删除当前cursor游标指向的索引位上的数据。

虽然我们在使用ArrayBlockingQueue队列的Itr迭代器时,不会习惯于使用其中的remove()方法,从ArrayBlockingQueue队列中移除上次调用next()方法时返回的索引位上的数据信息;虽然java.util.Iterator接口默认为该接口的各种迭代器实现类不支持remove()方法,但确实ArrayBlockingQueue队列的Itr迭代器支持调用者有限使用remove()方法。所以我们还是要需要进行介绍,作为对上文中已详细介绍的hasNext()方法、next()方法的补充:

// java.util.Iterator接口默认为该接口的实现不支持remove()方法
public interface Iterator<E> 
  // ......
  default void remove() 
    throw new UnsupportedOperationException("remove");
   
  // ......
 

以下是remove()方法的代码片段:

private class Itr implements Iterator<E> 
  // ......
  // remove方法的操作意义并不是移除当前cursor游标所指向的索引位上的数据
  // 而是移除上一次通过next()方法返回的索引位上的数据,也就是当前lastRet所指向的索引位上的数据
  public void remove() 
    final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    lock.lock();
    // assert lock.getHoldCount() == 1;
    try 
      // 同样,获取操作权后,首先通过incorporateDequeues()方法
      if (!isDetached()) 
        incorporateDequeues(); // might update lastRet or detach
      
      // 设定Itr中的全局lastRet变量为NONE,以表示该位置的数据将被移除
      // 设定前,将该值存储到局部变量中,以便后续使用
      final int lastRet = this.lastRet;
      this.lastRet = NONE;
      if (lastRet >= 0) 
        // 如果lastRet的索引位有效,且Itr迭代器有效,则移除ArrayBlockingQueue队列中lastRet索引位上的数据
        if (!isDetached()) 
          removeAt(lastRet);
         
        // 如果lastRet的索引位有效,但Itr迭代器无效,
        // 则移除ArrayBlockingQueue队列中lastRet索引位上的数据
        // 还要取消lastItem对数据对象的引用
        else 
          final E lastItem = this.lastItem;
          // assert lastItem != null;
          this.lastItem = null;
          if (itemAt(lastRet) == lastItem) 
            removeAt(lastRet);
          
        
       
      // 如果lastRet已被标识为无效
      // 出现这种情况的场景最有可能是Itr迭代器创建时ArrayBlockingQueue队列中没有任何数据
      // 或者是Itr迭代器创建后,虽然有数据可以遍历,但是还没有使用next()方法读取任何索引位上的数据
      // 这是抛出IllegalStateException异常
      else if (lastRet == NONE) 
        throw new IllegalStateException();
      
      // else lastRet == REMOVED and the last returned element was
      // previously asynchronously removed via an operation other
      // than this.remove(), so nothing to do.
      // 以上这段注释是源代码中Doug Lea书写的注释,大意是说
      // 如果lastRet已被标识为“已移除”(REMOVED)状态
      // 说明这个索引位上数据已经被之前获得操作权的其它操作线程移除队列,所以这里无需做任何处理了
      // 最后,如果以下条件成立,则标识当前Itr迭代器无效
      if (cursor < 0 && nextIndex < 0) 
        detach();
      
     finally 
      lock.unlock();
      // assert lastRet == NONE;
      // assert lastItem == null;
    
  
  // ......

关于ArrayBlockingQueue队列的removeAt(int)方法,我们将在下文中进行介绍。现在我们对本小节开始时的两个问题进行解答:

  • 为什么在noNext()方法开始处理前、remove()方法开始处理前都要通过incorporateDequeues()方法修正各索引位的值?

这就是上文中已经提到的Itr迭代器隐含的处理规则。因为ArrayBlockingQueue、ArrayBlockingQueue.itr的设计者始终考虑的是前者的工作场景是被多线程并发操作的,在本Itr迭代器获取到操作权之前,不能保证当前Itr迭代器的关键索引位置都处于正确的值。所以都需要通过incorporateDequeues()方法进行修正。

  • 为什么noNext()方法处理过程中,还要试图在取出lastRet索引位上的数据后,才设定迭代器失效?

这是因为虽然最后一次有效使用next()方法遍历了ArrayBlockingQueue队列中的最后一个数据,但是还要考虑支持操作者这时通过remove()方法删除这个最后索引位上的数据,所以需要在hasNext()方法中将这个索引位上的数据取出并引用起来,以便在进行remove操作时正确删除数据。

========
(接后文《源码阅读(34):Java中线程安全的Queue、Deque结构——ArrayBlockingQueue(4)》)

以上是关于源码阅读(33):Java中线程安全的QueueDeque结构——ArrayBlockingQueue的主要内容,如果未能解决你的问题,请参考以下文章

源码阅读(34):Java中线程安全的QueueDeque结构——ArrayBlockingQueue

源码阅读(34):Java中线程安全的QueueDeque结构——ArrayBlockingQueue

源码阅读(32):Java中线程安全的QueueDeque结构——ArrayBlockingQueue

源码阅读(32):Java中线程安全的QueueDeque结构——ArrayBlockingQueue

源码阅读(39):Java中线程安全的QueueDeque结构——LinkedTransferQueue

源码阅读(39):Java中线程安全的QueueDeque结构——LinkedTransferQueue