AQS(队列同步器)
Posted yqxx1116
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS(队列同步器)相关的知识,希望对你有一定的参考价值。
目录导引:
一、简介
二、源码解析(JDK8)
三、运用示例
一、简介
AQS(AbstractQueuedSynchronizer)的核心思想是基于volatile int state变量,配合Unsafe工具对其原子性的操作来实现对当前state状态值进行修改。
同步器内部依赖一个FIFO的双向队列来完成资源获取线程的排队工作。
同步器主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,对同步状态的修改或者访问主要通过同步器提供的3个方法:
getState() 获取当前的同步状态
setState(int newState) 设置当前同步状态
compareAndSetState(int expect,int update) 使用CAS设置当前状态,该方法能够保证状态设置的原子性
同步器可以支持独占式的获取同步状态,也可以支持共享式的获取同步状态,这样可以方便实现不同类型的同步组件,利用同步器实现锁的语义。
二、源码解析(JDK8)
内部类Node:
1 static final class Node { 2 3 /** 分享模式节点 */ 4 static final Node SHARED = new Node(); 5 6 /** 独占模式节点 */ 7 static final Node EXCLUSIVE = null; 8 9 /** 节点等待状态:表示线程已取消 */ 10 static final int CANCELLED = 1; 11 12 /** 节点等待状态:表示后继线程需要被唤醒 */ 13 static final int SIGNAL = -1; 14 15 /** 节点等待状态:表示线程在Condtion上 */ 16 static final int CONDITION = -2; 17 18 /** 节点等待状态:表示下一个acquireShared需要无条件的传播 */ 19 static final int PROPAGATE = -3; 20 21 /** 节点的等待状态 */ 22 volatile int waitStatus; 23 24 /** 前一个节点引用 */ 25 volatile Node prev; 26 27 /** 后一个节点引用 */ 28 volatile Node next; 29 30 /** 节点持有的线程 */ 31 volatile Thread thread; 32 33 /** 节点状态:已取消 */ 34 Node nextWaiter; 35 36 /** 分享模式节点 */ 37 final boolean isShared() { 38 return nextWaiter == SHARED; 39 } 40 41 /** 分享模式节点 */ 42 final Node predecessor() throws NullPointerException { 43 Node p = prev; 44 if (p == null) 45 throw new NullPointerException(); 46 else 47 return p; 48 } 49 50 /** 分享模式节点 */ 51 Node() { // Used to establish initial head or SHARED marker 52 } 53 54 /** 分享模式节点 */ 55 Node(Thread thread, Node mode) { // Used by addWaiter 56 this.nextWaiter = mode; 57 this.thread = thread; 58 } 59 60 /** 分享模式节点 */ 61 Node(Thread thread, int waitStatus) { // Used by Condition 62 this.waitStatus = waitStatus; 63 this.thread = thread; 64 } 65 }
内部类ConditionObject:
1 public class ConditionObject implements Condition, java.io.Serializable { 2 3 private static final long serialVersionUID = 1173984872572414699L; 4 5 /** 先占位 */ 6 private transient Node firstWaiter; 7 8 /** 先占位 */ 9 private transient Node lastWaiter; 10 11 /** 先占位 */ 12 public ConditionObject() { } 13 14 /** 先占位 */ 15 private Node addConditionWaiter() { 16 Node t = lastWaiter; 17 // If lastWaiter is cancelled, clean out. 18 if (t != null && t.waitStatus != Node.CONDITION) { 19 unlinkCancelledWaiters(); 20 t = lastWaiter; 21 } 22 Node node = new Node(Thread.currentThread(), Node.CONDITION); 23 if (t == null) 24 firstWaiter = node; 25 else 26 t.nextWaiter = node; 27 lastWaiter = node; 28 return node; 29 } 30 31 /** 先占位 */ 32 private void doSignal(Node first) { 33 do { 34 if ( (firstWaiter = first.nextWaiter) == null) 35 lastWaiter = null; 36 first.nextWaiter = null; 37 } while (!transferForSignal(first) && 38 (first = firstWaiter) != null); 39 } 40 41 /** 先占位 */ 42 private void doSignalAll(Node first) { 43 lastWaiter = firstWaiter = null; 44 do { 45 Node next = first.nextWaiter; 46 first.nextWaiter = null; 47 transferForSignal(first); 48 first = next; 49 } while (first != null); 50 } 51 52 /** 先占位 */ 53 private void unlinkCancelledWaiters() { 54 Node t = firstWaiter; 55 Node trail = null; 56 while (t != null) { 57 Node next = t.nextWaiter; 58 if (t.waitStatus != Node.CONDITION) { 59 t.nextWaiter = null; 60 if (trail == null) 61 firstWaiter = next; 62 else 63 trail.nextWaiter = next; 64 if (next == null) 65 lastWaiter = trail; 66 } 67 else 68 trail = t; 69 t = next; 70 } 71 } 72 73 /** 先占位 */ 74 public final void signal() { 75 if (!isHeldExclusively()) 76 throw new IllegalMonitorStateException(); 77 Node first = firstWaiter; 78 if (first != null) 79 doSignal(first); 80 } 81 82 /** 先占位 */ 83 public final void signalAll() { 84 if (!isHeldExclusively()) 85 throw new IllegalMonitorStateException(); 86 Node first = firstWaiter; 87 if (first != null) 88 doSignalAll(first); 89 } 90 91 /** 先占位 */ 92 public final void awaitUninterruptibly() { 93 Node node = addConditionWaiter(); 94 int savedState = fullyRelease(node); 95 boolean interrupted = false; 96 while (!isOnSyncQueue(node)) { 97 LockSupport.park(this); 98 if (Thread.interrupted()) 99 interrupted = true; 100 } 101 if (acquireQueued(node, savedState) || interrupted) 102 selfInterrupt(); 103 } 104 105 /** 先占位 */ 106 private static final int REINTERRUPT = 1; 107 108 /** 先占位 */ 109 private static final int THROW_IE = -1; 110 111 /** 先占位 */ 112 private int checkInterruptWhileWaiting(Node node) { 113 return Thread.interrupted() ? 114 (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 115 0; 116 } 117 118 /** 先占位 */ 119 private void reportInterruptAfterWait(int interruptMode) 120 throws InterruptedException { 121 if (interruptMode == THROW_IE) 122 throw new InterruptedException(); 123 else if (interruptMode == REINTERRUPT) 124 selfInterrupt(); 125 } 126 127 /** 先占位 */ 128 public final void await() throws InterruptedException { 129 if (Thread.interrupted()) 130 throw new InterruptedException(); 131 Node node = addConditionWaiter(); 132 int savedState = fullyRelease(node); 133 int interruptMode = 0; 134 while (!isOnSyncQueue(node)) { 135 LockSupport.park(this); 136 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 137 break; 138 } 139 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 140 interruptMode = REINTERRUPT; 141 if (node.nextWaiter != null) // clean up if cancelled 142 unlinkCancelledWaiters(); 143 if (interruptMode != 0) 144 reportInterruptAfterWait(interruptMode); 145 } 146 147 /** 先占位 */ 148 public final long awaitNanos(long nanosTimeout) 149 throws InterruptedException { 150 if (Thread.interrupted()) 151 throw new InterruptedException(); 152 Node node = addConditionWaiter(); 153 int savedState = fullyRelease(node); 154 final long deadline = System.nanoTime() + nanosTimeout; 155 int interruptMode = 0; 156 while (!isOnSyncQueue(node)) { 157 if (nanosTimeout <= 0L) { 158 transferAfterCancelledWait(node); 159 break; 160 } 161 if (nanosTimeout >= spinForTimeoutThreshold) 162 LockSupport.parkNanos(this, nanosTimeout); 163 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 164 break; 165 nanosTimeout = deadline - System.nanoTime(); 166 } 167 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 168 interruptMode = REINTERRUPT; 169 if (node.nextWaiter != null) 170 unlinkCancelledWaiters(); 171 if (interruptMode != 0) 172 reportInterruptAfterWait(interruptMode); 173 return deadline - System.nanoTime(); 174 } 175 176 /** 先占位 */ 177 public final boolean awaitUntil(Date deadline) 178 throws InterruptedException { 179 long abstime = deadline.getTime(); 180 if (Thread.interrupted()) 181 throw new InterruptedException(); 182 Node node = addConditionWaiter(); 183 int savedState = fullyRelease(node); 184 boolean timedout = false; 185 int interruptMode = 0; 186 while (!isOnSyncQueue(node)) { 187 if (System.currentTimeMillis() > abstime) { 188 timedout = transferAfterCancelledWait(node); 189 break; 190 } 191 LockSupport.parkUntil(this, abstime); 192 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 193 break; 194 } 195 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 196 interruptMode = REINTERRUPT; 197 if (node.nextWaiter != null) 198 unlinkCancelledWaiters(); 199 if (interruptMode != 0) 200 reportInterruptAfterWait(interruptMode); 201 return !timedout; 202 } 203 204 /** 先占位 */ 205 public final boolean await(long time, TimeUnit unit) 206 throws InterruptedException { 207 long nanosTimeout = unit.toNanos(time); 208 if (Thread.interrupted()) 209 throw new InterruptedException(); 210 Node node = addConditionWaiter(); 211 int savedState = fullyRelease(node); 212 final long deadline = System.nanoTime() + nanosTimeout; 213 boolean timedout = false; 214 int interruptMode = 0; 215 while (!isOnSyncQueue(node)) { 216 if (nanosTimeout <= 0L) { 217 timedout = transferAfterCancelledWait(node); 218 break; 219 } 220 if (nanosTimeout >= spinForTimeoutThreshold) 221 LockSupport.parkNanos(this, nanosTimeout); 222 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 223 break; 224 nanosTimeout = deadline - System.nanoTime(); 225 } 226 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 227 interruptMode = REINTERRUPT; 228 if (node.nextWaiter != null) 229 unlinkCancelledWaiters(); 230 if (interruptMode != 0) 231 reportInterruptAfterWait(interruptMode); 232 return !timedout; 233 } 234 235 /** 先占位 */ 236 final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { 237 return sync == AbstractQueuedSynchronizer.this; 238 } 239 240 /** 先占位 */ 241 protected final boolean hasWaiters() { 242 if (!isHeldExclusively()) 243 throw new IllegalMonitorStateException(); 244 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 245 if (w.waitStatus == Node.CONDITION) 246 return true; 247 } 248 return false; 249 } 250 251 /** 先占位 */ 252 protected final int getWaitQueueLength() { 253 if (!isHeldExclusively()) 254 throw new IllegalMonitorStateException(); 255 int n = 0; 256 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 257 if (w.waitStatus == Node.CONDITION) 258 ++n; 259 } 260 return n; 261 } 262 263 /** 先占位 */ 264 protected final Collection<Thread> getWaitingThreads() { 265 if (!isHeldExclusively()) 266 throw new IllegalMonitorStateException(); 267 ArrayList<Thread> list = new ArrayList<Thread>(); 268 for (Node w = firstWaiter; w != null; w = w.nextWaiter) { 269 if (w.waitStatus == Node.CONDITION) { 270 Thread t = w.thread; 271 if (t != null) 272 list.add(t); 273 } 274 } 275 return list; 276 } 277 }
成员属性:
1 /** 双端队列:头节点 */ 2 private transient volatile Node head; 3 4 /** 双端队列:尾节点 */ 5 private transient volatile Node tail; 6 7 /** 同步状态 */ 8 private volatile int state; 9 10 /** 超时时间 */ 11 static final long spinForTimeoutThreshold = 1000L; 12 13 /** CAS各字段的long值属性 */ 14 private static final Unsafe unsafe = Unsafe.getUnsafe(); 15 private static final long stateOffset; 16 private static final long headOffset; 17 private static final long tailOffset; 18 private static final long waitStatusOffset; 19 private static final long nextOffset; 20 21 static { 22 try { 23 stateOffset = unsafe.objectFieldOffset 24 (AbstractQueuedSynchronizer.class.getDeclaredField("state")); 25 headOffset = unsafe.objectFieldOffset 26 (AbstractQueuedSynchronizer.class.getDeclaredField("head")); 27 tailOffset = unsafe.objectFieldOffset 28 (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); 29 waitStatusOffset = unsafe.objectFieldOffset 30 (Node.class.getDeclaredField("waitStatus")); 31 nextOffset = unsafe.objectFieldOffset 32 (Node.class.getDeclaredField("next")); 33 34 } catch (Exception ex) { throw new Error(ex); } 35 }
核心方法-独占式获取同步状态:
1 /** 独占式获取同步状态 */ 2 public final void acquire(int arg) { 3 /** 先尝试获取同步状态(由子类实现),成功则直接返回,失败则创建一个当前线程的Node加入队列队尾,然后自旋获取同步状态 */ 4 if (!tryAcquire(arg) && 5 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 6 /** 当前线程自我中断 */ 7 selfInterrupt(); 8 } 9 10 11 /** 把Node节点加入双端队列队尾 */ 12 private Node addWaiter(Node mode) { 13 /** 新建一个Node,持有当前线程的引用 */ 14 Node node = new Node(Thread.currentThread(), mode); 15 // Try the fast path of enq; backup to full enq on failure 16 Node pred = tail; 17 /** 如果队列不为空,先尝试一次快速入队:新增Node节点的前一个节点指向原tail节点, 18 * 然后尝试cas更改tail属性的引用为新增Node节点,最后把原tail节点的下一个节点指向新增Node节点,完成插入队尾 */ 19 if (pred != null) { 20 node.prev = pred; 21 if (compareAndSetTail(pred, node)) { 22 pred.next = node; 23 return node; 24 } 25 } 26 /** 如果双端队列未初始化,或者cas更改失败,tail节点已经变更,则执行自旋入队方法 */ 27 enq(node); 28 return node; 29 } 30 31 32 /** 自旋方式入队 */ 33 private Node enq(final Node node) { 34 for (;;) { 35 Node t = tail; 36 /** 如果队列未初始化,则采用cas方式新增一个Node,并使头尾节点相同 */ 37 if (t == null) { // Must initialize 38 if (compareAndSetHead(new Node())) 39 tail = head; 40 } else { 41 /** 如果队列已经初始化,则采用快速入队方式尝试入队,失败则继续自旋 */ 42 node.prev = t; 43 if (compareAndSetTail(t, node)) { 44 t.next = node; 45 return t; 46 } 47 } 48 } 49 } 50 51 52 /** 自旋获取同步状态 */ 53 final boolean acquireQueued(final Node node, int arg) { 54 boolean failed = true; 55 try { 56 boolean interrupted = false; 57 for (;;) { 58 final Node p = node.predecessor(); 59 /** 如果当前节点的前一个节点是head节点,说明前一个节点可能已经释放了同步状态,并唤醒了自己,则尝试一次去获取同步状态 60 * 如果获取同步状态成功,则设置当前节点为head节点,前一个节点(指向原head节点)为null,并设置原head节点的后一个节点为null,断开引用,使原head节点被GC */ 61 if (p == head && tryAcquire(arg)) { 62 setHead(node); 63 p.next = null; // help GC 64 failed = false; 65 return interrupted; 66 } 67 /** 如果获取同步状态失败,则判断是否需要挂起当前线程,如果需要挂起,则挂起当前线程,并检查中断状态,如果被中断过哪怕一次,也设置标志位 被中断 */ 68 if (shouldParkAfterFailedAcquire(p, node) && 69 parkAndCheckInterrupt()) 70 interrupted = true; 71 } 72 } finally { 73 /** 正常情况下,不断自旋,failed的值会是false,只有子类实现的tryAcquire()方法抛出异常时,才会触发下面的逻辑,则把该节点置为取消状态*/ 74 if (failed) 75 cancelAcquire(node); 76 } 77 } 78 79 80 /** 判断是否需要挂起当前线程 */ 81 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 82 int ws = pred.waitStatus; 83 /** 如果前一个节点的等待状态为SIGNAL,则表示前一个节点释放同步状态后会唤醒自己,可以放心的挂起了 */ 84 if (ws == Node.SIGNAL) 85 /* 86 * This node has already set status asking a release 87 * to signal it, so it can safely park. 88 */ 89 return true; 90 /** 如果前一个节点的等待状态为CANCELLED,则表示前一个节点被取消了,则往前找第一个没有被取消的节点 */ 91 if (ws > 0) { 92 /* 93 * Predecessor was cancelled. Skip over predecessors and 94 * indicate retry. 95 */ 96 do { 97 /** 原队列 98 * pp --> pred --> node 99 * pp <-- pred <-- node 100 * 处理 101 * pred = pp 102 * node.prev = pp 103 * 新队列 104 * pp <-- node 105 * pp --> node 106 * 使取消的Node断开队列引用,被GC,如果新的前一个节点也被取消,则继续循环 107 */ 108 node.prev = pred = pred.prev; 109 } while (pred.waitStatus > 0); 110 pred.next = node; 111 } else { 112 /* 113 * waitStatus must be 0 or PROPAGATE. Indicate that we 114 * need a signal, but don‘t park yet. Caller will need to 115 * retry to make sure it cannot acquire before parking. 116 */ 117 /** 如果前一个节点的等待状态为0或者PROPAGATE,则设置前一个节点为SIGNAL,告诉它记得唤醒我。如果cas设置失败,则在外自旋acquireQueued()方法中会再次进入此方法尝试cas更改,因为只有前一个节点等待状态变更为SIGNAL,才会返回true,才能执行下一步挂起操作 */ 118 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 119 } 120 return false; 121 } 122 123 124 /** 挂起线程,并检查线程是否中断 */ 125 private final boolean parkAndCheckInterrupt() { 126 LockSupport.park(this); 127 return Thread.interrupted(); 128 }
以上是关于AQS(队列同步器)的主要内容,如果未能解决你的问题,请参考以下文章