理解AbstractQueuedSynchronizer提供的独占锁和共享锁语义

Posted 我是攻城师

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了理解AbstractQueuedSynchronizer提供的独占锁和共享锁语义相关的知识,希望对你有一定的参考价值。

前言

Doug Lea前辈在JDK5中编写的AbstractQueuedSynchronizer抽象同步框架非常精辟,整个代码里没有使用像synchronized这样调用底层硬件系统层面的锁指令来实现同步状态管理,完全是使用Java语言层面功能配合上轻量级的CAS自旋锁来构建的抽象同步器,总的来说AQS里面包含了二套api语义一种是独占锁,另一种是共享锁。这两套语义都是独立的,并不是说任何时候我们都需要同时使用这两种功能的。关于AQS的学习不建议一上去就关注AQS类源码本身,因为单看源码看不出来有任何精妙,反而容易让人迷惑,但是我们从其构建的工具类反看其如何使用AQS功能,结合具体案例则更容易理解。

AQS独占锁的申请和释放流程

这里以重入锁ReentrantLock独占加锁过程:

(1)reentrantLock.lock()

(2)sync.lock()

(3)acquire(1)

(4)!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

如果tryAcquire加锁成功,把state从0变成1,如果是重入则还会把state累加表示重入的次数,最终就会返回true,如果失败,先调用

addWaiter(Node.EXCLUSIVE)方法,这个方法的作用是:

将返回false的线程加入到AQS的阻塞队列里面,这里面先初始化一个Node节点,把当前线线程和锁模式(这里是独占)初始化进去,然后会判断tail末尾节点是不是null,如果不等于null,则直接将当前节点加入队列,并把其pred引用指向上一个末尾节点,同时把上一个末尾节点的next引用指向当前最后的节点,这是是双向链表结构。如果是null则进入初始化方法enq,这个方法的作用:

采用无限循环+CAS的模式,先初始化一个空的head节点,并把tail节点置为和head节点相等,这里使用的是原子字段更新方法AtomicReferenceFieldUpdater来赋值的,成功之后把node节点的prev指向刚才初始化的tail节点,然后把当前的node节点也通过原子引用更新器的方法赋值到tail节点,直到成功为止,返回当前的node节点,至此循环结束。

接着进入acquireQueued方法,这个方法的主要作用是用来挂起线程通过LockSupport的park方法,首先判断当前的节点是不是第一个锁的节点,里面会再次调用tryAcquire方法确认,如果是则覆盖原来的空的head节点,让自己变成真正意义上的head节点,然后返回,这样这个节点就可以继续执行其任务了,但如果这里判断失败则意味着自己并不是当前的队列的第一个线程,那么就需要判断前继节点的waitStats状态,如果是0(初始化状态)或者时-3(传播),那么则意味着当线的线程需要阻塞等待,所以需要将前继的waitStats状态更新为-1(信号通知),然后当前节点继续循环第二次,发现前继节点的状态是-1,那么就会调用LockSupport类的park方法,将当前线程挂起等待,直到第一个节点的任务处理完毕后,唤醒自己。

(5)至此申请锁完毕,如果得到锁则执行,失败则放入同步队列里面挂起,至于公平和非公平在于允不允许直接抢占锁(修改state)字段,如果允许就是不公平,注意不公平只有一次抢占机会,如果失败还得走排队流程,如果不允许就是公平,来了之后就走流程直接排队就行了。

独占解锁过程:

(1)调用unlock方法

(2)代理类调用sync.release(1);方法

(3)实现类调用tryRelease(1)方法,将state值减去1,如果成功则调用 unparkSuccessor方法,这个方法的作用是设置当前节点的waitStatus状态为0,接着获取next节点,如果next的节点的waitStatus状态被修该成了1,就意味着这个任务等不急已经取消了,那么则设置引用为null,方便gc,然后接着从尾节点向前遍历,找到这个取消节点之后的第一个需要通知唤醒的后继,源码如下:

 
   
   
 
  1.        Node s = node.next;

  2.        if (s == null || s.waitStatus > 0) {

  3.            s = null;

  4.            for (Node t = tail; t != null && t != node; t = t.prev)

  5.                if (t.waitStatus <= 0)

  6.                    s = t;//注意这里没有break,也就是说它找的一定是取消节点之后的第一个节点

  7.        }

  8.        if (s != null)

  9.            LockSupport.unpark(s.thread);//唤醒线程

至此释放锁成功。

AQS共享锁的申请和释放流程

这里以CountDownLatch的await分析:首先在构造函数里面我们需要传入一个阻塞的线程个数这里假设为3,在构造函数里面会设置AQS的state字段值为3。

(1)申请共享锁sync.acquireSharedInterruptibly(1)

(2)调用tryAcquireShared(arg) < 0判断是否有资格申请:

这个方法需要子类实现

 
   
   
 
  1.     protected int tryAcquireShared(int acquires) {

  2.            return (getState() == 0) ? 1 : -1;

  3.        }

很明显这里state=3,所以返回-1,故而获取成功

(3)接着调用doAcquireSharedInterruptibly方法

这个方法里面会先调用addWaiter(Node.SHARED)方法,该方法里面会先判断是不是有末尾节点,如果有直接添加,如果没有则需要初始化链表head,并将head.next指向自己。自己的prev指向head,同时将自己变成tail节点。接着调用 if (shouldParkAfterFailedAcquire(p, node) 方法,将head节点的waitSatus设置为-1,然后自旋一次进入阻塞。同时假如有多个调用await方法的线程,那么这些线程会依次排队等待。

 
   
   
 
  1.    private void doAcquireSharedInterruptibly(int arg)

  2.        throws InterruptedException {

  3.        final Node node = addWaiter(Node.SHARED);

  4.        boolean failed = true;

  5.        try {

  6.            for (;;) {

  7.                final Node p = node.predecessor();

  8.                if (p == head) {

  9.                    int r = tryAcquireShared(arg);

  10.                    if (r >= 0) {

  11.                        setHeadAndPropagate(node, r);

  12.                        p.next = null; // help GC

  13.                        failed = false;

  14.                        return;

  15.                    }

  16.                }

  17.                if (shouldParkAfterFailedAcquire(p, node) &&

  18.                    parkAndCheckInterrupt())

  19.                    throw new InterruptedException();

  20.            }

  21.        } finally {

  22.            if (failed)

  23.                cancelAcquire(node);

  24.        }

  25.    }

接着我们看下,如何释放锁?

在CountDownLatch里面释放锁是由线程执行完任务,调用countDown方法实现的,在该方法里面调用了代理类的解锁方法:

 
   
   
 
  1.    public void countDown() {

  2.        sync.releaseShared(1);

  3.    }

接着看这个方法:

 
   
   
 
  1.    public final boolean releaseShared(int arg) {

  2.        if (tryReleaseShared(arg)) {

  3.            doReleaseShared();

  4.            return true;

  5.        }

  6.        return false;

  7.    }

我们看到这里面调用了tryReleaseShared方法:

 
   
   
 
  1.        protected boolean tryReleaseShared(int releases) {

  2.            // Decrement count; signal when transition to zero

  3.            for (;;) {

  4.                int c = getState();

  5.                if (c == 0)

  6.                    return false;

  7.                int nextc = c-1;

  8.                if (compareAndSetState(c, nextc))

  9.                    return nextc == 0;

  10.            }

  11.        }

这个方法只有当state的等于0时,才会返回true也就是说只有最后一个线程调用了countDown方法,那么doReleaseShared方法才会被激活,这也是CountDownLatch的功能,接着在doReleaseShared方法里面,会将head节点的status状态设置为0,然后调用unparkSuccessor方法唤醒第一个在休眠的线程。

接着我们回到上面的第三步的方法:这个时候由于state=0,那么该方法就会进入

 
   
   
 
  1.        int r = tryAcquireShared(arg);

  2.                    if (r >= 0) {

  3.                        setHeadAndPropagate(node, r);

  4.                        p.next = null; // help GC

  5.                        failed = false;

  6.                        return;

  7.                    }

这个方法体,其中setHeadAndPropagate方法会将当前的节点替换为head节点,然后如果它的后继节点不为null,就继续调用doReleaseShared()

 
   
   
 
  1.    private void doReleaseShared() {

  2.        for (;;) {

  3.            Node h = head;

  4.            if (h != null && h != tail) {

  5.                int ws = h.waitStatus;

  6.                if (ws == Node.SIGNAL) {

  7.                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

  8.                        continue;            // loop to recheck cases

  9.                    unparkSuccessor(h);

  10.                }

  11.                else if (ws == 0 &&

  12.                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

  13.                    continue;                // loop on failed CAS

  14.            }

  15.            if (h == head)                   // loop if head changed

  16.                break;

  17.        }

  18.    }

这个方法里面又会唤醒,该线程的下一个节点,直到h==head代表都已经释放完毕,从而退出循环。

简单的来说共享锁的释放类似,排队的人,第一个告诉第二个你可以执行了,然后第二个完事,告诉第三个依次类推直到所有的共享锁得到释放。

总结

借用Java并发编程的艺术里面术语来说,锁是面向使用者的,而AQS则是面向实现者也或开发者,AQS抽象了锁的状态管理,同步队列的,等待与唤醒等功能,简化了锁的实现方式,从而很好的隔离了使用者和实现者所关注的重点。

参考文章:

https://yq.aliyun.com/articles/601071?spm=a2c4e.11153940.bloghomeflow.642.513b291a06OQbE

http://www.infoq.com/cn/articles/java8-abstractqueuedsynchronizer

http://www.infoq.com/cn/articles/java8-abstractqueuedsynchronizer


以上是关于理解AbstractQueuedSynchronizer提供的独占锁和共享锁语义的主要内容,如果未能解决你的问题,请参考以下文章

深入理解spring

如何理解FFT

深入理解Java内存模型

理解yarn平台,理解万岁,肤浅理解也万岁~

正确理解MYSQL的幻读

机器阅读理解综述