源码阅读(32):Java中线程安全的QueueDeque结构——ArrayBlockingQueue

Posted 说好不能打脸

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码阅读(32):Java中线程安全的QueueDeque结构——ArrayBlockingQueue相关的知识,希望对你有一定的参考价值。

(接上文《源码阅读(31):Java中线程安全的Queue、Deque结构——ArrayBlockingQueue(1)》)

本篇内容我们专门分析ArrayBlockingQueue中迭代器的工作情况,ArrayBlockingQueue迭代器非常有阅读意义,是java集合框架中比较有代表性的结构之一。

2.3、ArrayBlockingQueue的迭代器

2.3.1、迭代器的使用和产生的问题

在进行ArrayBlockingQueue迭代器的使用讲解前,我们先来看看ArrayBlockingQueue迭代器的基本使用。

// ......
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(20);
// ......
queue.add("4");
queue.add("5");
queue.add("6");
// ......
boolean removed = queue.remove("5");
System.out.println("移除操作是否有效:" + removed);
// 新建一个迭代器
Iterator<String> itr = queue.iterator();
while(itr.hasNext()) 
  System.out.println(itr.next());

System.out.print("队列中还有元素吗:" + !queue.isEmpty());
// 这又是另一个新建的迭代器
itr = queue.iterator();
while(itr.hasNext()) 
  // 取得下一个数据
  String nextItem = itr.next();
  // 删除ArrayBlockingQueue中,最后一次使用next()方法获取的索引位上的数据
  itr.remove();
  System.out.println("//====被移除的元素是:" + nextItem);

System.out.print("队列中还有元素吗:" + !queue.isEmpty());
// ......

以下是这个代码片段的输出结果:

移除操作是否有效:true
1
2
3
4
6
7
队列中还有元素吗:true
//====被移除的元素是:1
//====被移除的元素是:2
//====被移除的元素是:3
//====被移除的元素是:4
//====被移除的元素是:6
//====被移除的元素是:7
队列中还有元素吗:false

这里要说明的是,虽然大部分资料(包括本文)都在试图给读者说明ArrayBlockingQueue是一种符合FIFO规则的有界阻塞队列,但在一些操作场景下,ArrayBlockingQueue也并不绝对遵循FIFO规则——我们可以在ArrayBlockingQueue非头部和尾部的某个索引位置移除一个元素,有以下几种方式:

1、调用ArrayBlockingQueue的remove(Object)方法,从队列的某个位置移除和入参对象“相等”的一个元素,这个方法是由java.util.AbstractCollection抽象类定义。使用方式如下所示:

// ......
// 移除指定的元素,无论它处于队列的哪一个索引位
boolean removed = queue.remove("5");
// ......

2、使用ArrayBlockingQueue的迭代器,并调用迭代器中的remove()方法。后者这个方法可以将移除ArrayBlockingQueue队列中通过本itr迭代器的next()方法最后一次获取到的索引位上的数据,如下所示:

// ......
while(itr.hasNext()) 
  String nextItem = itr.next();
  // 删除ArrayBlockingQueue中,最后一次使用next()方法获取的索引位上的数据
  itr.remove();

// ......

3、另外,有的读者会提到drainTo(Collection , int)方法,该方法将当前队列集合中指定数量的元素移入指定集合,并在当前队列集合中进行删除。这个方法实际上是从队列头(takeIndex)的索引位开始操作的,所以严格意义上还是有细微差异,但是该方法确实会对造成迭代器工作不准确。如下所示:

// ......
ArrayBlockingQueue<String> source = new ArrayBlockingQueue<>(20);
source.add("1");
source.add("2"); 
// ......
source.add("7");
HashSet<String> targetc = new HashSet<>();
source.drainTo(targetc , 3);
// ......

在多线程场景下,各迭代器可能由多个线程同时进行操作,这就导致以下几种可能性:

  • 某迭代器进行了ArrayBlockingQueue队列移除操作,但是另外的迭代器却并不知道,后者依然按照原来ArrayBlockingQueue队列中各索引位位的情况进行读写/遍历操作。

  • ArrayBlockingQueue队列本身发生了数据新增/移除操作,但是多有迭代器都不知道,后者依然按照依然按照原来ArrayBlockingQueue队列中各索引位位的情况进行读写/遍历操作。

加之ArrayBlockingQueue队列内部是一个可循环利用的环形数组,这就使得迭代器在工作时,只是利用ArrayBlockingQueue队列自身的状态情况,很难识别ArrayBlockingQueue队列中的数据是否发生了变化。例如,当以下示意图的情况发生时,我们能肯定ArrayBlockingQueue队列在两次next()方法执行的间隙没有发生变化吗?

如上图所示,在itr迭代器两次调用next()方法之间,另外的线程操作ArrayBlockingQueue队列进行了多次数据添加/移除操作,但由于ArrayBlockingQueue队列内部环形数组的原因,其takeIndex索引、putIndex索引、count数值均没有发生变化。但ArrayBlockingQueue队列中的实际数据已经全变了,takeindex已经在环形数组中“绕场一圈”。

2.3.2、迭代器工作原理概述

由于ArrayBlockingQueue队列的特殊结构,以及上述需要保证的各种特殊工作场景(各种多线程操作,多种数据移除操作),导致Itr迭代器比较复杂——复杂到本专题需要专门花1-2篇文章篇幅,对这个迭代器和其工作原理进行详细介绍。

2.3.2.1、迭代器组

ArrayBlockingQueue为了管理一个和多个迭代器,专门设立了一个Itrs迭代器组的概念,除了detached(独立/无效)工作模式下的迭代器外,ArrayBlockingQueue队列中目前所有正在被使用的迭代器都基于Itrs迭代器组构造成一个单向链表结构,列表中的每个节点使用“弱引用”方式进行对象引用。如下图所示:

迭代器和迭代器组的工作目标是,尽可能正确的完成ArrayBlockingQueue队列中所有数据的遍历操作,而不是,在数据出现遍历差异时,尽可能将迭代器设定独立/无效工作模式。当每次ArrayBlockingQueue发生“取数”操作时,当每次有新的迭代器创建时,Itrs迭代器组都要进行相关判定和维护,以保证所有迭代器的一致性,并对无效/无法维护的迭代器进行清理。

所谓“取数”操作是概指那些需要从ArrayBlockingQueue移除数据的操作,包括:所有需要调用ArrayBlockingQueue.dequeue()方法的操作(例如poll()、take()这些方法)、所有需要调用ArrayBlockingQueue.removeAt(int)方法的操作(例如remove(Object)这样的方法)、以及进行数据批量移除的操作(例如drainTo(Collection)、drainTo(Collection, int )这样的方法)。

detached(独立/无效)工作模式是指,在特定场景下创建的没有任何数据可以遍历的迭代器,或者已经完成所有数据遍历的迭代器。例如当ArrayBlockingQueue队列集合没有任何数据时创建的迭代器。这类迭代器不能遍历任何数据,也就不涉及到要保证遍历时索引位正确性的需求。所以这类“独立/无效”工作模式的迭代器无需加入到迭代器管理组进行管理。

以下是Itrs迭代器组中的重要属性定义,有了这些属性定义,我们就可以为所有在工作中的Itr迭代器扩展ArrayBlockingQueue队列所需的描述了:

// ......
class Itrs 
  /**
   * Node in a linked list of weak iterator references.
   * 这是Itrs迭代器组的一个Node节点定义
   */
  private class Node extends WeakReference<Itr> 
    // next属性指向Itrs迭代器组单向链表中的下一个Node节点
    Node next;
    // 每一个Node节点都弱引用一个iterator迭代器(如上图所示)
    Node(Itr iterator, Node next) 
      super(iterator);
      this.next = next;
    
  
  
  /** 
   * Incremented whenever takeIndex wraps around to 0 
   * 该属性非常重要,它记录takeIndex索引重新回到0号索引位的次数
   * 由此来描述takeIndex索引位的“圈数”
   */
  int cycles;

  /** 
   * Linked list of weak iterator references 
   * 这是Itrs迭代器的第一个Node节点的,以便进行整个单向链表的构建、遍历和管理
   * */
  private Node head;

  /** 
   * Used to expunge stale iterators 
   * Itrs迭代器组在特定的场景下会进行Node单向链表的清理,该属性表示上次一清理到的Node节点
   * 以便在下一次清理时使用(不用回到head处重新遍历了)
   * */
  private Node sweeper;
 
// ......

2.3.2.2、为什么要使用“弱引用”来构建Itrs单向链表

在之前的文章中,我们已经介绍过Java中四种引用类型,以及每种类型的工作特点。被“弱引用”的对象在GC回收器进行可回收扫描时,若发现该对象只有“弱引用”可达时,就会将该对象进行回收。如下图所示:

弱引用的使用场景可以概括归纳为:在被监控对象被其引用者设置为null时,便于该对象相关监控设施的回收。如上图所示:A对象的强引用来自于C对象和D对象,读者可以理解为A、C、D三个对象都是用于处理业务逻辑的对象,而A对象的弱引用来自于B对象,后者引用A对象只是为了实时收集A对象的当前数值状态,以便周期性的写入日志系统

正常情况上来说,A、C、D对象都应该在业务逻辑完成后被GC回收,判定标准就是A、C、D对象引用的不可达,如下图所示:

也就是说一旦A对象引用不可达,就说明A对象可以被GC回收了,但这时如果B对象引用A也是强引用形式,就会导致A对象不能被GC回收器回收。这种情况下,我们就需要设定B对象对A对象的引用是一张弱引用,以便保证A对象在所有强引用都不可达时,能够被GC回收器回收。

以上的示例可以非常贴合的换成我们现在正在讨论的场景:A对象就是我们这里讨论的迭代器,B对象就是ArrayBlockingQueue队列集合中用于监管迭代器运行的Itrs迭代器组中的Node节点对象,C和D对象就是创建并使用迭代器的两个工作对象。

当C、D对象完成了迭代器使用后,将迭代器对象的引用置为null,就此断开和迭代器对象的引用关系(甚至C、D对象本身也不再可达),当GC进行内存清理时,就会将迭代器对象进行清理,而不会考虑Itrs迭代器组中的Node节点是否依然引用了这个迭代器对象。

2.3.2.3、Itr迭代器中的主要属性

Itrs迭代器组我们进行了简要的介绍,接着我们来节点Itr迭代器。要理解Itr迭代器的工作原理,我们就需要首先理解Itr迭代器的主要定义过程,如下代码片段所示:

// ......
private class ArrayBlockingQueue.Itr implements Iterator<E> 
  // ......
  /** Index to look for new nextItem; NONE at end */
  // 当前游标索引位
  private int cursor;
  /** Element to be returned by next call to next(); null if none */
  // 专门为支持hashNext方法和next方法配合所使用的属性,用于在调用next方法返回数据
  private E nextItem;
  /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
  // 专门为支持hashNext方法和next方法配合所使用的属性,记录调用next方法返回数据的索引位
  private int nextIndex;
  /** Last element returned; null if none or not detached. */
  // 最后一次(上一次)迭代器遍历操作时返回的元素
  private E lastItem;
  /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
  // 最后一次(上一次)迭代器遍历操作时返回的元素的索引位
  private int lastRet;
  /** Previous value of takeIndex, or DETACHED when detached */
  // 该变量表示本迭代器最后一次(上一次)从ArrayBlockingQueue队列中获取到的takeIndex索引位
  // 该属性还有一个重要的作用,用来表示当前迭代器是否是“独立”工作模式(或者迭代器是否失效)
  private int prevTakeIndex;
  /** Previous value of iters.cycles */
  // 最后一次(上一次)从ArrayBlockingQueue队列获取的takeIndex索引位回到0号索引位的次数
  // 这个值非常重要,是判定迭代器是否有效的重要依据
  private int prevCycles;
  /** Special index value indicating "not available" or "undefined" */
  private static final int NONE = -1;
  /**
   * Special index value indicating "removed elsewhere", that is,
   * removed by some operation other than a call to this.remove().
   */
  // 该常量表示索引位所表示的值已经被remove()方法以外的操作移除
  private static final int REMOVED = -2;
  /** Special value for prevTakeIndex indicating "detached mode" */
  // 该常量值赋值到prevTakeIndex,以表示当前迭代器变成“独立”(无效)工作模式
  private static final int DETACHED = -3;
  // ......

// ......

特别注意以上三个常量NONE、REMOVED和DETACHED,这三个常量分别代表索引位的三种状态:

  • NONE:一般用来表示指定的索引位已完成任务或者不可用(主要用于Itr迭代器的lastRet索引、nextIndex索引);
  • REMOVED:一般用来表示指定的索引位上的元素已经被其它线程的操作移除(用于Itr迭代器的lastRet索引、nextIndex索引)
  • DETACHED:一般标识在prevTakeIndex变量上,表示当前迭代器为“独立/无效”工作模式(主要用于Itr迭代器的prevTakeIndex索引)。

另外通过以上Itr迭代器属性定义的描述可知,为了保证迭代器操作的正确性,Itr迭代器除了记录当前游标位置外,还完整记录了迭代器开始遍历的索引位置和next()方法将要返回的下一个元素(包括索引位置和对象)。

以上保证迭代器正确工作的一个典型场景就是迭代器的next()方法和hasNext()方法进行配合使用时——迭代器可以保证调用hasNext()方法返回true时,next()方法一定不会返回null。

2.3.2.4、Itr迭代器的实例化过程

以下代码片段描述了Itr迭代器的实例化过程:

// ......
Itr() 
  // assert lock.getHoldCount() == 0;
  lastRet = NONE;
  final ReentrantLock lock = ArrayBlockingQueue.this.lock;
  // 进行迭代器的初始化,也需要获取操作锁
  lock.lock();
  try 
    // 如果当前条件成立,说明这时ArrayBlockingQueue队列集合没有任何元素
    // 这时将当前迭代器作为独立模式进行创建。
    if (count == 0) 
      // assert itrs == null;
      // 当前游标索引无意义
      cursor = NONE;
      // 下一迭代索引位无意义
      nextIndex = NONE;
      // 使用该变量,标识当前迭代器独立工作,无需注册到Itrs迭代器组中
      prevTakeIndex = DETACHED;
     else 
      // 将当前ArrayBlockingQueue队列集合的takeIndex值(下一个取数索引位)
      // 记录到prevTakeIndex变量中,作为当前迭代器开始遍历的索引位置
      final int takeIndex = ArrayBlockingQueue.this.takeIndex;
      prevTakeIndex = takeIndex;
      // 取出当前开始遍历的索引位上的数据,记录到nextItem变量中(nextIndex值也同时设定),作为将要调用的next()方法的返回值
      nextItem = itemAt(nextIndex = takeIndex);
      // 确定下一个游标位(incCursor(int)方法很重要,具体过程请参见该方法上的注释)
      // 迭代器初始化时,第一个游标位是takeIndex索引位的下一个索引位
      // 这是因为遍历起始索引位已经记录在了prevTakeIndex变量中
      cursor = incCursor(takeIndex);
      // 通过以上过程,迭代器的初始化过程基本完成,现在将这个迭代器对象注册到Itrs迭代器组中
      // 如果Itrs迭代器组还没有初始化,则进行Itrs组的初始化,并将当前迭代器对象作为Itrs迭代器组的第一个Node节点
      if (itrs == null) 
        itrs = new Itrs(this);
      
      // 其它情况则将当前迭代器注册到Itrs迭代器组中,并清理Itrs迭代器组中过期/无效的迭代器节点。
      else 
        itrs.register(this); // in this order
        itrs.doSomeSweeping(false);
      
      // Itrs迭代器组中最重要的一个数值就是当前ArrayBlockingQueue队列集合takeIndex变量通过循环数组0号索引位的次数
      // 这个次数记录在Itrs迭代器组的cycles变量中,前者将在这里被赋值给迭代器的prevCycles变量
      prevCycles = itrs.cycles;
      // assert takeIndex >= 0;
      // assert prevTakeIndex == takeIndex;
      // assert nextIndex >= 0;
      // assert nextItem != null;
    
   finally 
    lock.unlock();
  


// 该私有方法根据ArrayBlockingQueue队列集合的固定长度和状态
// 确定下一个游标索引值。
private int incCursor(int index) 
  // 有几种情况:
  // a、如果下一个索引值等于当前队列容量,说明当前遍历的位置跨过了环形数组的0号索引位,这时设置下一游标位置为0
  // b、如果下一个索引值等于ArrayBlockingQueue队列putIndex索引值,说明已经没有能遍历的数据了,这时设置下一游标位置为NONE
  // c、其它情况下,index简单+1,就是下一个游标位置
  // assert lock.getHoldCount() == 1;
  if (++index == items.length)
    index = 0;
  if (index == putIndex)
    index = NONE;
  return index;

// ......

以上代码片段同样使用了非常详细的注释进行说明,并且可以通过以下示意图进行描述(注意,以下示意图只描述了count != 0的情况,而count == 0的情况很简单,这里就不再进行示意图的描述了):

上文中我们创建了一个新的Itr迭代器,由于这时ArrayBlockingQueue队列的中存在数据,所以新创建的Itr迭代器就不是“独立/无效”工作模式,而是需要加入到Itrs迭代器组中进行管理的迭代器。在创建过程中,迭代器的prevTakeIndex属性、nextIndex属性将被赋值为当前ArrayBlockingQueue队列takeIndex属性的值,其nextItem属性将会引用takeIndex索引位上的数据对象,其cursor属性将会指向takeIndex索引位的下一个索引位(incCursor(int)方法将负责修正索引值)。

============
(接下文《源码阅读(33):Java中线程安全的Queue、Deque结构——ArrayBlockingQueue(3)》)

以上是关于源码阅读(32):Java中线程安全的QueueDeque结构——ArrayBlockingQueue的主要内容,如果未能解决你的问题,请参考以下文章

源码阅读(33):Java中线程安全的QueueDeque结构——ArrayBlockingQueue

源码阅读(33):Java中线程安全的QueueDeque结构——ArrayBlockingQueue

源码阅读(39):Java中线程安全的QueueDeque结构——LinkedTransferQueue

源码阅读(39):Java中线程安全的QueueDeque结构——LinkedTransferQueue

源码阅读(34):Java中线程安全的QueueDeque结构——ArrayBlockingQueue

源码阅读(34):Java中线程安全的QueueDeque结构——ArrayBlockingQueue