Java多线程:BlockingQueue实现原理(Condition原理)

Posted 杨 戬

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java多线程:BlockingQueue实现原理(Condition原理)相关的知识,希望对你有一定的参考价值。

文章目录

BlockingQueue原理

BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等。

它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于put与take操作的原理是类似的。

下面以ArrayBlockingQueue源码为例,来说明BlockingQueue的实现原理

ArrayBlockingQueue源码分析

首先看一下ArrayBlockingQueue的构造函数,它初始化了put和take函数中用到的关键成员变量,这两个变量的类型分别是ReentrantLock和Condition:

  • ReentrantLock实例:是AbstractQueuedSynchronizer(AQS)的子类,用于实现锁同步机制
  • Condition实例:newCondition函数返回的Condition实例,是定义在AQS类内部的ConditionObject类,该类可以直接调用AQS相关的函数。在这用于实现等待队列机制

构造函数源码如下:

public ArrayBlockingQueue(int capacity, boolean fair) 
    if (capacity <= 0)
    	throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();

put函数会在队列末尾添加元素,如果队列已经满了,无法添加元素的话,就一直阻塞等待到可以加入为止。

put函数的源码如下所示:

public void put(E e) throws InterruptedException 
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try 
    while (count == items.length)
        	notFull.await();
        enqueue(e);
     finally 
    	lock.unlock();
    

我们会发现put函数使用了wait/notify的机制。

与一般生产者-消费者的实现方式不同,同步队列使用ReentrantLock和Condition相结合的机制,即先获得锁,再等待,而不是synchronized和wait的机制。

再来看一下消费者调用的take函数,take函数在队列为空时会被阻塞,一直到阻塞队列加入了新的元素

take函数源码如下:

public E take() throws InterruptedException 
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try 
        while (count == 0)
        	notEmpty.await();
        return dequeue();
     finally 
    	lock.unlock();
    

问题引出:

我们发现 ArrayBlockingQueue 并没有使用 Object.wait ,而是使用的 Condition.await ,这是为什么呢?

当调用 Condition对象的 await方法后,当前线程会释放锁并等待,而其他线程调用 Condition 对象的 signal 或者 signalall 方法通知并被阻塞的线程,然后自己执行unlock释放锁,被唤醒的线程获得之前的锁继续执行,最后释放锁。

可以发现 Condition 对象可以提供和 Object 的 wait 和 notify 一样的行为,但是后者必须先获取synchronized 这个内置的 monitor 锁才能调用( wait 和 notify 必须在synchronized 代码块中使用),而 Condition 是先获取 ReentrantLock 。这两种方式在阻塞等待时都会将相应的锁释放掉,但是 Condition 的等待可以中断,这是二者唯一的区别

下面我们来看一下Condition的实现原理:

Condition原理

在 Java 程序中,任意一个 Java 对象,都拥有一组监视器方法(定义在java.lang.Object 类上),主要包括 wait()、wait(long)、notify()、notifyAll() 方法,这些方法与 synchronized 关键字配合,可以实现等待 / 通知模式。Condition 接口也提供了类似 Object 的监视器方法,与 Lock 配合可以实现等待 / 通知模式

但是这两者在使用方式以及功能特性上还是有区别的。

Object 的监视器方法与 Condition 接口对比如下:

对比项Object 监视器方法Condition
前置条件获取对象的监视器锁(synchronized关键字)调用 Lock.lock() 获取锁调用 Lock.newCondition() 获取 Condition 对象
调用方法直接调用如:object.wait()直接调用如:condition.await()
等待队列个数一个多个
当前线程释放锁并进入等待队列支持支持
当前线程释放锁并进入等待队列,在等待状态中不响应中断不支持支持
当前线程释放锁并进入超时等待状态支持支持
当前线程释放锁并进入等待状态到将来的某个时间不支持支持
唤醒等待队列中的一个线程支持支持
唤醒等待队列中的全部线程支持支持

Condition 提供了一系列的方法来对阻塞和唤醒线程,Condition 接口源码如下:

public interface Condition 
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();

各个方法功能如下:

  • void await() throws InterruptedException:当前线程进入等待状态直到被通知(signal)或中断。
  • void awaitUninterruptibly():当前线程进入等待状态直到被通知,该方法不响应中断。
  • long awaitNanos(long nanosTimeout) throws InterruptedException:当前线程进入等待状态直到被通知、中断或者超时,返回值表示剩余超时时间。
  • boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态直到被通知、中断或者到某个时间。如果没有到指定时间就被通知,方法返回 true,否则,表示到了指定时间,返回 false。
  • void signal():唤醒一个等待在 Condition 上的线程,该线程从等待方法返回前必须获得与 Condition 相关联的锁。
  • void signalAll():唤醒所有等待在 Condition 上的线程,能够从等待方法返回的线程必须获得与 Condition 相关联的锁。

Condition 的实现类

可以通过 Lock.newCondition() 方法获取 Condition 对象,而我们知道 Lock 对于同步状态的实现都是通过内部的自定义同步器实现的,newCondition() 方法也不例外,所以,Condition 接口的唯一实现类是同步器 AQS 的内部类 ConditionObject,因为 Condition 的操作需要获取相关的锁,所以作为同步器的内部类也比较合理,该类定义如下:

public class ConditionObject implements Condition, java.io.Serializable

每个 Condition 对象都包含着一个队列(下面都称为等待队列),该队列是 Condition 对象实现等待 / 通知功能的关键。

在继续说下 ConditioObject。ConditionObject 是同步器AbstractQueuedSynchronzied 的内部类。而ArrayBlockingQueue 是 Condition 的具体应用。

Object 中本身有有 wait ,notify ,notifyAll 等操作, Condition 相当于将 wait ,notify ,notifyAll 转换成想要的对象,将比较难懂的同步操作变成直观可控的对象行为

如下是Condition的Object监视器模型

Condition监视器模型

在传统的Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock实现类拥有一个同步队列和多个等待队列

Condition Object 的监视器模型 包含了一个同步多路和多个等待队列,结构如下所示:

Condition三部分

Condition 实现主要包含三个部分:

  • 等待队列
  • 等待操作
  • 通知操作

等待队列类似于同步队列,用于存放调用Condition 阻塞函数的等待线程队列

另外是 Condition 中两个最重要的方法,一个是 await,一个是signal方法

  • await:把当前线程阻塞挂起

  • signal:唤醒阻塞的线程

Condition 和我们的 synchronized 的wait/notify 方法很像,所以我们可以了解到 Condition 这个类就是 J.U.C 用来实现线程的主动阻塞和唤醒功能。

等待队列

如果了解 AQS 原理可以知道, AQS 中有个同步队列的概念。

等待队列和同步队列类似,都是一个 FIFO 队列。队列上每个节点包含一个线程引用,该线程就是 Condition 对象上的等待线程。

等待队列结构如下:

Condition 等待队列,包含首节点(firstWaiter),和尾节点(tailWaiter),如果一个线程调用了 Condition.await() 方法。那么该线程将会释放锁,并以当前线程构造节点加入Condition 等待队列进入等待状态

等待操作

当调用 Condition 的 await() 方法(或者以 await开头的方法),会使得当前线程进入等待队列,并且释放锁,同时线程的状态变为等待状态。

public final void await() throws InterruptedException 
    if (Thread.interrupted())
        throw new InterruptedException();
    // 当前线程加入等待队列
    Node node = addConditionWaiter();
    // 释放同步状态,也就是释放锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // node 不在节点中会一直 park 阻塞下去。达到等待的效果。
    while (!isOnSyncQueue(node)) 
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);

调用该方法的线程成功获得了锁的线程,也就是同步队列的首节点,该方法将会将该线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后续节点,然后当前节点会进入等待状态。要注意的是,如果等地队列中的节点被唤醒,唤醒节点的线程开始尝试获取同步状态。

但是如果不是通过 Condition.signal 进行唤醒的,而是对 等待线程 进行中断,那么会抛出 InterruptedException。

调用 Condition await方法后,当前线程会加入到等待队列,如下图所示:

通知操作

调用 Condition.signal() 方法,将会唤醒等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移动到同步队列中。

signal() 方法源码如下:

public final void signal() 
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);

final boolean transferForSignal(Node node) 
    /*
	* If cannot change waitStatus, the node has been cancelled.
    */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    /*
	* Splice onto queue and try to set waitStatus of predecessor to
	* indicate that thread is (probably) waiting. If cancelled or
	* attempt to set waitStatus fails, wake up to resync (in which
	* case the waitStatus can be transiently and harmlessly wrong).
	*/
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;

需要注意的是,调用该方法的前置条件是当前线程必须获得了锁,可以看到 Signal() 方法进行了 isHeldExclusively ()函数进行检查,判断是否获得了锁,接着获取等待队列的首节点,将其移动到同步队列并使用 LockSupport 唤醒节点中的线程。

节点从等待队列,移动到同步队列的操作过程如下:

通过调用同步器的 enq(Node node) 方法,等待队列中的头节点线程安全地移动到同步队列中,当节点移动到同步队列后,当前线程再使用 LockSupport 唤醒该节点的线程。

被唤醒的线程,将从 await() 方法中的 while 循环中退出。

从 await 方法看:

//  当前节点已经在同步队列了,不会在循环下去了
while (!isOnSyncQueue(node)) 
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;

随后调用同步器的 acquireQueued() 方法加入到同步队列的竞争中。

final boolean acquireQueued(final Node node, int arg) 
    boolean failed = true;
    try 
        boolean interrupted = false;
        for (;;) 
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) 
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        
     finally 
        if (failed)
            cancelAcquire(node);
    

成功获取同步状态(获得锁)之后,被唤醒的线程,景从先前调用的 await 方法返回。此时线程已经成功获得了锁。

以上是关于Java多线程:BlockingQueue实现原理(Condition原理)的主要内容,如果未能解决你的问题,请参考以下文章

Java多线程(五)之BlockingQueue深入分析

Java多线程:BlockingQueue实现生产者消费者模型

java多线程-BlockingQueue

Java多线程-BlockingQueue

多线程-生产者消费者(BlockingQueue实现)

关于JAVA BlockingQueue的工作原理