源码阅读(31):Java中线程安全的QueueDeque结构——ArrayBlockingQueue

Posted 说好不能打脸

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码阅读(31):Java中线程安全的QueueDeque结构——ArrayBlockingQueue相关的知识,希望对你有一定的参考价值。

1、概述

ArrayBlockingQueue是一种经常使用的线程安全的Queue结构,上文也已经提过,它是一种内部基于数组的,使用在高并发场景下的阻塞队列,也是一种容量有界的队列。该队列符合先进先出(FIFO)的工作原则,也就是说该队列头部的元素是最先进入队列集合的,也是最先被调用者取出的元素;该队列尾部的元素是最后进入队列集合的,也是按时间顺序会最后被调用者取出的元素。

在多线程同时读写ArrayBlockingQueue队列集合中的元素时,该队列还支持一种公平性策略,这是一种为生产者/消费者工作模式提供的配置模式(可以把ArrayBlockingQueue队列集合的读取操作线程看成消费者角色;把写入操作线程看成生产者角色),一旦启用了这个配置,则ArrayBlockingQueue队列会分别保证多个生产者线程和多个消费者线程获取ArrayBlockingQueue操作权限的顺序——先请求操作的线程会先获得操作权限(后文会给出示例)。

该队列的公平性策略实际上基于ReentrantLock——基于AQS机制的可重入锁的公平性功能,下面我们描述几种使用ArrayBlockingQueue的基本场景。

1.1、ArrayBlockingQueue的最基础使用

// ......
// 设置一个最大容量为5的队列
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
// 前文已经介绍过,add方法是在Queue接口定义的方法(BlockingQueue接口中有重复的定义)
// 其作用没有阻塞线程的特性,如果add方法发现队列集合的容量已经达到上限,就会抛出异常
queue.add("1");
queue.add("2");
queue.add("3");
queue.add("4");
try 
  // put方法是在BlockingQueue接口中定义的方式,它有阻塞线程的特点
  // 如果put方法发现队列集合的容量达到上限,就会阻塞线程
  queue.put("5");
  // 也就是说该代码的工作位置,线程将被阻塞
  queue.put("6");
 catch(InterruptedException e) 
  Thread.currentThread().interrupt();
  e.printStackTrace(System.out);

// ......  

以上代码所示,是一种在单个线程场景下,演示的ArrayBlockingQueue队列集合的使用方式。当然按照之前内容的介绍,我们并不推荐在单线程场景下使用实现了BlockingQueue接口的集合,正式的业务编码活动中,完全可以使用ArrayList这样的集合进行替代。

1.2、在多线程操作场景下使用ArrayBlockingQueue

包括ArrayBlockingQueue在内的所有实现了BlockingQueue接口的队列集合,其多线程下的典型使用场景是生产者和消费者操作场景——由一个或多个生产者进行数据生产,然后按照队列特性放入队列中,并由另一个或多个消费者从队列中取出后进行处理。而不同的阻塞式队列对生产者如何放入元素、元素在队列中如何排列、消费者如何取出元素的规则都有不同的约定特点。

例如ArrayBlockingQueue对于生产者如何放入队列的规定就可以满足,如果当前队列中没有多余的空间可供生产者们向队列添加元素,那么生产者就可以被阻塞起来,直到队列中有新的空间出现;另外该队列还提供在出现上述场景时,生产者直接抛出异常的处理方式。

下面我们看一下基于ArrayBlockingQueue实现生产者和消费者的典型代码:

  • 生产者线程代码:
/**
 * 生产者,非常简单,不用过多注释说明
 * @author yinwenjie
 */
public static class Producer implements Runnable 
  // 生产者生产的数据,将放入该队列
  private BlockingQueue<String> queue;
  
  public Producer(BlockingQueue<String> queue) 
    this.queue = queue;
  

  @Override
  public void run() 
    String uuid = UUID.randomUUID().toString();
    int count = 0;
    while(count++ < Integer.MAX_VALUE) 
      // 如果不能添加到队列,则本生产者线程阻塞等待
      try 
        this.queue.put(uuid);
       catch (InterruptedException e) 
        e.printStackTrace(System.out);
      
    
  

  • 消费者线程代码
/**
 * 消费者,代码也很简单,也不用过多注释说明
 * @author yinwenjie
 */
public static class Consumer implements Runnable 
  // 消费者将从该队列中取出数据进行处理
  private BlockingQueue<String> queue;
  
  public Consumer(BlockingQueue<String> queue) 
    this.queue = queue;
  
  
  @Override
  public void run() 
    int count = 0;
    while(count++ < Integer.MAX_VALUE) 
      try 
        String value = this.queue.take();
        // 这里省略处理过程
       catch (InterruptedException e) 
        e.printStackTrace(System.out);
      
    
  

请注意,以上给出的生产者代码和消费者代码适用于多种实现了BlockingQueue接口的队里集合的讲解,后续文章中我们不会再给出生产者和消费者的基础代码。接下来使用以下代码将生产者和消费者启动起来。

  • 将消费者和生产者启动起来
// ......
// 本文建议使用线程池管理线程,而不是直接创建Thread对象
ThreadPoolExecutor serviceExecutor = new ThreadPoolExecutor(10, 10, 1000, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
// 这个队列用来承载连接生成者和消费者的数据关系,队列容量上限100
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
// 提交5个生产者和5个消费者
serviceExecutor.submit(new Producer(queue));
serviceExecutor.submit(new Producer(queue));
serviceExecutor.submit(new Producer(queue));
serviceExecutor.submit(new Producer(queue));
serviceExecutor.submit(new Producer(queue));
serviceExecutor.submit(new Consumer(queue));
serviceExecutor.submit(new Consumer(queue));
serviceExecutor.submit(new Consumer(queue));
serviceExecutor.submit(new Consumer(queue));
serviceExecutor.submit(new Consumer(queue));
// ......

线程池本身的使用,并不在本文描述的内容范围内,如果需要进行详细了解,可以查看此篇文章《线程基础:线程池(5)——基本使用(上)》、《线程基础:线程池(6)——基本使用(中)》、《线程基础:线程池(7)——基本使用(下)

1.3、使用ArrayBlockingQueue的公平性策略

在上一小节的代码中,当队列集合没有多余的存储空间时,所有生产者线程都将先后进入阻塞状态,并在队列集合有空余位置时被唤醒,但是ArrayBlockingQueue队列集合并不保证线程唤醒的公平性,也就是说队列集合并不保证最先进入阻塞状态的生产者线程最先被唤醒

如果在一些特定的业务场景下,需要保证生产者线程和消费者线程的公平性原则,则需要启用ArrayBlockingQueue的公平性策略,如下方式所示:

// ......
// if true then queue accesses for threads blockedon insertion or removal, are processed in FIFO order;
// if false the access order is unspecified.
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100, true);
// ......

2、工作原理

实际上ArrayBlockingQueue是一个可循环使用数组空间的有界、阻塞队列,使用可复用的环形数组进行数据记录。其内部使用一个takeIndex变量代表队列头(队列头可在数组的任何有效索引位),使用一个putIndex变量代码队列尾(队列为可不是数组最后一个索引位);从takeIndex到putIndex的索引位置,是数组中已经放置了元素的位置,从putIndex到takeIndex的索引位置是数组中还可以放置新的元素的位置。

这句话是不是不好理解,请看以下原理图:

从上图可以看出ArrayBlockingQueue的数组构成了一个环形结构,使得数组本身“首尾相连”。takeIndex变量指向的索引位是下一个将被取出的元素索引位,putIndex变量指向的索引位是下一个将被添加的元素索引位。为了支撑这个环形结构的工作,ArrayBlockingQueue队列集合还使用了很多辅助的变量信息。如下小节所述:

2.1、ArrayBlockingQueue的主要变量

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable 
  // 这个数据就是ArrayBlockingQueue队列集合用来存储数据的数组
  final Object[] items;
  // 该变量指向的索引位上的元素是下一次将从队列集合中移除的元素,这个移除操作可以是take, poll, peek 或者 remove。
  int takeIndex;
  // 该变量指向的索引位是下一个添加到队列的元素存储的索引位,这个添加操作可以是 put, offer 或者 add
  int putIndex;
  // 该变量标示当前在队列集合中的总的元素数量
  int count;
  // ArrayBlockingQueue队列使用基于AQS机制的可重入锁ReentrantLock进行线程安全性控制
  // 并采用双条件控制方式对移除、添加操作进行交互控制
  final ReentrantLock lock;
  // 控制着移除操作条件
  private final Condition notEmpty;
  // 控制着添加操作条件
  private final Condition notFull;
  // ArrayBlockingQueue的迭代器(以及后续几个队列集合的迭代器)很有趣,我们将在后文专门剖析
  transient Itrs itrs = null;

这里特别注意两个Condition对象,notEmpty条件对象负责在队列集合变为非空的场景下,进行生产者线程和消费者线程的工作协调,具体来说就是给消费者线程发信号,告诉它们线程队列集合中又有新的数据可以取出了;notFull条件对象负责在队列集合变为非满的场景下,进行生产者线程和消费者线程的工作协调,具体来说就是给生产者线程发信号,告诉它们线程队列集合中又有新的索引位可以放置新的数据了。

2.2、环形队列的入队和出队过程

实际上在ArrayBlockingQueue队列集合中负责向队列添加数据的方法只有一个,就是enqueue()方法;向队列移除数据的方法也只有一个,就是dequeue()方法。基本上ArrayBlockingQueue队列向外暴露的操作方法,都是对上述两个方法的封装调用。所以我们首先需要剖析这两个方法:

  • enqueue()方法
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  implements BlockingQueue<E>, java.io.Serializable 

  // ......
  /**
   * 该方法负责在putIndex变量指定的索引位置添加新的数据
   * 该方法内部虽然没有做线程安全性的操作,但是对该方法的调用者都有“持有锁”的要求:
   * Call only when holding lock.
   */
  private void enqueue(E x) 
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    // 将入参的x元素添加到指定的数组索引位置
    items[putIndex] = x;
    // 添加后,如果下一个索引位超出了边界,则索引位置重新指向0
    if (++putIndex == items.length)
      putIndex = 0;
    // 集合总数据量的计数器 + 1
    count++;
    // 发出信号,帮助那些在集合为空时处于阻塞状态的线程(消费者线程),解除阻塞状态
    notEmpty.signal();
  
  // ......  

这个方法很简单,将新的元素添加到队列集合的操作过程归纳为一句话就是,在putIndex的索引位置放入新的元素,并将putIndex指向的索引位向后移动一位,如果移动后超出数组边界,则重新指向0号索引位。

  • dequeue()方法
// TODO 继续增加注释
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  implements BlockingQueue<E>, java.io.Serializable 

  // .....
  /**
   * 该方法负责从takeIndex指向的索引位移除一个元素
   * 该方法内部虽然没有做线程安全性的操作,但是对该方法的调用者都有“持有锁”的要求:
   * Call only when holding lock.
   */
  private E dequeue() 
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    // 将已经移除了数据的索引位置为null,以便帮助可能的GC动作
    items[takeIndex] = null;
    // 移除后,如果下一个索引位超出了边界,则索引位置重新指向0
    if (++takeIndex == items.length)
      takeIndex = 0;
    // 集合总数据量的计数器 - 1
    count--;
    // 如果存在迭代器(们),则迭代器也需要进行数据清理
    if (itrs != null)
      itrs.elementDequeued();
    // 发出信号,帮助那些在集合已满时进入阻塞状态的线程(生产者线程),解除阻塞状态
    notFull.signal();
    return x;
  
  // .....

这个方法也很简单,将队列集合移除数据的操作过程归纳为一句话就是:在takeIndex的索引位的数据将被移除,并将takeIndex指向的索引位向后移动一位,如果移动后超出数组边界,则重新指向0号索引位。另外,再次强调,enqueue方法和dequeue方法都没有做线程安全性控制,而需要这两个方法的调用者自行控制线程安全。

============================
(接下文《源码阅读(32):Java中线程安全的Queue、Deque结构——ArrayBlockingQueue(2)》)

以上是关于源码阅读(31):Java中线程安全的QueueDeque结构——ArrayBlockingQueue的主要内容,如果未能解决你的问题,请参考以下文章

源码阅读(32):Java中线程安全的QueueDeque结构——ArrayBlockingQueue

源码阅读(32):Java中线程安全的QueueDeque结构——ArrayBlockingQueue

源码阅读(39):Java中线程安全的QueueDeque结构——LinkedTransferQueue

源码阅读(39):Java中线程安全的QueueDeque结构——LinkedTransferQueue

源码阅读(34):Java中线程安全的QueueDeque结构——ArrayBlockingQueue

源码阅读(34):Java中线程安全的QueueDeque结构——ArrayBlockingQueue