JAVA多线程之ArrayBlockingQueue看Condition的实现
Posted Java_宇宁
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JAVA多线程之ArrayBlockingQueue看Condition的实现相关的知识,希望对你有一定的参考价值。
今天所要介绍的Condition,大家平时业务开发中可能少会用,一般会在框架或中间件比较常见,本文以JAVA中ArrayBlockingQueue中的Condition的使用以及实现原理。
ArrayBlockingQueue中Condition的属性定义和构造函数
final ReentrantLock lock;
/** Condition for waiting takes */
//等待在非空
private final Condition notEmpty;
/** Condition for waiting puts */
//等待队列没有满
private final Condition notFull;
复制代码
ArrayBlockingQueue构造函数,两个Condition都是从ReentrantLock的newConditon方法构造的 ConditionObject对象,
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
通过看ReentrantLock的newCondition方法可以看到实际Condition实现类就是ConditionObject,这就是今天要分析的重点对象.
final ConditionObject newCondition() {
return new ConditionObject();
}
ConditionObject类结构
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */\\
//头节点
private transient ConditionNode firstWaiter;
/** Last node of condition queue. */
//尾节点
private transient ConditionNode lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
ArrayBlockingQueue的notfull和notEmpty的使用如下
1 notFull是放入队列前,如果队列满了,则执行notFull.await,让线程等待, 在元素入队后,通过调用notEmpty.signal唤醒消息者线程.
2.notEmpty是消息队列时,如果队列为空,则执行notEmpty.await让消费者线程等待, 元素消费者消息完后,执行notFull.signal唤醒等待的生产者线程去往队列放元素.
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
下面我们着重分析下ConditionObject中await和signal是如何实现的。
ConditionObject中await实现
主要步骤: 1.创建一个新建等待节点加入条件等待队列
2.释放所有锁(即重置state变量为0) 3.判断node是否在CLH双端等待队列中,如果不在则直接LockSupport.park阻塞线程,r然后检查是否中断
4.如果CLH的等待队列中,则通过CLH队列中获取锁,
5.如果node的nextWaiter不为空,则表示有新的Condtion等待节点,则Conditon的单向队列中移除取消状态的节点
6.判断是否需要中断或者抛出异常
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//加入条件等待队列尾部
Node node = addConditionWaiter();
//全部释放锁
long savedState = fullyRelease(node);
int interruptMode = 0;
//是否在条件等待队列中
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//获取等待队列
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//判断当前node下一个等待不为空,说明已经是在等待队列中,
//则清除取消状态的等待节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//说明是处于中断
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
如果最后的等待Node状态不是Node.Condition,则需要将取消的节点从等待队列中去掉, 然后创建一个Node,参数中线程是当前线程,Node的状态是Node.Condition,加入到队列 的尾部。
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
完成释放锁的操作,从这里可以看出,执行await操作是会释放锁的,, 首先获取state的状态值,然后重置为0,成为无锁状态.返回有锁之前的state 的值, 如果失败,则重置Node的waitStatus状态为CANCELLED状态
final long fullyRelease(Node node) {
boolean failed = true;
try {
long savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
等待队列中的节点去争抢锁,这个之前在ReentrantLock中分析过,这里就不再赘述,
final boolean acquireQueued(final Node node, long arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取新建node的前驱
final Node p = node.predecessor();
//如果前驱是head,并且tryAcquire或
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
ConditionObject中signal实现
ConditionObect的signal去唤醒firstWaiter(即Condition等待队列中第一个节点)
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
doSignal是从firstWaiter开始循环,唤醒第一个没有被取消的等待节点
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
transferForSignal则是将等待的node的waitStatus重置为0,然后从Condition队列中转移加入CLH的 双端等待队列。
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//加入CLH等待队列
Node p = enq(node);
int ws = p.waitStatus;
//如果等待状态是取消状态,或者waitStatus设置为Signal失败,则唤醒node的线程去争抢锁,
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
下图是表示从Condition的等待队列中,会转移到CLH的等待对队列中.
总结
今天介绍的ConditonObject主要是条件等待队列,可以创建多个条件等待队列, ReentrantLock是只有CLH的双向队列,都是在双向队列上进行的等待,相比之下, Condition控制更加灵活,控制粒度更细。
以上是关于JAVA多线程之ArrayBlockingQueue看Condition的实现的主要内容,如果未能解决你的问题,请参考以下文章