理解AbstractQueuedSynchronizer提供的独占锁和共享锁语义
Posted 我是攻城师
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了理解AbstractQueuedSynchronizer提供的独占锁和共享锁语义相关的知识,希望对你有一定的参考价值。
前言
Doug Lea前辈在JDK5中编写的AbstractQueuedSynchronizer抽象同步框架非常精辟,整个代码里没有使用像synchronized这样调用底层硬件系统层面的锁指令来实现同步状态管理,完全是使用Java语言层面功能配合上轻量级的CAS自旋锁来构建的抽象同步器,总的来说AQS里面包含了二套api语义一种是独占锁,另一种是共享锁。这两套语义都是独立的,并不是说任何时候我们都需要同时使用这两种功能的。关于AQS的学习不建议一上去就关注AQS类源码本身,因为单看源码看不出来有任何精妙,反而容易让人迷惑,但是我们从其构建的工具类反看其如何使用AQS功能,结合具体案例则更容易理解。
AQS独占锁的申请和释放流程
这里以重入锁ReentrantLock独占加锁过程:
(1)reentrantLock.lock()
(2)sync.lock()
(3)acquire(1)
(4)!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
如果tryAcquire加锁成功,把state从0变成1,如果是重入则还会把state累加表示重入的次数,最终就会返回true,如果失败,先调用
addWaiter(Node.EXCLUSIVE)方法,这个方法的作用是:
将返回false的线程加入到AQS的阻塞队列里面,这里面先初始化一个Node节点,把当前线线程和锁模式(这里是独占)初始化进去,然后会判断tail末尾节点是不是null,如果不等于null,则直接将当前节点加入队列,并把其pred引用指向上一个末尾节点,同时把上一个末尾节点的next引用指向当前最后的节点,这是是双向链表结构。如果是null则进入初始化方法enq,这个方法的作用:
采用无限循环+CAS的模式,先初始化一个空的head节点,并把tail节点置为和head节点相等,这里使用的是原子字段更新方法AtomicReferenceFieldUpdater来赋值的,成功之后把node节点的prev指向刚才初始化的tail节点,然后把当前的node节点也通过原子引用更新器的方法赋值到tail节点,直到成功为止,返回当前的node节点,至此循环结束。
接着进入acquireQueued方法,这个方法的主要作用是用来挂起线程通过LockSupport的park方法,首先判断当前的节点是不是第一个锁的节点,里面会再次调用tryAcquire方法确认,如果是则覆盖原来的空的head节点,让自己变成真正意义上的head节点,然后返回,这样这个节点就可以继续执行其任务了,但如果这里判断失败则意味着自己并不是当前的队列的第一个线程,那么就需要判断前继节点的waitStats状态,如果是0(初始化状态)或者时-3(传播),那么则意味着当线的线程需要阻塞等待,所以需要将前继的waitStats状态更新为-1(信号通知),然后当前节点继续循环第二次,发现前继节点的状态是-1,那么就会调用LockSupport类的park方法,将当前线程挂起等待,直到第一个节点的任务处理完毕后,唤醒自己。
(5)至此申请锁完毕,如果得到锁则执行,失败则放入同步队列里面挂起,至于公平和非公平在于允不允许直接抢占锁(修改state)字段,如果允许就是不公平,注意不公平只有一次抢占机会,如果失败还得走排队流程,如果不允许就是公平,来了之后就走流程直接排队就行了。
独占解锁过程:
(1)调用unlock方法
(2)代理类调用sync.release(1);方法
(3)实现类调用tryRelease(1)方法,将state值减去1,如果成功则调用 unparkSuccessor方法,这个方法的作用是设置当前节点的waitStatus状态为0,接着获取next节点,如果next的节点的waitStatus状态被修该成了1,就意味着这个任务等不急已经取消了,那么则设置引用为null,方便gc,然后接着从尾节点向前遍历,找到这个取消节点之后的第一个需要通知唤醒的后继,源码如下:
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;//注意这里没有break,也就是说它找的一定是取消节点之后的第一个节点
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒线程
至此释放锁成功。
AQS共享锁的申请和释放流程
这里以CountDownLatch的await分析:首先在构造函数里面我们需要传入一个阻塞的线程个数这里假设为3,在构造函数里面会设置AQS的state字段值为3。
(1)申请共享锁sync.acquireSharedInterruptibly(1)
(2)调用tryAcquireShared(arg) < 0判断是否有资格申请:
这个方法需要子类实现
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
很明显这里state=3,所以返回-1,故而获取成功
(3)接着调用doAcquireSharedInterruptibly方法
这个方法里面会先调用addWaiter(Node.SHARED)方法,该方法里面会先判断是不是有末尾节点,如果有直接添加,如果没有则需要初始化链表head,并将head.next指向自己。自己的prev指向head,同时将自己变成tail节点。接着调用 if (shouldParkAfterFailedAcquire(p, node) 方法,将head节点的waitSatus设置为-1,然后自旋一次进入阻塞。同时假如有多个调用await方法的线程,那么这些线程会依次排队等待。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
接着我们看下,如何释放锁?
在CountDownLatch里面释放锁是由线程执行完任务,调用countDown方法实现的,在该方法里面调用了代理类的解锁方法:
public void countDown() {
sync.releaseShared(1);
}
接着看这个方法:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
我们看到这里面调用了tryReleaseShared方法:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
这个方法只有当state的等于0时,才会返回true也就是说只有最后一个线程调用了countDown方法,那么doReleaseShared方法才会被激活,这也是CountDownLatch的功能,接着在doReleaseShared方法里面,会将head节点的status状态设置为0,然后调用unparkSuccessor方法唤醒第一个在休眠的线程。
接着我们回到上面的第三步的方法:这个时候由于state=0,那么该方法就会进入
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
这个方法体,其中setHeadAndPropagate方法会将当前的节点替换为head节点,然后如果它的后继节点不为null,就继续调用doReleaseShared()
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
这个方法里面又会唤醒,该线程的下一个节点,直到h==head代表都已经释放完毕,从而退出循环。
简单的来说共享锁的释放类似,排队的人,第一个告诉第二个你可以执行了,然后第二个完事,告诉第三个依次类推直到所有的共享锁得到释放。
总结
借用Java并发编程的艺术里面术语来说,锁是面向使用者的,而AQS则是面向实现者也或开发者,AQS抽象了锁的状态管理,同步队列的,等待与唤醒等功能,简化了锁的实现方式,从而很好的隔离了使用者和实现者所关注的重点。
参考文章:
https://yq.aliyun.com/articles/601071?spm=a2c4e.11153940.bloghomeflow.642.513b291a06OQbE
http://www.infoq.com/cn/articles/java8-abstractqueuedsynchronizer
http://www.infoq.com/cn/articles/java8-abstractqueuedsynchronizer
以上是关于理解AbstractQueuedSynchronizer提供的独占锁和共享锁语义的主要内容,如果未能解决你的问题,请参考以下文章