AbstractQueuedSynchronizer解析
Posted gouden
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AbstractQueuedSynchronizer解析相关的知识,希望对你有一定的参考价值。
AbstractQueuedSynchronizer简称为AQS,是juc里很基本的一个包,juc里很多工具类是基于AQS实现的,理解了AQS,其它很多juc工具类也会比较清楚了。
1、方法简述
getState
返回当前state的值,该操作具有volatile读的内存语义。
setState
设置state的值,该操作具有volatile写的内存语义。
compareAndSetState
以cas方式设置state的值,该操作具有volatile读和写的语义。
tryAcquire
尝试以独占模式获取对象状态。默认实现将抛出异常。
tryRelease
尝试以独占模式设置表示释放的状态。默认实现将抛出异常。
tryAcquireShared
尝试以共享模式获取对象状态。默认实现将抛出异常。
tryReleaseShared
尝试以共享模式设置表示释放的状态。默认实现将抛出异常。
isHeldExclusively
当前线程,同步是以独占方式进行的,则返回true。默认实现将抛出异常。
acquire
以独占模式获取对象,忽略中断。
acquireInterruptibly
以独占模式获取对象,如果被中断则被终止。
tryAcquireNanos
与acquireInterruptibly类似,带有等待时间。
release
以独占方式释放对象。
acquireShared
以共享模式获取对象,忽略中断。
acquireSharedInterruptibly
以共享模式获取对象,如果被中断则被终止。
tryAcquireSharedNanos
与acquireSharedInterruptibly类似,带有等待时间。
releaseShared
以共享方式释放对象。
hasQueuedThreads
判断是否有正在等待获取的线程。
hasContented
查询是否其他线程层争用此同步器。
getFirstQueuedThread
获取队列中第一个线程,如果没有返回空。
isQueued
判断给定线程是否在队列中。
getQueueLength
获取队列里等待线程的个数。
getQueuedThreads
获取队列里等待的线程。
getExclusiveQueuedThreads
返回以独占模式获取对象等待的线程。
getSharedQueuedThreads
返回以共享模式获取对象等待的线程。
2、源码解析
先看一下AQS的Node里定义的几个状态:
1 static final int CANCELLED = 1; 2 static final int SIGNAL = -1; 3 static final int CONDITION = -2; 4 static final int PROPAGATE = -3;
CANCELLED状态是队列里的节点因为超时或中断而不再参与争夺资源的状态
SIGNAL状态是线程需要被唤醒的状态,这个状态的使用方式为将当前节点的前面节点设置为此状态,前面节点释放资源是如果检测是这个状态则唤醒当前节点(如果当前节点的状态正常)
CONDITION状态是线程需要等待一个条件成立,条件成立会出队
PROPAGATE状态是在共享状态下,当前节点释放资源时,传播给后面所有节点
再说一下AQS的两种模式:独占模式和共享模式
独占模式可以理解为上课用教室或机房,这个教室或机房里有30个位子,就算只有11个人来上课,也不能有其他人来用这个教室或机房(资源)
共享模式可以理解为排队自习用教室或上机用机房,这个教室或机房如果有空的位置,而且空的位置够排队的第一个人用的,那就让排队的第一个人进来用,第一个人进来后发现还能进第二个人用,那就让第二个人也进来用,以此类推;但如果不够第二个人用而够第三个人用,不让第三个人进来。
acquire及相关方法(独占模式):
1 public final void acquire(int arg) { 2 if (!tryAcquire(arg) && 3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4 selfInterrupt();//设置中断状态 5 }
该方法会先调用一个非等待的尝试获取方法(这个方法由用户实现),如果获取成功就直接返回,否则就调用addWaiter方法并调用acquireQueued方法,addWaiter方法的代码如下:
1 private Node addWaiter(Node mode) { 2 Node node = new Node(Thread.currentThread(), mode);//Node是内部类,队列是由Node组成的 3 // Try the fast path of enq; backup to full enq on failure 4 Node pred = tail;//先快速入队 5 if (pred != null) { 6 node.prev = pred; 7 if (compareAndSetTail(pred, node)) { 8 pred.next = node; 9 return node;//如果成功就返回 10 } 11 } 12 enq(node);//失败就比较复杂地入队 13 return node; 14 }
可以看出addWaiter方法就是让当前线程入队。
acquireQueued方法的代码如下:
1 final boolean acquireQueued(final Node node, int arg) { 2 boolean failed = true; 3 try { 4 boolean interrupted = false; 5 for (;;) { 6 final Node p = node.predecessor(); 7 if (p == head && tryAcquire(arg)) {//判断当前节点的前一个节点是否为队首,如果是,就尝试获取资源,如果获取成功,就把当前节点设置为队首,获取成功 8 setHead(node); 9 p.next = null; // help GC 10 failed = false; 11 return interrupted; 12 } 13 if (shouldParkAfterFailedAcquire(p, node) &&//获取资源失败,判断是否应该等待,如果应该,就调用parkAndCheckInterrupt方法等待,并检查是否有中断状态;如果不应该等待则说明队列的状态有变化,当前节点有机会获取资源,会再次尝试 14 parkAndCheckInterrupt()) 15 interrupted = true; 16 } 17 } finally { 18 if (failed) 19 cancelAcquire(node); 20 } 21 }
这个方法的作用就是一直尝试获取资源,直到获取了资源或者取消了,中间的过程会根据情况判断是否等待,返回值为是否出现中断的情况。
shouldParkAfterFailedAcquire方法的代码如下:
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2 int ws = pred.waitStatus;//获取前面节点的状态 3 if (ws == Node.SIGNAL)//如果前面节点的状态为SIGNAL,则说明已经告诉前面节点前面节点运行完通知当前节点 4 /* 5 * This node has already set status asking a release 6 * to signal it, so it can safely park. 7 */ 8 return true; 9 if (ws > 0) {//如果前面节点状态大于0,则为CANCELLED,则说明前面节点取消了获取,跳过前面节点,找到最近的正常状态的节点,放到该节点后面 10 /* 11 * Predecessor was cancelled. Skip over predecessors and 12 * indicate retry. 13 */ 14 do { 15 node.prev = pred = pred.prev; 16 } while (pred.waitStatus > 0); 17 pred.next = node; 18 } else {//如果前面节点是正常的,则设置为SIGNAL,告诉前面节点运行完通知当前节点 有一种可能,那就是当前刚给前面节点设置了SIGNAL,前面节点就已经获取资源并释放了,所以这种状态就返回false,先不等待,运行一下是否有前面节点刚好释放的情况出现 19 /* 20 * waitStatus must be 0 or PROPAGATE. Indicate that we 21 * need a signal, but don‘t park yet. Caller will need to 22 * retry to make sure it cannot acquire before parking. 23 */ 24 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 25 } 26 return false; 27 }
这个方法的作用就是判断在获取资源失败后是否应该等待。
parkAndCheckInterrupt方法的代码如下:
1 private final boolean parkAndCheckInterrupt() { 2 LockSupport.park(this); 3 return Thread.interrupted(); 4 }
这个方法的作用就是等待并返回是否中断并消除中断状态。
release及相关方法(独占模式):
1 public final boolean release(int arg) { 2 if (tryRelease(arg)) { 3 Node h = head; 4 if (h != null && h.waitStatus != 0) 5 unparkSuccessor(h);//唤醒队列里的下一个线程 6 return true; 7 } 8 return false; 9 }
unparkSuccessor方法的代码:
1 private void unparkSuccessor(Node node) { 2 /* 3 * If status is negative (i.e., possibly needing signal) try 4 * to clear in anticipation of signalling. It is OK if this 5 * fails or if status is changed by waiting thread. 6 */ 7 int ws = node.waitStatus; 8 if (ws < 0) 9 compareAndSetWaitStatus(node, ws, 0); 10 11 /* 这里的意思就是如果下一个节点没有取消,则唤醒它;如果取消了,则从尾开始找一个没有取消的节点,唤醒这个节点 12 * Thread to unpark is held in successor, which is normally 13 * just the next node. But if cancelled or apparently null, 14 * traverse backwards from tail to find the actual 15 * non-cancelled successor. 16 */ 17 Node s = node.next; 18 if (s == null || s.waitStatus > 0) { 19 s = null; 20 for (Node t = tail; t != null && t != node; t = t.prev) 21 if (t.waitStatus <= 0) 22 s = t; 23 } 24 if (s != null) 25 LockSupport.unpark(s.thread); 26 }
这个方法的作用就是唤醒后面节点(线程),如果后面节点取消获取,则从队列尾部向前找到一个没有取消的节点,唤醒这个节点。
acquireShared及相关方法(共享模式):
1 public final void acquireShared(int arg) { 2 if (tryAcquireShared(arg) < 0)//尝试获取资源,如果成功就返回,如果失败则调用下面方法 3 doAcquireShared(arg);// 4 }
doAcquireShared方法的代码:
1 private void doAcquireShared(int arg) { 2 final Node node = addWaiter(Node.SHARED);//以SHARED模式添加一个节点 3 boolean failed = true; 4 try { 5 boolean interrupted = false; 6 for (;;) { 7 final Node p = node.predecessor(); 8 if (p == head) { 9 int r = tryAcquireShared(arg);//尝试获取资源 10 if (r >= 0) { 11 setHeadAndPropagate(node, r);//将当前节点设置为head,如果还有剩余资源则可以再唤醒之后的线程 12 p.next = null; // help GC 13 if (interrupted) 14 selfInterrupt(); 15 failed = false; 16 return; 17 } 18 } 19 if (shouldParkAfterFailedAcquire(p, node) && 20 parkAndCheckInterrupt()) 21 interrupted = true; 22 } 23 } finally { 24 if (failed) 25 cancelAcquire(node); 26 } 27 }
这个方法的作用就是以共享模式获取资源。
setHeadAndPropagate方法的代码:
1 private void setHeadAndPropagate(Node node, int propagate) { 2 Node h = head; // Record old head for check below 3 setHead(node); 4 /* 5 * Try to signal next queued node if: 6 * Propagation was indicated by caller, 7 * or was recorded (as h.waitStatus either before 8 * or after setHead) by a previous operation 9 * (note: this uses sign-check of waitStatus because 10 * PROPAGATE status may transition to SIGNAL.) 11 * and 12 * The next node is waiting in shared mode, 13 * or we don‘t know, because it appears null 14 * 15 * The conservatism in both of these checks may cause 16 * unnecessary wake-ups, but only when there are multiple 17 * racing acquires/releases, so most need signals now or soon 18 * anyway. 19 */ 20 if (propagate > 0 || h == null || h.waitStatus < 0 || 21 (h = head) == null || h.waitStatus < 0) {//如果还有剩余的资源,则唤醒下一个节点(线程) 22 Node s = node.next; 23 if (s == null || s.isShared()) 24 doReleaseShared(); 25 } 26 }
这个方法的作用就是将当前节点设置为队首并根据剩余资源数判断是否唤醒下一个节点(线程)。
releaseShared及相关方法(共享模式):
1 public final boolean releaseShared(int arg) { 2 if (tryReleaseShared(arg)) {//尝试释放资源 3 doReleaseShared(); 4 return true; 5 } 6 return false; 7 }
doReleaseShared方法的代码:
1 private void doReleaseShared() { 2 /* 3 * Ensure that a release propagates, even if there are other 4 * in-progress acquires/releases. This proceeds in the usual 5 * way of trying to unparkSuccessor of head if it needs 6 * signal. But if it does not, status is set to PROPAGATE to 7 * ensure that upon release, propagation continues. 8 * Additionally, we must loop in case a new node is added 9 * while we are doing this. Also, unlike other uses of 10 * unparkSuccessor, we need to know if CAS to reset status 11 * fails, if so rechecking. 12 */ 13 for (;;) { 14 Node h = head; 15 if (h != null && h != tail) { 16 int ws = h.waitStatus; 17 if (ws == Node.SIGNAL) { 18 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))// 19 continue; // loop to recheck cases 20 unparkSuccessor(h); 21 } 22 else if (ws == 0 && 23 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 24 continue; // loop on failed CAS 25 } 26 if (h == head) // loop if head changed 27 break; 28 } 29 }
这个方法的作用就是判断是否唤醒后面节点(线程)。
3、补充
AQS有个有意思的地方,那就是state的变量的值不是直接用cas修改的,而是用一个stateOffset的偏移量修改的,stateOffset的获取是用下面的代码获取的:
1 stateOffset = unsafe.objectFieldOffset 2 (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
注释说是为了以后的加强而没有用AtomicInteger子类的方式实现。个人的能力有限,暂时还没看出巧妙在什么地方,也没有找到描述这个设计巧妙之处的文章。
以上是关于AbstractQueuedSynchronizer解析的主要内容,如果未能解决你的问题,请参考以下文章