AQS(队列同步器)

Posted yqxx1116

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS(队列同步器)相关的知识,希望对你有一定的参考价值。

目录导引

  一、简介

  二、源码解析(JDK8)

  三、运用示例

一、简介

  AQS(AbstractQueuedSynchronizer)的核心思想是基于volatile int state变量,配合Unsafe工具对其原子性的操作来实现对当前state状态值进行修改。

  同步器内部依赖一个FIFO的双向队列来完成资源获取线程的排队工作。

  同步器主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,对同步状态的修改或者访问主要通过同步器提供的3个方法:

  getState() 获取当前的同步状态

  setState(int newState) 设置当前同步状态

  compareAndSetState(int expect,int update) 使用CAS设置当前状态,该方法能够保证状态设置的原子性

  同步器可以支持独占式的获取同步状态,也可以支持共享式的获取同步状态,这样可以方便实现不同类型的同步组件,利用同步器实现锁的语义。

 技术图片

 二、源码解析(JDK8)

  内部类Node:

 1 static final class Node {
 2 
 3         /** 分享模式节点 */
 4         static final Node SHARED = new Node();
 5 
 6         /** 独占模式节点 */
 7         static final Node EXCLUSIVE = null;
 8 
 9         /** 节点等待状态:表示线程已取消 */
10         static final int CANCELLED =  1;
11 
12         /** 节点等待状态:表示后继线程需要被唤醒 */
13         static final int SIGNAL    = -1;
14 
15         /** 节点等待状态:表示线程在Condtion上 */
16         static final int CONDITION = -2;
17 
18         /** 节点等待状态:表示下一个acquireShared需要无条件的传播 */
19         static final int PROPAGATE = -3;
20 
21         /** 节点的等待状态 */
22         volatile int waitStatus;
23 
24         /** 前一个节点引用 */
25         volatile Node prev;
26 
27         /** 后一个节点引用 */
28         volatile Node next;
29 
30         /** 节点持有的线程 */
31         volatile Thread thread;
32 
33         /** 节点状态:已取消 */
34         Node nextWaiter;
35 
36         /** 分享模式节点 */
37         final boolean isShared() {
38             return nextWaiter == SHARED;
39         }
40 
41         /** 分享模式节点 */
42         final Node predecessor() throws NullPointerException {
43             Node p = prev;
44             if (p == null)
45                 throw new NullPointerException();
46             else
47                 return p;
48         }
49 
50         /** 分享模式节点 */
51         Node() {    // Used to establish initial head or SHARED marker
52         }
53 
54         /** 分享模式节点 */
55         Node(Thread thread, Node mode) {     // Used by addWaiter
56             this.nextWaiter = mode;
57             this.thread = thread;
58         }
59 
60         /** 分享模式节点 */
61         Node(Thread thread, int waitStatus) { // Used by Condition
62             this.waitStatus = waitStatus;
63             this.thread = thread;
64         }
65     }

 

  内部类ConditionObject:

  1 public class ConditionObject implements Condition, java.io.Serializable {
  2 
  3         private static final long serialVersionUID = 1173984872572414699L;
  4 
  5         /** 先占位 */
  6         private transient Node firstWaiter;
  7 
  8         /** 先占位 */
  9         private transient Node lastWaiter;
 10 
 11         /** 先占位 */
 12         public ConditionObject() { }
 13 
 14         /** 先占位 */
 15         private Node addConditionWaiter() {
 16             Node t = lastWaiter;
 17             // If lastWaiter is cancelled, clean out.
 18             if (t != null && t.waitStatus != Node.CONDITION) {
 19                 unlinkCancelledWaiters();
 20                 t = lastWaiter;
 21             }
 22             Node node = new Node(Thread.currentThread(), Node.CONDITION);
 23             if (t == null)
 24                 firstWaiter = node;
 25             else
 26                 t.nextWaiter = node;
 27             lastWaiter = node;
 28             return node;
 29         }
 30 
 31         /** 先占位 */
 32         private void doSignal(Node first) {
 33             do {
 34                 if ( (firstWaiter = first.nextWaiter) == null)
 35                     lastWaiter = null;
 36                 first.nextWaiter = null;
 37             } while (!transferForSignal(first) &&
 38                     (first = firstWaiter) != null);
 39         }
 40 
 41         /** 先占位 */
 42         private void doSignalAll(Node first) {
 43             lastWaiter = firstWaiter = null;
 44             do {
 45                 Node next = first.nextWaiter;
 46                 first.nextWaiter = null;
 47                 transferForSignal(first);
 48                 first = next;
 49             } while (first != null);
 50         }
 51 
 52         /** 先占位 */
 53         private void unlinkCancelledWaiters() {
 54             Node t = firstWaiter;
 55             Node trail = null;
 56             while (t != null) {
 57                 Node next = t.nextWaiter;
 58                 if (t.waitStatus != Node.CONDITION) {
 59                     t.nextWaiter = null;
 60                     if (trail == null)
 61                         firstWaiter = next;
 62                     else
 63                         trail.nextWaiter = next;
 64                     if (next == null)
 65                         lastWaiter = trail;
 66                 }
 67                 else
 68                     trail = t;
 69                 t = next;
 70             }
 71         }
 72 
 73         /** 先占位 */
 74         public final void signal() {
 75             if (!isHeldExclusively())
 76                 throw new IllegalMonitorStateException();
 77             Node first = firstWaiter;
 78             if (first != null)
 79                 doSignal(first);
 80         }
 81 
 82         /** 先占位 */
 83         public final void signalAll() {
 84             if (!isHeldExclusively())
 85                 throw new IllegalMonitorStateException();
 86             Node first = firstWaiter;
 87             if (first != null)
 88                 doSignalAll(first);
 89         }
 90 
 91         /** 先占位 */
 92         public final void awaitUninterruptibly() {
 93             Node node = addConditionWaiter();
 94             int savedState = fullyRelease(node);
 95             boolean interrupted = false;
 96             while (!isOnSyncQueue(node)) {
 97                 LockSupport.park(this);
 98                 if (Thread.interrupted())
 99                     interrupted = true;
100             }
101             if (acquireQueued(node, savedState) || interrupted)
102                 selfInterrupt();
103         }
104 
105         /** 先占位 */
106         private static final int REINTERRUPT =  1;
107 
108         /** 先占位 */
109         private static final int THROW_IE    = -1;
110 
111         /** 先占位 */
112         private int checkInterruptWhileWaiting(Node node) {
113             return Thread.interrupted() ?
114                     (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
115                     0;
116         }
117 
118         /** 先占位 */
119         private void reportInterruptAfterWait(int interruptMode)
120                 throws InterruptedException {
121             if (interruptMode == THROW_IE)
122                 throw new InterruptedException();
123             else if (interruptMode == REINTERRUPT)
124                 selfInterrupt();
125         }
126 
127         /** 先占位 */
128         public final void await() throws InterruptedException {
129             if (Thread.interrupted())
130                 throw new InterruptedException();
131             Node node = addConditionWaiter();
132             int savedState = fullyRelease(node);
133             int interruptMode = 0;
134             while (!isOnSyncQueue(node)) {
135                 LockSupport.park(this);
136                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
137                     break;
138             }
139             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
140                 interruptMode = REINTERRUPT;
141             if (node.nextWaiter != null) // clean up if cancelled
142                 unlinkCancelledWaiters();
143             if (interruptMode != 0)
144                 reportInterruptAfterWait(interruptMode);
145         }
146 
147         /** 先占位 */
148         public final long awaitNanos(long nanosTimeout)
149                 throws InterruptedException {
150             if (Thread.interrupted())
151                 throw new InterruptedException();
152             Node node = addConditionWaiter();
153             int savedState = fullyRelease(node);
154             final long deadline = System.nanoTime() + nanosTimeout;
155             int interruptMode = 0;
156             while (!isOnSyncQueue(node)) {
157                 if (nanosTimeout <= 0L) {
158                     transferAfterCancelledWait(node);
159                     break;
160                 }
161                 if (nanosTimeout >= spinForTimeoutThreshold)
162                     LockSupport.parkNanos(this, nanosTimeout);
163                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
164                     break;
165                 nanosTimeout = deadline - System.nanoTime();
166             }
167             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
168                 interruptMode = REINTERRUPT;
169             if (node.nextWaiter != null)
170                 unlinkCancelledWaiters();
171             if (interruptMode != 0)
172                 reportInterruptAfterWait(interruptMode);
173             return deadline - System.nanoTime();
174         }
175 
176         /** 先占位 */
177         public final boolean awaitUntil(Date deadline)
178                 throws InterruptedException {
179             long abstime = deadline.getTime();
180             if (Thread.interrupted())
181                 throw new InterruptedException();
182             Node node = addConditionWaiter();
183             int savedState = fullyRelease(node);
184             boolean timedout = false;
185             int interruptMode = 0;
186             while (!isOnSyncQueue(node)) {
187                 if (System.currentTimeMillis() > abstime) {
188                     timedout = transferAfterCancelledWait(node);
189                     break;
190                 }
191                 LockSupport.parkUntil(this, abstime);
192                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
193                     break;
194             }
195             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
196                 interruptMode = REINTERRUPT;
197             if (node.nextWaiter != null)
198                 unlinkCancelledWaiters();
199             if (interruptMode != 0)
200                 reportInterruptAfterWait(interruptMode);
201             return !timedout;
202         }
203 
204         /** 先占位 */
205         public final boolean await(long time, TimeUnit unit)
206                 throws InterruptedException {
207             long nanosTimeout = unit.toNanos(time);
208             if (Thread.interrupted())
209                 throw new InterruptedException();
210             Node node = addConditionWaiter();
211             int savedState = fullyRelease(node);
212             final long deadline = System.nanoTime() + nanosTimeout;
213             boolean timedout = false;
214             int interruptMode = 0;
215             while (!isOnSyncQueue(node)) {
216                 if (nanosTimeout <= 0L) {
217                     timedout = transferAfterCancelledWait(node);
218                     break;
219                 }
220                 if (nanosTimeout >= spinForTimeoutThreshold)
221                     LockSupport.parkNanos(this, nanosTimeout);
222                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
223                     break;
224                 nanosTimeout = deadline - System.nanoTime();
225             }
226             if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
227                 interruptMode = REINTERRUPT;
228             if (node.nextWaiter != null)
229                 unlinkCancelledWaiters();
230             if (interruptMode != 0)
231                 reportInterruptAfterWait(interruptMode);
232             return !timedout;
233         }
234 
235         /** 先占位 */
236         final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
237             return sync == AbstractQueuedSynchronizer.this;
238         }
239 
240         /** 先占位 */
241         protected final boolean hasWaiters() {
242             if (!isHeldExclusively())
243                 throw new IllegalMonitorStateException();
244             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
245                 if (w.waitStatus == Node.CONDITION)
246                     return true;
247             }
248             return false;
249         }
250 
251         /** 先占位 */
252         protected final int getWaitQueueLength() {
253             if (!isHeldExclusively())
254                 throw new IllegalMonitorStateException();
255             int n = 0;
256             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
257                 if (w.waitStatus == Node.CONDITION)
258                     ++n;
259             }
260             return n;
261         }
262 
263         /** 先占位 */
264         protected final Collection<Thread> getWaitingThreads() {
265             if (!isHeldExclusively())
266                 throw new IllegalMonitorStateException();
267             ArrayList<Thread> list = new ArrayList<Thread>();
268             for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
269                 if (w.waitStatus == Node.CONDITION) {
270                     Thread t = w.thread;
271                     if (t != null)
272                         list.add(t);
273                 }
274             }
275             return list;
276         }
277     }

 

  成员属性:

 1     /** 双端队列:头节点 */
 2     private transient volatile Node head;
 3 
 4     /** 双端队列:尾节点 */
 5     private transient volatile Node tail;
 6 
 7     /** 同步状态 */
 8     private volatile int state;
 9 
10     /** 超时时间 */
11     static final long spinForTimeoutThreshold = 1000L;
12 
13     /** CAS各字段的long值属性 */
14     private static final Unsafe unsafe = Unsafe.getUnsafe();
15     private static final long stateOffset;
16     private static final long headOffset;
17     private static final long tailOffset;
18     private static final long waitStatusOffset;
19     private static final long nextOffset;
20 
21     static {
22         try {
23             stateOffset = unsafe.objectFieldOffset
24                     (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
25             headOffset = unsafe.objectFieldOffset
26                     (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
27             tailOffset = unsafe.objectFieldOffset
28                     (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
29             waitStatusOffset = unsafe.objectFieldOffset
30                     (Node.class.getDeclaredField("waitStatus"));
31             nextOffset = unsafe.objectFieldOffset
32                     (Node.class.getDeclaredField("next"));
33 
34         } catch (Exception ex) { throw new Error(ex); }
35     }

 

  核心方法-独占式获取同步状态:

  1     /** 独占式获取同步状态 */
  2     public final void acquire(int arg) {
  3         /** 先尝试获取同步状态(由子类实现),成功则直接返回,失败则创建一个当前线程的Node加入队列队尾,然后自旋获取同步状态 */
  4         if (!tryAcquire(arg) &&
  5                 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  6         /** 当前线程自我中断 */
  7             selfInterrupt();
  8     }
  9 
 10 
 11     /** 把Node节点加入双端队列队尾 */
 12     private Node addWaiter(Node mode) {
 13         /** 新建一个Node,持有当前线程的引用 */
 14         Node node = new Node(Thread.currentThread(), mode);
 15         // Try the fast path of enq; backup to full enq on failure
 16         Node pred = tail;
 17         /** 如果队列不为空,先尝试一次快速入队:新增Node节点的前一个节点指向原tail节点,
 18          * 然后尝试cas更改tail属性的引用为新增Node节点,最后把原tail节点的下一个节点指向新增Node节点,完成插入队尾 */
 19         if (pred != null) {
 20             node.prev = pred;
 21             if (compareAndSetTail(pred, node)) {
 22                 pred.next = node;
 23                 return node;
 24             }
 25         }
 26         /** 如果双端队列未初始化,或者cas更改失败,tail节点已经变更,则执行自旋入队方法 */
 27         enq(node);
 28         return node;
 29     }
 30 
 31 
 32     /** 自旋方式入队 */
 33     private Node enq(final Node node) {
 34         for (;;) {
 35             Node t = tail;
 36             /** 如果队列未初始化,则采用cas方式新增一个Node,并使头尾节点相同 */
 37             if (t == null) { // Must initialize
 38                 if (compareAndSetHead(new Node()))
 39                     tail = head;
 40             } else {
 41                 /** 如果队列已经初始化,则采用快速入队方式尝试入队,失败则继续自旋 */
 42                 node.prev = t;
 43                 if (compareAndSetTail(t, node)) {
 44                     t.next = node;
 45                     return t;
 46                 }
 47             }
 48         }
 49     }
 50 
 51 
 52     /** 自旋获取同步状态 */
 53     final boolean acquireQueued(final Node node, int arg) {
 54         boolean failed = true;
 55         try {
 56             boolean interrupted = false;
 57             for (;;) {
 58                 final Node p = node.predecessor();
 59                 /** 如果当前节点的前一个节点是head节点,说明前一个节点可能已经释放了同步状态,并唤醒了自己,则尝试一次去获取同步状态
 60                  *  如果获取同步状态成功,则设置当前节点为head节点,前一个节点(指向原head节点)为null,并设置原head节点的后一个节点为null,断开引用,使原head节点被GC */
 61                 if (p == head && tryAcquire(arg)) {
 62                     setHead(node);
 63                     p.next = null; // help GC
 64                     failed = false;
 65                     return interrupted;
 66                 }
 67                 /** 如果获取同步状态失败,则判断是否需要挂起当前线程,如果需要挂起,则挂起当前线程,并检查中断状态,如果被中断过哪怕一次,也设置标志位 被中断 */
 68                 if (shouldParkAfterFailedAcquire(p, node) &&
 69                         parkAndCheckInterrupt())
 70                     interrupted = true;
 71             }
 72         } finally {
 73             /** 正常情况下,不断自旋,failed的值会是false,只有子类实现的tryAcquire()方法抛出异常时,才会触发下面的逻辑,则把该节点置为取消状态*/
 74             if (failed)
 75                 cancelAcquire(node);
 76         }
 77     }
 78 
 79 
 80     /** 判断是否需要挂起当前线程 */
 81     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 82         int ws = pred.waitStatus;
 83         /** 如果前一个节点的等待状态为SIGNAL,则表示前一个节点释放同步状态后会唤醒自己,可以放心的挂起了 */
 84         if (ws == Node.SIGNAL)
 85             /*
 86              * This node has already set status asking a release
 87              * to signal it, so it can safely park.
 88              */
 89             return true;
 90         /** 如果前一个节点的等待状态为CANCELLED,则表示前一个节点被取消了,则往前找第一个没有被取消的节点 */
 91         if (ws > 0) {
 92             /*
 93              * Predecessor was cancelled. Skip over predecessors and
 94              * indicate retry.
 95              */
 96             do {
 97                 /** 原队列
 98                  *  pp --> pred --> node
 99                  *  pp <-- pred <-- node
100                  *  处理
101                  *  pred = pp
102                  *  node.prev = pp
103                  *  新队列
104                  *  pp <-- node
105                  *  pp --> node
106                  *  使取消的Node断开队列引用,被GC,如果新的前一个节点也被取消,则继续循环
107                  */
108                 node.prev = pred = pred.prev;
109             } while (pred.waitStatus > 0);
110             pred.next = node;
111         } else {
112             /*
113              * waitStatus must be 0 or PROPAGATE.  Indicate that we
114              * need a signal, but don‘t park yet.  Caller will need to
115              * retry to make sure it cannot acquire before parking.
116              */
117             /** 如果前一个节点的等待状态为0或者PROPAGATE,则设置前一个节点为SIGNAL,告诉它记得唤醒我。如果cas设置失败,则在外自旋acquireQueued()方法中会再次进入此方法尝试cas更改,因为只有前一个节点等待状态变更为SIGNAL,才会返回true,才能执行下一步挂起操作 */
118             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
119         }
120         return false;
121     }
122 
123 
124     /** 挂起线程,并检查线程是否中断 */
125     private final boolean parkAndCheckInterrupt() {
126         LockSupport.park(this);
127         return Thread.interrupted();
128     }

 

 

  

 

以上是关于AQS(队列同步器)的主要内容,如果未能解决你的问题,请参考以下文章

队列同步器——AQS

AQS(队列同步器)

Java并发之AQS同步器学习

[Java并发] AQS抽象队列同步器源码解析--锁获取过程

五:抽象队列同步器AQS应用Lock详解

同步队列器AQS之condition等待队列的实现分析