AbstractQueuedSynchronizer解析

Posted gouden

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AbstractQueuedSynchronizer解析相关的知识,希望对你有一定的参考价值。

AbstractQueuedSynchronizer简称为AQS,是juc里很基本的一个包,juc里很多工具类是基于AQS实现的,理解了AQS,其它很多juc工具类也会比较清楚了。

1、方法简述

getState

  返回当前state的值,该操作具有volatile读的内存语义。

setState

  设置state的值,该操作具有volatile写的内存语义。

compareAndSetState

  以cas方式设置state的值,该操作具有volatile读和写的语义。

tryAcquire

  尝试以独占模式获取对象状态。默认实现将抛出异常。

tryRelease

  尝试以独占模式设置表示释放的状态。默认实现将抛出异常。

tryAcquireShared

  尝试以共享模式获取对象状态。默认实现将抛出异常。

tryReleaseShared

  尝试以共享模式设置表示释放的状态。默认实现将抛出异常。

isHeldExclusively

  当前线程,同步是以独占方式进行的,则返回true。默认实现将抛出异常。

acquire

  以独占模式获取对象,忽略中断。

acquireInterruptibly

  以独占模式获取对象,如果被中断则被终止。

tryAcquireNanos

  与acquireInterruptibly类似,带有等待时间。

release

  以独占方式释放对象。

acquireShared

  以共享模式获取对象,忽略中断。

acquireSharedInterruptibly

  以共享模式获取对象,如果被中断则被终止。

tryAcquireSharedNanos

  与acquireSharedInterruptibly类似,带有等待时间。

releaseShared

  以共享方式释放对象。

hasQueuedThreads

  判断是否有正在等待获取的线程。

hasContented

  查询是否其他线程层争用此同步器。

getFirstQueuedThread

  获取队列中第一个线程,如果没有返回空。

isQueued

  判断给定线程是否在队列中。

getQueueLength

  获取队列里等待线程的个数。

getQueuedThreads

  获取队列里等待的线程。

getExclusiveQueuedThreads

  返回以独占模式获取对象等待的线程。

getSharedQueuedThreads

  返回以共享模式获取对象等待的线程。

2、源码解析

先看一下AQS的Node里定义的几个状态:

1     static final int CANCELLED =  1;
2     static final int SIGNAL    = -1;
3     static final int CONDITION = -2;
4     static final int PROPAGATE = -3;

CANCELLED状态是队列里的节点因为超时或中断而不再参与争夺资源的状态

SIGNAL状态是线程需要被唤醒的状态,这个状态的使用方式为将当前节点的前面节点设置为此状态,前面节点释放资源是如果检测是这个状态则唤醒当前节点(如果当前节点的状态正常)

CONDITION状态是线程需要等待一个条件成立,条件成立会出队

PROPAGATE状态是在共享状态下,当前节点释放资源时,传播给后面所有节点

再说一下AQS的两种模式:独占模式共享模式

独占模式可以理解为上课用教室或机房,这个教室或机房里有30个位子,就算只有11个人来上课,也不能有其他人来用这个教室或机房(资源)

共享模式可以理解为排队自习用教室或上机用机房,这个教室或机房如果有空的位置,而且空的位置够排队的第一个人用的,那就让排队的第一个人进来用,第一个人进来后发现还能进第二个人用,那就让第二个人也进来用,以此类推;但如果不够第二个人用而够第三个人用,让第三个人进来。

acquire及相关方法(独占模式):

1     public final void acquire(int arg) {
2         if (!tryAcquire(arg) &&
3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4             selfInterrupt();//设置中断状态
5     }

该方法会先调用一个非等待的尝试获取方法(这个方法由用户实现),如果获取成功就直接返回,否则就调用addWaiter方法并调用acquireQueued方法,addWaiter方法的代码如下:

 1     private Node addWaiter(Node mode) {
 2         Node node = new Node(Thread.currentThread(), mode);//Node是内部类,队列是由Node组成的
 3         // Try the fast path of enq; backup to full enq on failure
 4         Node pred = tail;//先快速入队
 5         if (pred != null) {
 6             node.prev = pred;
 7             if (compareAndSetTail(pred, node)) {
 8                 pred.next = node;
 9                 return node;//如果成功就返回
10             }
11         }
12         enq(node);//失败就比较复杂地入队
13         return node;
14     }

可以看出addWaiter方法就是让当前线程入队。

acquireQueued方法的代码如下:

 1     final boolean acquireQueued(final Node node, int arg) {
 2         boolean failed = true;
 3         try {
 4             boolean interrupted = false;
 5             for (;;) {
 6                 final Node p = node.predecessor();
 7                 if (p == head && tryAcquire(arg)) {//判断当前节点的前一个节点是否为队首,如果是,就尝试获取资源,如果获取成功,就把当前节点设置为队首,获取成功
 8                     setHead(node);
 9                     p.next = null; // help GC
10                     failed = false;
11                     return interrupted;
12                 }
13                 if (shouldParkAfterFailedAcquire(p, node) &&//获取资源失败,判断是否应该等待,如果应该,就调用parkAndCheckInterrupt方法等待,并检查是否有中断状态;如果不应该等待则说明队列的状态有变化,当前节点有机会获取资源,会再次尝试
14                     parkAndCheckInterrupt())
15                     interrupted = true;
16             }
17         } finally {
18             if (failed)
19                 cancelAcquire(node);
20         }
21     }

这个方法的作用就是一直尝试获取资源,直到获取了资源或者取消了,中间的过程会根据情况判断是否等待,返回值为是否出现中断的情况。

shouldParkAfterFailedAcquire方法的代码如下:

 1     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2         int ws = pred.waitStatus;//获取前面节点的状态
 3         if (ws == Node.SIGNAL)//如果前面节点的状态为SIGNAL,则说明已经告诉前面节点前面节点运行完通知当前节点
 4             /*
 5              * This node has already set status asking a release
 6              * to signal it, so it can safely park.
 7              */
 8             return true;
 9         if (ws > 0) {//如果前面节点状态大于0,则为CANCELLED,则说明前面节点取消了获取,跳过前面节点,找到最近的正常状态的节点,放到该节点后面
10             /*
11              * Predecessor was cancelled. Skip over predecessors and
12              * indicate retry.
13              */
14             do {
15                 node.prev = pred = pred.prev;
16             } while (pred.waitStatus > 0);
17             pred.next = node;
18         } else {//如果前面节点是正常的,则设置为SIGNAL,告诉前面节点运行完通知当前节点 有一种可能,那就是当前刚给前面节点设置了SIGNAL,前面节点就已经获取资源并释放了,所以这种状态就返回false,先不等待,运行一下是否有前面节点刚好释放的情况出现
19             /*
20              * waitStatus must be 0 or PROPAGATE.  Indicate that we
21              * need a signal, but don‘t park yet.  Caller will need to
22              * retry to make sure it cannot acquire before parking.
23              */
24             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
25         }
26         return false;
27     }

这个方法的作用就是判断在获取资源失败后是否应该等待。

parkAndCheckInterrupt方法的代码如下:

1     private final boolean parkAndCheckInterrupt() {
2         LockSupport.park(this);
3         return Thread.interrupted();
4     }

这个方法的作用就是等待并返回是否中断并消除中断状态。

release及相关方法(独占模式):

1     public final boolean release(int arg) {
2         if (tryRelease(arg)) {
3             Node h = head;
4             if (h != null && h.waitStatus != 0)
5                 unparkSuccessor(h);//唤醒队列里的下一个线程
6             return true;
7         }
8         return false;
9     }

unparkSuccessor方法的代码:

 1     private void unparkSuccessor(Node node) {
 2         /*
 3          * If status is negative (i.e., possibly needing signal) try
 4          * to clear in anticipation of signalling.  It is OK if this
 5          * fails or if status is changed by waiting thread.
 6          */
 7         int ws = node.waitStatus;
 8         if (ws < 0)
 9             compareAndSetWaitStatus(node, ws, 0);
10 
11         /* 这里的意思就是如果下一个节点没有取消,则唤醒它;如果取消了,则从尾开始找一个没有取消的节点,唤醒这个节点
12          * Thread to unpark is held in successor, which is normally
13          * just the next node.  But if cancelled or apparently null,
14          * traverse backwards from tail to find the actual
15          * non-cancelled successor.
16          */
17         Node s = node.next;
18         if (s == null || s.waitStatus > 0) {
19             s = null;
20             for (Node t = tail; t != null && t != node; t = t.prev)
21                 if (t.waitStatus <= 0)
22                     s = t;
23         }
24         if (s != null)
25             LockSupport.unpark(s.thread);
26     }

这个方法的作用就是唤醒后面节点(线程),如果后面节点取消获取,则从队列尾部向前找到一个没有取消的节点,唤醒这个节点。

acquireShared及相关方法(共享模式):

1     public final void acquireShared(int arg) {
2         if (tryAcquireShared(arg) < 0)//尝试获取资源,如果成功就返回,如果失败则调用下面方法
3             doAcquireShared(arg);//
4     }
doAcquireShared方法的代码:
 1     private void doAcquireShared(int arg) {
 2         final Node node = addWaiter(Node.SHARED);//以SHARED模式添加一个节点
 3         boolean failed = true;
 4         try {
 5             boolean interrupted = false;
 6             for (;;) {
 7                 final Node p = node.predecessor();
 8                 if (p == head) {
 9                     int r = tryAcquireShared(arg);//尝试获取资源
10                     if (r >= 0) {
11                         setHeadAndPropagate(node, r);//将当前节点设置为head,如果还有剩余资源则可以再唤醒之后的线程
12                         p.next = null; // help GC
13                         if (interrupted)
14                             selfInterrupt();
15                         failed = false;
16                         return;
17                     }
18                 }
19                 if (shouldParkAfterFailedAcquire(p, node) &&
20                     parkAndCheckInterrupt())
21                     interrupted = true;
22             }
23         } finally {
24             if (failed)
25                 cancelAcquire(node);
26         }
27     }

这个方法的作用就是以共享模式获取资源。

setHeadAndPropagate方法的代码:

 1     private void setHeadAndPropagate(Node node, int propagate) {
 2         Node h = head; // Record old head for check below
 3         setHead(node);
 4         /*
 5          * Try to signal next queued node if:
 6          *   Propagation was indicated by caller,
 7          *     or was recorded (as h.waitStatus either before
 8          *     or after setHead) by a previous operation
 9          *     (note: this uses sign-check of waitStatus because
10          *      PROPAGATE status may transition to SIGNAL.)
11          * and
12          *   The next node is waiting in shared mode,
13          *     or we don‘t know, because it appears null
14          *
15          * The conservatism in both of these checks may cause
16          * unnecessary wake-ups, but only when there are multiple
17          * racing acquires/releases, so most need signals now or soon
18          * anyway.
19          */
20         if (propagate > 0 || h == null || h.waitStatus < 0 ||
21             (h = head) == null || h.waitStatus < 0) {//如果还有剩余的资源,则唤醒下一个节点(线程)
22             Node s = node.next;
23             if (s == null || s.isShared())
24                 doReleaseShared();
25         }
26     }

这个方法的作用就是将当前节点设置为队首并根据剩余资源数判断是否唤醒下一个节点(线程)。

releaseShared及相关方法(共享模式):

1     public final boolean releaseShared(int arg) {
2         if (tryReleaseShared(arg)) {//尝试释放资源
3             doReleaseShared();
4             return true;
5         }
6         return false;
7     }

doReleaseShared方法的代码:

 1     private void doReleaseShared() {
 2         /*
 3          * Ensure that a release propagates, even if there are other
 4          * in-progress acquires/releases.  This proceeds in the usual
 5          * way of trying to unparkSuccessor of head if it needs
 6          * signal. But if it does not, status is set to PROPAGATE to
 7          * ensure that upon release, propagation continues.
 8          * Additionally, we must loop in case a new node is added
 9          * while we are doing this. Also, unlike other uses of
10          * unparkSuccessor, we need to know if CAS to reset status
11          * fails, if so rechecking.
12          */
13         for (;;) {
14             Node h = head;
15             if (h != null && h != tail) {
16                 int ws = h.waitStatus;
17                 if (ws == Node.SIGNAL) {
18                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//
19                         continue;            // loop to recheck cases
20                     unparkSuccessor(h);
21                 }
22                 else if (ws == 0 &&
23                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
24                     continue;                // loop on failed CAS
25             }
26             if (h == head)                   // loop if head changed
27                 break;
28         }
29     }

这个方法的作用就是判断是否唤醒后面节点(线程)。

3、补充

AQS有个有意思的地方,那就是state的变量的值不是直接用cas修改的,而是用一个stateOffset的偏移量修改的,stateOffset的获取是用下面的代码获取的:

1     stateOffset = unsafe.objectFieldOffset
2                                         (AbstractQueuedSynchronizer.class.getDeclaredField("state"));

注释说是为了以后的加强而没有用AtomicInteger子类的方式实现。个人的能力有限,暂时还没看出巧妙在什么地方,也没有找到描述这个设计巧妙之处的文章。


以上是关于AbstractQueuedSynchronizer解析的主要内容,如果未能解决你的问题,请参考以下文章

ReentrantLock原理源码详解

一行一行源码分析清楚 AbstractQueuedSynchronizer

Java并发-- AQS 原理