JAVA并发AQS原理详解
Posted Kant101
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JAVA并发AQS原理详解相关的知识,希望对你有一定的参考价值。
1. 概述
-
AQS全称为
AbstractQueuedSynchronizer
,是Java的 juc 当中的一个抽象类。 -
AQS是一个用于构建锁、同步器、协作工具类的工具类(框架)。有了AQS之后,更多的协作工具类都可以方便得被写出来。
-
控制并发流程的类,都需要线程的
等待
和唤醒
功能,这是这些类的共同特点,因此可以抽象出一个基类,这就是AQS。 -
AQS广泛应用于控制并发流程的类中,如
CountDownLatch
、Semaphore
、ReentrantLock
、ReentrantReadWriteLock
等。 -
其中,
sync
是这些类中都有的内部类,其结构如下图,可以看到Sync类是AQS的实现类。
- AQS 主要完成的任务:
1)同步状态(比如说计数器)的原子性管理;
2)现成的阻塞和唤醒;
3)等待队列的管理;
2. AQS原理
AQS最核心的就是3个部分:
1)状态:state;
2)控制线程抢锁和配合FIFO队列(双向链表);
3)期望协作工具类去实现的获取/释放等重要方法(重写);
2.1 状态state
这里state的具体含义,会根据具体实现类的不同而不同:
- 在
Semaphore
里,它表示剩余许可证的数量; - 在
CountDownLatch
里,它表示还需要倒数的数量; - 在
ReentrantLock
里,state用来表示锁的占有情况,包括可重入计数,当state的值为0时,表示该Lock不被任何线程所占有。
state是volatile
修饰的,并被并发修改,所以修改state的方法都需要保证线程安全,比如getState
、setState
以及compareAndSetState
操作来读取和更新这个状态。
getState
和setState
是直接操作的state变量,volatile能够保证操作的原子性,compareAndSetState
依赖于Unsafe类(Unsafe类是sun.misc包下的类,能够直接操作内存,操作成功返回true,操作失败返回false,能够保证原子性)。
2.2 FIFO队列
这个队列用来存放等待的线程,AQS就是“排队管理器”,当多个线程争用同一把锁时,必须有排队机制将那些没能拿到锁的线程串在一起。当释放时,锁管理器就会挑选一个合适的线程来占用这个刚刚释放的锁;
AQS会维护一个等待的线程队列,把线程都放到这个队列里,并且这个队列是双向链表形式的。
2.3 实现获取/释放等方法
这里的 获取 和 释放 方法,是利用AQS的协作工具类里最重要的方法,是由协作类自己去实现的
,并且根据各自的功能含义不相同;
获取方法:获取state变量的操作,经常会被阻塞(比如获取不到锁的情况)
- 在
Semaphore
中,获取就是acquire
方法,作用是获取一个许可证; - 而在
CountDownLatch
里面,获取就是await
方法,作用是等待,知道倒数结束。
释放方法
- 在
Semaphore
中,释放就是release
方法,作用是释放一个许可证; - 在
CountDownLatch
里面,获取就是countDown
方法,作用是将倒数的数减一;
需要每个实现AQS的类重写tryAcquire
和tryRelease
等方法。
3. AQS源码分析
AQS在juc中用法套路:
- 第一步:写一个类,想好协作的逻辑,实现获取/释放方法;
- 第二步:类的内部写一个Sync类,继承 AbstractQueuedSynchronizer 类;
- 第三步:根据是否独占来重写
tryAcquire / tryRelease
或tryAcquireShared(int acquired) / tryReleaseShared(int releases)
等方法在之前写的 获取/释放 方法中调用AQS的acquire/release
或者acquireShared/releaseShared
方法。
(AQS中的acquire/release
方法调的是tryAcquire/tryRelease
,AQS的acquireShared/releaseShared
方法调的是tryAcquireShared/tryReleaseShared
方法,但这两个方法在AQS中没有具体实现,是在具体的类中实现的,因此调的时候会根据类动态绑定到对应实现类的方法)
1)AQS中 acquire/release
方法实现的是独占锁,调tryAcquire/tryRelease
方法,这两个方法在AQS中没有实现,需要在实现类中去具体实现,调用时会动态绑定到具体的实现类重写的方法。
2)AQS中 acquireShared/releaseShared
方法实现的是共享锁,调 tryAcquireShared/tryReleaseShared
方法,同上,也是在实现类中具体实现的,调用时再动态绑定。
4. AQS在juc中各类工具中的使用分析
4.1 AQS在CountDownLatch中的应用
CountDownLatch是共享锁。
新建一个 CountDownLatch
对象
CountDownLatch cdl = new CountDownLatch(3);
会在对象cdl的内部创建一个Sync的对象,并将 this.sync中的state设置为3,代表要倒数3次。
当调用cdl.await()
时,就会进行如下调用
调了AQS的tryAcquireShared
方法,这里会动态绑定到CountDownLatch中的Sync类中的tryAcquireShared
方法。
可以看到,这里state设置为了3,导致程序进入到acquireSharedInterruptibly
方法中,首先会阻塞当前线程。
当调用 countDown()
函数,会使得state–,当state值变成0后,就会唤醒等待队列中的第一个线程。
其中,countDown() 函数具体如下所示,调用AQS的releaseShared
方法,AQS再调tryReleaseShared
方法,动态绑定到CountDownLatch中所实现的tryReleaseShared
方法。
4.2 Semaphore
Semaphore是共享锁。
新建一个 Semaphore
对象时:
Semaphore s = new Semaphore(3);
会在 s
内部创建一个 NonfairSync
对象,其继承自 Sync
类,并将 this.sync
中的 state
值设置为3,代表有3个通行证。
当我们调用s.acquire(2)
,会获取到两个许可证,分析如下:
acquire
方法会调AQS中的acquireSharedInterruptibly
方法,
AQS中的acquireSharedInterruptibly
方法会调AQS中的tryAcquireShared
方法,会被动态绑定到Semaphore
中的静态内部类NonfairSync
中实现的tryAcquireShared
方法。
在NonfairSync
中的tryAcquireShared
又调了AQS中的doAcquireSharedInterruptibly
。可以看到当请求的通行证数大于state
的值时,返回的是负数,那么前面的acquireSharedInterruptibly
方法就会进入到doAcquireSharedInterruptibly
方法,就会将当前线程阻塞并放入等待队列中。当通行证数量足够时才会唤醒该线程。
当调用s.release(2)
时,会归还2个许可证,分析如下:
会调AQS的releaseShared
方法
让后调AQS的tryReleaseShared
方法,会被动态绑定到Semaphore的tryReleaseShared
方法,该方法将释放的通行证数加回到state
。
当调s.release()
时,分析如下:
会调Sync的releaseShared
方法
然后调AQS的releaseShared
方法
然后调AQS的tryReleaseShared
方法
Semaphore中的Sync重写了tryReleaseShared
方法,就会动态绑定到Semaphore中的tryReleaseShared
之后就会调AQS中的doReleaseShared
方法,跟前面调s.acquire()
方法时的类似,尝试唤醒等待队列中的线程。
AQS中关键代码阅读
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException
final AbstractQueuedSynchronizer.Node node = addWaiter(AbstractQueuedSynchronizer.Node.SHARED);
boolean failed = true;
try
for (;;)
final AbstractQueuedSynchronizer.Node p = node.predecessor();
if (p == head)
int r = tryAcquireShared(arg);
if (r >= 0)
/*
* 进入到这里,说明当前线程获取到了通行证,通行证有可能还有多的,
* 就需要看看能不能唤醒接下来的线程
*/
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
finally
if (failed)
cancelAcquire(node);
private void setHeadAndPropagate(AbstractQueuedSynchronizer.Node node, int propagate)
AbstractQueuedSynchronizer.Node h = head; // Record old head for check below
/*
* 在尝试唤醒下一个线程时,把等待队列的头结点设置为当前线程
* 即把当前线程从等待队列中去掉
*/
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0)
AbstractQueuedSynchronizer.Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
private void doReleaseShared()
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;)
AbstractQueuedSynchronizer.Node h = head;
if (h != null && h != tail)
int ws = h.waitStatus;
if (ws == AbstractQueuedSynchronizer.Node.SIGNAL)
if (!compareAndSetWaitStatus(h, AbstractQueuedSynchronizer.Node.SIGNAL, 0))
continue; // loop to recheck cases
/*
* 将下一个线程的状态由挂起态设置为非挂起,也就是唤醒了线程,能够再被调度获取到CPU
* 这样这个线程就会去尝试调acquire()方法获取通行证
* 由doAcquireSharedInterruptibly的代码可知,只有当获取到了通行证后才会重新设置等待队列的head
* 也就是只有当被唤醒的线程获取到了通信证,它才会被从等待队列中移除
* 否则,仍然在原来的位置,又被挂起了,只有等到它前面那个线程释放了通行证,才会再去尝试唤醒等待队列中的线程
* 且,每次唤醒的都是等待队列中的第一个,如果通行证还有剩余,则会一直传递下去尝试唤醒线程
*/
unparkSuccessor(h);
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, AbstractQueuedSynchronizer.Node.PROPAGATE))
continue; // loop on failed CAS
if (h == head) // loop if head changed
break;
4.3 ReentrantLock
ReentrantLock是独占锁。
新建一个ReentrantLock对象(需要注意ReentrantLock有公平锁和非公平锁之分,默认是非公平锁,公平锁需要再构造函数中传一个true
):
ReentrantLock lock = new ReentrantLock();
会在lock
内部创建一个NonfairSync
类的对象sync
(继承自Sync
),其调的是默认构造函数,也就是state
的初始值为0。
当我们调用lock.lock()
时,分析如下
首先调ReentrantLock中的lock
方法
让后调Sync
中的接口lock
当是非公平锁时,sync
是NonfairSync
类的实例,因此非公平锁调lock()函数也就会动态绑定到NonfairSync
中的lock
函数
当第一个线程到达时,通过CAS检查state
的值(即第一个线程会把state
的值设置为1,后面再到来的线程就无法进入了,要注意debug时state的值是不准确的,有缓存),设置该lock
对象被哪个线程占有。
当非独占该锁的线程进入后,接着调AQS中的acquire
函数
然后就会调AQS中的tryAcquire
函数,会被动态绑定到ReentrantLock中实现的tryAcquire
函数
接着调ReentrantLock的nonfairTryAcquire
,从nonfairTryAcquire函数可以看到,占有该锁的线程进入后会返回true,就不会执行后面的阻塞线程的操作了,并且当不是第一次进入时,每进入一次state的值都会被加1,就是为了实现同一个线程的 可重入
;非占有该锁的线程进入后直接返回false,接着执行后面的阻塞线程操作。
未获取到锁,就会进入到AQS的acquireQueued
函数。只有当当前线程获取到了锁才会退出这个死循环。
lock.unlock()
的执行分析:
接着调AQS的release
函数
接着调AQS的tryRelease
函数,然后动态绑定到ReentrantLock中的tryRelease
函数上
释放锁后当state的值变成0后就会唤醒等待队列中的线程,然后调AQS的unparkSuccessor
函数,看它们是否能够获取到刚刚被释放的锁。
公平锁与非公平锁
公平锁每次获取到锁为同步队列中的第一个节点,保证请求资源时间上的绝对顺序(强制保证每次都是等待队列中的第一个线程),而非公平锁有可能刚释放锁的线程下次继续获取该锁(刚释放锁的线程或者新来的线程与释放锁时被唤醒的等待队列中的第一个线程竞争锁,有可能被唤醒的线程没有竞争赢,而在公平锁中,刚释放锁的线程和新来的线程都要到等待队列后面去排队),则有可能导致其他线程永远无法获取到锁,造成“饥饿”现象。
5. AQS总结
从上面的3个同步工具类的分析可以看到,AQS是一个构建JAVA同步工具的框架。
控制并发的工具类,都需要修改同步变量以达到控制并发流程的目的,这些共同的功能被写到了AQS类中:共享锁的acquire() / release()
和 独占锁的 acquireShared() / releaseShared()
;然后分别调AQS中的 tryAcquire() / tryRelease()
和 tryAcquireShared() / tryReleaseShared()
,这些函数在AQS中都没有被具体实现。
各个工具类中都自己写了一个Sync
,公平锁和非公平锁还由Sync派生出了FairSync
和NonfairSync
,在这里面详细地写了 tryAcquire() / tryRelease()
和 tryAcquireShared() / tryReleaseShared()
的控制流程。
同时,等待队列的管理,线程的阻塞和唤醒,这些共同功能都被抽取出来写入到了AQS中。
6. 参考文献
[1] https://blog.csdn.net/weixin_42638946/article/details/120252441。
[2] https://www.jianshu.com/p/15f33406543b。
以上是关于JAVA并发AQS原理详解的主要内容,如果未能解决你的问题,请参考以下文章