关于JAVA BlockingQueue的工作原理

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了关于JAVA BlockingQueue的工作原理相关的知识,希望对你有一定的参考价值。

我想问下
BlockingQueue的方法是类似ConcurrentLinkedQueue那样的并行,还是synchronizedCollection那样的同步。换句话说,BlockingQueue可以支持几个线程同时持有“写锁”,还是使用内部锁而每次只能有一个线程持有

参考技术A BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)没有 必要自动执行,除非在实现中特别说明。因此,举例来说,在只添加了 c 中的一些元素后,addAll(c) 有可能失败(抛出一个异常)。

Java多线程:BlockingQueue实现原理(Condition原理)

文章目录

BlockingQueue原理

BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等。

它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于put与take操作的原理是类似的。

下面以ArrayBlockingQueue源码为例,来说明BlockingQueue的实现原理

ArrayBlockingQueue源码分析

首先看一下ArrayBlockingQueue的构造函数,它初始化了put和take函数中用到的关键成员变量,这两个变量的类型分别是ReentrantLock和Condition:

  • ReentrantLock实例:是AbstractQueuedSynchronizer(AQS)的子类,用于实现锁同步机制
  • Condition实例:newCondition函数返回的Condition实例,是定义在AQS类内部的ConditionObject类,该类可以直接调用AQS相关的函数。在这用于实现等待队列机制

构造函数源码如下:

public ArrayBlockingQueue(int capacity, boolean fair) 
    if (capacity <= 0)
    	throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();

put函数会在队列末尾添加元素,如果队列已经满了,无法添加元素的话,就一直阻塞等待到可以加入为止。

put函数的源码如下所示:

public void put(E e) throws InterruptedException 
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try 
    while (count == items.length)
        	notFull.await();
        enqueue(e);
     finally 
    	lock.unlock();
    

我们会发现put函数使用了wait/notify的机制。

与一般生产者-消费者的实现方式不同,同步队列使用ReentrantLock和Condition相结合的机制,即先获得锁,再等待,而不是synchronized和wait的机制。

再来看一下消费者调用的take函数,take函数在队列为空时会被阻塞,一直到阻塞队列加入了新的元素

take函数源码如下:

public E take() throws InterruptedException 
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try 
        while (count == 0)
        	notEmpty.await();
        return dequeue();
     finally 
    	lock.unlock();
    

问题引出:

我们发现 ArrayBlockingQueue 并没有使用 Object.wait ,而是使用的 Condition.await ,这是为什么呢?

当调用 Condition对象的 await方法后,当前线程会释放锁并等待,而其他线程调用 Condition 对象的 signal 或者 signalall 方法通知并被阻塞的线程,然后自己执行unlock释放锁,被唤醒的线程获得之前的锁继续执行,最后释放锁。

可以发现 Condition 对象可以提供和 Object 的 wait 和 notify 一样的行为,但是后者必须先获取synchronized 这个内置的 monitor 锁才能调用( wait 和 notify 必须在synchronized 代码块中使用),而 Condition 是先获取 ReentrantLock 。这两种方式在阻塞等待时都会将相应的锁释放掉,但是 Condition 的等待可以中断,这是二者唯一的区别

下面我们来看一下Condition的实现原理:

Condition原理

在 Java 程序中,任意一个 Java 对象,都拥有一组监视器方法(定义在java.lang.Object 类上),主要包括 wait()、wait(long)、notify()、notifyAll() 方法,这些方法与 synchronized 关键字配合,可以实现等待 / 通知模式。Condition 接口也提供了类似 Object 的监视器方法,与 Lock 配合可以实现等待 / 通知模式

但是这两者在使用方式以及功能特性上还是有区别的。

Object 的监视器方法与 Condition 接口对比如下:

对比项Object 监视器方法Condition
前置条件获取对象的监视器锁(synchronized关键字)调用 Lock.lock() 获取锁调用 Lock.newCondition() 获取 Condition 对象
调用方法直接调用如:object.wait()直接调用如:condition.await()
等待队列个数一个多个
当前线程释放锁并进入等待队列支持支持
当前线程释放锁并进入等待队列,在等待状态中不响应中断不支持支持
当前线程释放锁并进入超时等待状态支持支持
当前线程释放锁并进入等待状态到将来的某个时间不支持支持
唤醒等待队列中的一个线程支持支持
唤醒等待队列中的全部线程支持支持

Condition 提供了一系列的方法来对阻塞和唤醒线程,Condition 接口源码如下:

public interface Condition 
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();

各个方法功能如下:

  • void await() throws InterruptedException:当前线程进入等待状态直到被通知(signal)或中断。
  • void awaitUninterruptibly():当前线程进入等待状态直到被通知,该方法不响应中断。
  • long awaitNanos(long nanosTimeout) throws InterruptedException:当前线程进入等待状态直到被通知、中断或者超时,返回值表示剩余超时时间。
  • boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态直到被通知、中断或者到某个时间。如果没有到指定时间就被通知,方法返回 true,否则,表示到了指定时间,返回 false。
  • void signal():唤醒一个等待在 Condition 上的线程,该线程从等待方法返回前必须获得与 Condition 相关联的锁。
  • void signalAll():唤醒所有等待在 Condition 上的线程,能够从等待方法返回的线程必须获得与 Condition 相关联的锁。

Condition 的实现类

可以通过 Lock.newCondition() 方法获取 Condition 对象,而我们知道 Lock 对于同步状态的实现都是通过内部的自定义同步器实现的,newCondition() 方法也不例外,所以,Condition 接口的唯一实现类是同步器 AQS 的内部类 ConditionObject,因为 Condition 的操作需要获取相关的锁,所以作为同步器的内部类也比较合理,该类定义如下:

public class ConditionObject implements Condition, java.io.Serializable

每个 Condition 对象都包含着一个队列(下面都称为等待队列),该队列是 Condition 对象实现等待 / 通知功能的关键。

在继续说下 ConditioObject。ConditionObject 是同步器AbstractQueuedSynchronzied 的内部类。而ArrayBlockingQueue 是 Condition 的具体应用。

Object 中本身有有 wait ,notify ,notifyAll 等操作, Condition 相当于将 wait ,notify ,notifyAll 转换成想要的对象,将比较难懂的同步操作变成直观可控的对象行为

如下是Condition的Object监视器模型

Condition监视器模型

在传统的Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock实现类拥有一个同步队列和多个等待队列

Condition Object 的监视器模型 包含了一个同步多路和多个等待队列,结构如下所示:

Condition三部分

Condition 实现主要包含三个部分:

  • 等待队列
  • 等待操作
  • 通知操作

等待队列类似于同步队列,用于存放调用Condition 阻塞函数的等待线程队列

另外是 Condition 中两个最重要的方法,一个是 await,一个是signal方法

  • await:把当前线程阻塞挂起

  • signal:唤醒阻塞的线程

Condition 和我们的 synchronized 的wait/notify 方法很像,所以我们可以了解到 Condition 这个类就是 J.U.C 用来实现线程的主动阻塞和唤醒功能。

等待队列

如果了解 AQS 原理可以知道, AQS 中有个同步队列的概念。

等待队列和同步队列类似,都是一个 FIFO 队列。队列上每个节点包含一个线程引用,该线程就是 Condition 对象上的等待线程。

等待队列结构如下:

Condition 等待队列,包含首节点(firstWaiter),和尾节点(tailWaiter),如果一个线程调用了 Condition.await() 方法。那么该线程将会释放锁,并以当前线程构造节点加入Condition 等待队列进入等待状态

等待操作

当调用 Condition 的 await() 方法(或者以 await开头的方法),会使得当前线程进入等待队列,并且释放锁,同时线程的状态变为等待状态。

public final void await() throws InterruptedException 
    if (Thread.interrupted())
        throw new InterruptedException();
    // 当前线程加入等待队列
    Node node = addConditionWaiter();
    // 释放同步状态,也就是释放锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // node 不在节点中会一直 park 阻塞下去。达到等待的效果。
    while (!isOnSyncQueue(node)) 
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);

调用该方法的线程成功获得了锁的线程,也就是同步队列的首节点,该方法将会将该线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后续节点,然后当前节点会进入等待状态。要注意的是,如果等地队列中的节点被唤醒,唤醒节点的线程开始尝试获取同步状态。

但是如果不是通过 Condition.signal 进行唤醒的,而是对 等待线程 进行中断,那么会抛出 InterruptedException。

调用 Condition await方法后,当前线程会加入到等待队列,如下图所示:

通知操作

调用 Condition.signal() 方法,将会唤醒等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移动到同步队列中。

signal() 方法源码如下:

public final void signal() 
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);

final boolean transferForSignal(Node node) 
    /*
	* If cannot change waitStatus, the node has been cancelled.
    */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    /*
	* Splice onto queue and try to set waitStatus of predecessor to
	* indicate that thread is (probably) waiting. If cancelled or
	* attempt to set waitStatus fails, wake up to resync (in which
	* case the waitStatus can be transiently and harmlessly wrong).
	*/
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;

需要注意的是,调用该方法的前置条件是当前线程必须获得了锁,可以看到 Signal() 方法进行了 isHeldExclusively ()函数进行检查,判断是否获得了锁,接着获取等待队列的首节点,将其移动到同步队列并使用 LockSupport 唤醒节点中的线程。

节点从等待队列,移动到同步队列的操作过程如下:

通过调用同步器的 enq(Node node) 方法,等待队列中的头节点线程安全地移动到同步队列中,当节点移动到同步队列后,当前线程再使用 LockSupport 唤醒该节点的线程。

被唤醒的线程,将从 await() 方法中的 while 循环中退出。

从 await 方法看:

//  当前节点已经在同步队列了,不会在循环下去了
while (!isOnSyncQueue(node)) 
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;

随后调用同步器的 acquireQueued() 方法加入到同步队列的竞争中。

final boolean acquireQueued(final Node node, int arg) 
    boolean failed = true;
    try 
        boolean interrupted = false;
        for (;;) 
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) 
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        
     finally 
        if (failed)
            cancelAcquire(node);
    

成功获取同步状态(获得锁)之后,被唤醒的线程,景从先前调用的 await 方法返回。此时线程已经成功获得了锁。

以上是关于关于JAVA BlockingQueue的工作原理的主要内容,如果未能解决你的问题,请参考以下文章

Java多线程:BlockingQueue实现原理(Condition原理)

BlockingQueue 使用(生产者-消费者)

BlockingQueue详解

Java阻塞队列实现原理分析-ArrayBlockingQueue和LinkedBlockingQueue

JUC源码分析-集合篇BlockingQueue 阻塞式队列实现原理

BlockingQueue 原理 分析