关于JAVA BlockingQueue的工作原理
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了关于JAVA BlockingQueue的工作原理相关的知识,希望对你有一定的参考价值。
我想问下
BlockingQueue的方法是类似ConcurrentLinkedQueue那样的并行,还是synchronizedCollection那样的同步。换句话说,BlockingQueue可以支持几个线程同时持有“写锁”,还是使用内部锁而每次只能有一个线程持有
Java多线程:BlockingQueue实现原理(Condition原理)
文章目录
BlockingQueue原理
BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等。
它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于put与take操作的原理是类似的。
下面以ArrayBlockingQueue源码为例,来说明BlockingQueue的实现原理
ArrayBlockingQueue源码分析
首先看一下ArrayBlockingQueue的构造函数,它初始化了put和take函数中用到的关键成员变量,这两个变量的类型分别是ReentrantLock和Condition:
- ReentrantLock实例:是AbstractQueuedSynchronizer(AQS)的子类,用于实现锁同步机制
- Condition实例:newCondition函数返回的Condition实例,是定义在AQS类内部的ConditionObject类,该类可以直接调用AQS相关的函数。在这用于实现等待队列机制
构造函数源码如下:
public ArrayBlockingQueue(int capacity, boolean fair)
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
put函数会在队列末尾添加元素,如果队列已经满了,无法添加元素的话,就一直阻塞等待到可以加入为止。
put函数的源码如下所示:
public void put(E e) throws InterruptedException
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
while (count == items.length)
notFull.await();
enqueue(e);
finally
lock.unlock();
我们会发现put函数使用了wait/notify的机制。
与一般生产者-消费者的实现方式不同,同步队列使用ReentrantLock和Condition相结合的机制,即先获得锁,再等待,而不是synchronized和wait的机制。
再来看一下消费者调用的take函数,take函数在队列为空时会被阻塞,一直到阻塞队列加入了新的元素
take函数源码如下:
public E take() throws InterruptedException
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
while (count == 0)
notEmpty.await();
return dequeue();
finally
lock.unlock();
问题引出:
我们发现 ArrayBlockingQueue 并没有使用 Object.wait ,而是使用的 Condition.await ,这是为什么呢?
当调用 Condition对象的 await方法后,当前线程会释放锁并等待,而其他线程调用 Condition 对象的 signal 或者 signalall 方法通知并被阻塞的线程,然后自己执行unlock释放锁,被唤醒的线程获得之前的锁继续执行,最后释放锁。
可以发现 Condition 对象可以提供和 Object 的 wait 和 notify 一样的行为,但是后者必须先获取synchronized 这个内置的 monitor 锁才能调用( wait 和 notify 必须在synchronized 代码块中使用),而 Condition 是先获取 ReentrantLock 。这两种方式在阻塞等待时都会将相应的锁释放掉,但是 Condition 的等待可以中断,这是二者唯一的区别。
下面我们来看一下Condition的实现原理:
Condition原理
在 Java 程序中,任意一个 Java 对象,都拥有一组监视器方法(定义在java.lang.Object 类上),主要包括 wait()、wait(long)、notify()、notifyAll() 方法,这些方法与 synchronized 关键字配合,可以实现等待 / 通知模式。Condition 接口也提供了类似 Object 的监视器方法,与 Lock 配合可以实现等待 / 通知模式
但是这两者在使用方式以及功能特性上还是有区别的。
Object 的监视器方法与 Condition 接口对比如下:
对比项 | Object 监视器方法 | Condition |
---|---|---|
前置条件 | 获取对象的监视器锁(synchronized关键字) | 调用 Lock.lock() 获取锁调用 Lock.newCondition() 获取 Condition 对象 |
调用方法 | 直接调用如:object.wait() | 直接调用如:condition.await() |
等待队列个数 | 一个 | 多个 |
当前线程释放锁并进入等待队列 | 支持 | 支持 |
当前线程释放锁并进入等待队列,在等待状态中不响应中断 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态到将来的某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |
Condition 提供了一系列的方法来对阻塞和唤醒线程,Condition 接口源码如下:
public interface Condition
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
各个方法功能如下:
void await() throws InterruptedException
:当前线程进入等待状态直到被通知(signal)或中断。void awaitUninterruptibly()
:当前线程进入等待状态直到被通知,该方法不响应中断。long awaitNanos(long nanosTimeout) throws InterruptedException
:当前线程进入等待状态直到被通知、中断或者超时,返回值表示剩余超时时间。boolean awaitUntil(Date deadline) throws InterruptedException
:当前线程进入等待状态直到被通知、中断或者到某个时间。如果没有到指定时间就被通知,方法返回 true,否则,表示到了指定时间,返回 false。void signal()
:唤醒一个等待在 Condition 上的线程,该线程从等待方法返回前必须获得与 Condition 相关联的锁。void signalAll()
:唤醒所有等待在 Condition 上的线程,能够从等待方法返回的线程必须获得与 Condition 相关联的锁。
Condition 的实现类
可以通过 Lock.newCondition() 方法获取 Condition 对象,而我们知道 Lock 对于同步状态的实现都是通过内部的自定义同步器实现的,newCondition() 方法也不例外,所以,Condition 接口的唯一实现类是同步器 AQS 的内部类 ConditionObject,因为 Condition 的操作需要获取相关的锁,所以作为同步器的内部类也比较合理,该类定义如下:
public class ConditionObject implements Condition, java.io.Serializable
每个 Condition 对象都包含着一个队列(下面都称为等待队列),该队列是 Condition 对象实现等待 / 通知功能的关键。
在继续说下 ConditioObject。ConditionObject 是同步器AbstractQueuedSynchronzied
的内部类。而ArrayBlockingQueue 是 Condition 的具体应用。
Object 中本身有有 wait ,notify ,notifyAll 等操作, Condition 相当于将 wait ,notify ,notifyAll 转换成想要的对象,将比较难懂的同步操作变成直观可控的对象行为。
如下是Condition的Object监视器模型
Condition监视器模型
在传统的Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock实现类拥有一个同步队列和多个等待队列:
Condition Object 的监视器模型 包含了一个同步多路和多个等待队列,结构如下所示:
Condition三部分
Condition 实现主要包含三个部分:
- 等待队列
- 等待操作
- 通知操作
等待队列类似于同步队列,用于存放调用Condition 阻塞函数的等待线程队列
另外是 Condition 中两个最重要的方法,一个是 await,一个是signal方法
-
await:把当前线程阻塞挂起
-
signal:唤醒阻塞的线程
Condition 和我们的 synchronized 的wait/notify 方法很像,所以我们可以了解到 Condition 这个类就是 J.U.C 用来实现线程的主动阻塞和唤醒功能。
等待队列
如果了解 AQS 原理可以知道, AQS 中有个同步队列的概念。
等待队列和同步队列类似,都是一个 FIFO 队列。队列上每个节点包含一个线程引用,该线程就是 Condition 对象上的等待线程。
等待队列结构如下:
Condition 等待队列,包含首节点(firstWaiter),和尾节点(tailWaiter),如果一个线程调用了 Condition.await() 方法。那么该线程将会释放锁,并以当前线程构造节点加入Condition 等待队列进入等待状态。
等待操作
当调用 Condition 的 await() 方法(或者以 await开头的方法),会使得当前线程进入等待队列,并且释放锁,同时线程的状态变为等待状态。
public final void await() throws InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 当前线程加入等待队列
Node node = addConditionWaiter();
// 释放同步状态,也就是释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// node 不在节点中会一直 park 阻塞下去。达到等待的效果。
while (!isOnSyncQueue(node))
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
调用该方法的线程成功获得了锁的线程,也就是同步队列的首节点,该方法将会将该线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后续节点,然后当前节点会进入等待状态。要注意的是,如果等地队列中的节点被唤醒,唤醒节点的线程开始尝试获取同步状态。
但是如果不是通过 Condition.signal 进行唤醒的,而是对 等待线程 进行中断,那么会抛出 InterruptedException。
调用 Condition await方法后,当前线程会加入到等待队列,如下图所示:
通知操作
调用 Condition.signal() 方法,将会唤醒等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移动到同步队列中。
signal() 方法源码如下:
public final void signal()
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
final boolean transferForSignal(Node node)
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
需要注意的是,调用该方法的前置条件是当前线程必须获得了锁,可以看到 Signal() 方法进行了 isHeldExclusively ()函数进行检查,判断是否获得了锁,接着获取等待队列的首节点,将其移动到同步队列并使用 LockSupport 唤醒节点中的线程。
节点从等待队列,移动到同步队列的操作过程如下:
通过调用同步器的 enq(Node node) 方法,等待队列中的头节点线程安全地移动到同步队列中,当节点移动到同步队列后,当前线程再使用 LockSupport 唤醒该节点的线程。
被唤醒的线程,将从 await() 方法中的 while 循环中退出。
从 await 方法看:
// 当前节点已经在同步队列了,不会在循环下去了
while (!isOnSyncQueue(node))
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
随后调用同步器的 acquireQueued() 方法加入到同步队列的竞争中。
final boolean acquireQueued(final Node node, int arg)
boolean failed = true;
try
boolean interrupted = false;
for (;;)
final Node p = node.predecessor();
if (p == head && tryAcquire(arg))
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
finally
if (failed)
cancelAcquire(node);
成功获取同步状态(获得锁)之后,被唤醒的线程,景从先前调用的 await 方法返回。此时线程已经成功获得了锁。
以上是关于关于JAVA BlockingQueue的工作原理的主要内容,如果未能解决你的问题,请参考以下文章
Java多线程:BlockingQueue实现原理(Condition原理)
Java阻塞队列实现原理分析-ArrayBlockingQueue和LinkedBlockingQueue