AbstractQueuedSynchronizer源码
Posted emoji-emoji
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AbstractQueuedSynchronizer源码相关的知识,希望对你有一定的参考价值。
/** * 独占模式下进行请求,如果当前线程被中断,放弃方法执行(抛异常), * 1.检查当前线程的中断状态,然后至少执行一次tryAcquire, * 2.如果成功,方法返回; * 3.如果失败,当前线程会在同步等待队列中排队,直到方法返回成功或者线程被中断。 */ public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } //acquireQueued类似,但是这里响应中断(抛异常) private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } //通过抛异常,来响应中断 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
指定时间内尝试请求控制权,
/** * 独占模式下进行请求,如果当前线程被中断,响应中断(抛异常),返回 * 指定时间内尝试请求控制权, */ public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
共享模式下请求控制权,
/** * 1.调用tryAcquireShared尝试进行控制权请求,如果成功,返回。 * 2.如果请求失败,那么会用当前线程建立一个共享模式的节点,然后将节点 * 放到同步等待队列的尾部,进入循环。 * 3.循环中会判断当前同步等待队列中是否有其他线程,如果没有,再次调用tryAcquireShared * 4.如果请求成功,将当前节点设置为同步等待队列的头节点,同时检查是否需要 * 继续唤醒下一个共享模式的节点,如果需要就继续执行唤醒动作。这里还会想上传递中断 * 状态,然后退出循环。 * 5.如果在同步等待队列中,在当前线程前面有其他线程,或者第3步失败, * 那么首先需要检查当前节点是否已经设置等待唤醒标记,即将非取消状态前驱节点的等待 * 状态设置为SIGNAL。 * 6.如果没有设置等待唤醒标记,进行设置,然后继续循环,进入第三步 * 7.如果已经设置等待唤醒标记,那么阻塞当前线程 * 8.当前线程被唤醒后,设置传递中断标记,然后继续循环,继续第3步。 * 9.最后在循环退出后,要判断请求是否失败,如果失败,当前线程取消请求、 */ /** * 共享模式下请求控制权,忽略中断, */ public final void acquireShared(int arg) { //如果以共享模式尝试请求失败 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } /** * 在共享模式下尝试请求(控制权),需要先查看一下对象的状态是否允许在共享 * 模式下请求,如果允许在进行请求。 * * 这个方法总是被请求线程执行,如果方法执行失败,会将当前线程放到同步等待队列 * 中(如果当前线程还不在同步等待队列中),直到被其他线程的释放操作唤醒。 * * 留给子类实现 */ protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } private void doAcquireShared(int arg) { //将当前线程以共享模式加入同步等待队列。 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; //循环 for (;;) { //获取当前节点的前驱p final Node p = node.predecessor(); //如果p是头节点, if (p == head) { //调用tryAcquireShared int r = tryAcquireShared(arg); if (r >= 0) { //到这里,说明tryAcquireShared执行成功, //即在共享模式下获取控制权成功, setHeadAndPropagate(node, r); p.next = null; // help GC //检测中断状态, if (interrupted) selfInterrupt(); // failed = false; return; } } //如果当前节点的前驱节点不是头节点,判断当前节点请求失败后 //是否需要被阻塞,如果需要,阻塞并保存当前线程的中断状态 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * 将node设置为同步等待队列的头节点,并且检测一下node的后继节点是否 * 在共享模式下等待,如果是,并且propagate>0或者之前头节点的等待状态是 * PROPAGATE,唤醒后继节点。 */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /** * 尝试去唤醒队列中的下一个节点,如果满足下面的条件: * 传递(propagate>0), * 或者h.waitStatus为PROPAGATE,且下一个节点处于共享模式或者为null */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } /** * 共享模式下释放控制权,唤醒后继节点并确保传递。 * 注:在独占模式下,释放仅仅意味着,唤醒头结点的后继节点。 */ private void doReleaseShared() { /** * 保证释放动作传递(向同步等待队列尾部),即使没有其他正在进行的请求或释放动作。 * 如果头节点的后继节点需要唤醒,那么执行唤醒动作;如果不需要唤醒,将头节点的等待状态 * 设置为PROPAGATE保证唤醒传递。另外,为了防止过程中有新节点进入(队列), * 这里必须循环,所以,和其他unparkSuccessor方法使用方式不一样的是,如果 * 头结点等待状态设置失败,重新检测。 */ for (;;) { Node h = head; if (h != null && h != tail) { //如果同步等待队列不为空,获取头节点的等待状态。 int ws = h.waitStatus; //如果等待状态是SIGNAL。说明后继节点需要唤醒 if (ws == Node.SIGNAL) { //尝试修改等待状态 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //修改失败,重新循环 continue; // loop to recheck cases //修改成功,唤醒头节点的后继节点 unparkSuccessor(h); } //如果等待状态是0,尝试将头节点设置为PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //失败,继续循环 continue; // loop on failed CAS } //头节点没有发生变化才退出循环 if (h == head) // loop if head changed break; } }
共享模式下尝试请求,响应中断
//共享模式下尝试请求,响应中断,抛异常 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
共享模式下尝试请求,响应中断,且支持自定义时间,
//共享模式下尝试请求,响应中断,且支持自定义时间 public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
独占模式下的释放方法,
/** * 独占模式下的释放方法。 * 如果tryRelease返回true,会唤醒一个或多个线程。 */ public final boolean release(int arg) { if (tryRelease(arg)) { //如果tryRelease成功 Node h = head; //判断同步等待队列里面是否右需要唤醒的线程, if (h != null && h.waitStatus != 0) //如果有,就唤醒 unparkSuccessor(h); return true; } return false; }
共享模式下的释放方法:
/** * 共享模式下的释放方法。 * 如果tryReleaseShared返回true,会唤醒一个或者多个线程 */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } /** * 共享模式下释放控制权,唤醒后继节点并确保传递。 * 注:在独占模式下,释放仅仅意味着,唤醒头结点的后继节点。 */ private void doReleaseShared() { /** * 保证释放动作传递(向同步等待队列尾部),即使没有其他正在进行的请求或释放动作。 * 如果头节点的后继节点需要唤醒,那么执行唤醒动作;如果不需要唤醒,将头节点的等待状态 * 设置为PROPAGATE保证唤醒传递。另外,为了防止过程中有新节点进入(队列), * 这里必须循环,所以,和其他unparkSuccessor方法使用方式不一样的是,如果 * 头结点等待状态设置失败,重新检测。 */ for (;;) { Node h = head; if (h != null && h != tail) { //如果同步等待队列不为空,获取头节点的等待状态。 int ws = h.waitStatus; //如果等待状态是SIGNAL。说明后继节点需要唤醒 if (ws == Node.SIGNAL) { //尝试修改等待状态 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //修改失败,重新循环 continue; // loop to recheck cases //修改成功,唤醒头节点的后继节点 unparkSuccessor(h); } //如果等待状态是0,尝试将头节点设置为PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //失败,继续循环 continue; // loop on failed CAS } //头节点没有发生变化才退出循环 if (h == head) // loop if head changed break; } }
//返回同步等待队列中第一个线程,没有返回null。 public final Thread getFirstQueuedThread() { // handle only fast path, else relay return (head == tail) ? null : fullGetFirstQueuedThread(); } private Thread fullGetFirstQueuedThread() { /** * 通常情况下,头节点的next指向的就是队列里第一个节点。 * 尝试获取第一个节点的线程,保证读取的一致性:如果线程为null, * 或者第一个节点的前驱节点已经不是头节点,那么说明其他线程正在 * 调用setHead方法。这里尝试获取两次,如果失败,再进行下面的遍历 */ Node h, s; Thread st; if (((h = head) != null && (s = h.next) != null && s.prev == head && (st = s.thread) != null) || ((h = head) != null && (s = h.next) != null && s.prev == head && (st = s.thread) != null)) return st; /** * 头节点的next可能还没有设置,或者已经在setHead后被重置。 * 所以我们必须验证尾节点是否是真的是第一个节点。如果不是, * 从尾节点反向遍历去查找头节点,确保程序退出。 */ Node t = tail; Thread firstThread = null; while (t != null && t != head) { Thread tt = t.thread; if (tt != null) firstThread = tt; t = t.prev; } return firstThread; }
/** * 如果同步等待队列中第一个线程是独占模式,返回true。 * 如果这个方法返回true,并且当前线程正在尝试在共享模式下请求, * 那么可以保证当前线程不是同步等待队列里的第一个线程。 */ final boolean apparentlyFirstQueuedIsExclusive() { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null; }
看看内部类ConditionObject:(条件等待队列)
await方法:
/** * 1.如果当前线程有中断状态,抛异常,响应中断。 * 2.添加当前线程当条件等待队列。 * 3.释放当前线程对AQS的控制权,并保存释放前AQS的状态。 * 4.进入条件循环,条件为,判断当前线程是否在AQS同步队列中, * 如果不在,那么阻塞当前线程;如果在同步队列中,跳到第7步 * 5.当前线程被(其他线程)唤醒后,要检查等待过程中是否被中断或者 * 取消,如果没有继续循环,开始第4步 * 6.如果是,保存中断状态和模式,然后退出条件循环。 * 7.请求AQS控制权,然后做一些收尾工作,如果被取消,清理一些条件等待队列。 * 然后按照中断模式处理一下中断。 */ //可中断的条件等待方法。 public final void await() throws InterruptedException { //检查线程中断状态,响应中断 if (Thread.interrupted()) throw new InterruptedException(); //如果不是中断状态,将当前线程添加到条件等待队列。 Node node = addConditionWaiter(); //释放当前线程对AQS的控制权,并返回当前AQS中的state的值 int savedState = fullyRelease(node); int interruptMode = 0; //判断当前线程是否在AQS的同步等待队列中 while (!isOnSyncQueue(node)) { //如果不在,阻塞当前线程。 LockSupport.park(this); //其他线程调用相同条件上的signal/sinalAll方法时,会将这个节点 //从条件队列转义到AQS的同步等待队列中。 //被唤醒后需要检查是否在等待过程中被中断 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //如果有中断,退出循环 break; } //重新请求AQS的控制权 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0)//如果发生过中断,在这里处理 reportInterruptAfterWait(interruptMode); } /** * 向同步等待队列中添加一个新的节点 */ private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //根据当前线程创建一个节点 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) //到这里,说明node是队列中的第一个节点,那么将firstWaiter指向这个节点 firstWaiter = node; else //如果队列中已经存在其他节点,那么将t的nextWaiter指向该node节点 t.nextWaiter = node; //将lastWaiter指向node节点 lastWaiter = node; return node;//返回node } /** * 移除条件等待队列中的取消状态节点。 * 这个方法一定是在持有锁(拥有AQS控制权)的情况下被调用的(所以不存在竞争)。 * 当等待条件时被(节点的线程)取消,或者当lastWaiter被取消后,条件等待队列中进入了 * 一个新节点时会调用这个方法。这个方法需要避免由于没有signal而引起的 * 垃圾滞留。所以尽管方法内会做一个完全遍历,也只有超时获取或取消时(没有 * signal的情况下)才被调用。方法中会遍历所有节点,切断所有指向垃圾节点的引用, * 而不是一次取消切断一个引用。 */ private void unlinkCancelledWaiters() { //获取条件等待队列中的头节点t Node t = firstWaiter; Node trail = null; while (t != null) { //如果队列中有等待节点。获取头结点的nextWaiter节点next Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { //如果t被取消,将t的nextWaiter清空 t.nextWaiter = null; if (trail == null) //将next设置为头节点(移除之前的取消节点) firstWaiter = next; //否则说明队列前端有未取消的节点,这里链接(移除中间的 // 取消节点) else trail.nextWaiter = next; if (next == null) lastWaiter = trail;//设置尾节点 } else//如果t没被取消,将trail指向t trail = t; t = next; } } /** * 调用release方法,并传入当前的state * 成功,返回之前的state * 失败,抛异常,并取消当前节点 */ final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } /** * 如果一个node最初放在一个条件队列里,而现在正在AQS的同步等待队列里, * 返回true */ final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //如果有后继节点,说明肯定在AQS同步等待队列里面 if (node.next != null) // If has successor, it must be on queue return true; /** * node.prev不为空并不能说明节点在AQS的同步等待队列里面, * 因为后续的CAS操作可能会失败,这里从尾节点反向遍历。 */ return findNodeFromTail(node); }
signal方法:
/** * 将条件等待队列里面等待时间最长(链表最前面)的线程(如果存在的话), * 移动到AQS同步等待队列里面。 */ public final void signal() { //判断AQS的控制权是否被当前线程以独占的方式持有。 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null)//判断条件队列里面是否有线程等待, //如果有,执行唤醒操作 doSignal(first); } //唤醒指定节点 private void doSignal(Node first) { do { //移除first if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; //调用transferForSignal,如果调用失败,且条件等待队列不为空, //继续循环操作 } while (!transferForSignal(first) && (first = firstWaiter) != null); } //将一个节点从条件等待队列转移到同步等待队列,如果成功,返回true。 final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ //如果设置等待状态失败,说明节点已经被取消了,返回false。 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //将node加入到AQS同步等待队列中,并返回node的前驱 Node p = enq(node); int ws = p.waitStatus; //如果前驱节点被取消,或者尝试设置前驱节点的状态SIGNAL(表示node需要唤醒) //失败,那么唤醒node节点上的线程。 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
signalAll方法:
public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } //唤醒所有节点 private void doSignalAll(Node first) { //将条件等待队列的头尾节点置空 lastWaiter = firstWaiter = null; //循环遍历清空所有节点。 do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); }
以上是关于AbstractQueuedSynchronizer源码的主要内容,如果未能解决你的问题,请参考以下文章