Java AQS源码阅读

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java AQS源码阅读相关的知识,希望对你有一定的参考价值。

AQS源码详解

源码分析维度:同步队列、独占式同步状态获取与释放、共享式同步状态获取与释放以及超时获取同步状态等同步器的核心数据结构与模板方法。

同步队列介绍

同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构建成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

同步队列中的节点(Node)是用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点,节点的属性类型与名称以及描述见下图:

技术分享图片

同步队列的基本结构如下图所示:

技术分享图片

同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect , Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

同步器将节点加入到同步队列的过程如图5-2所示:

技术分享图片

同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后续节点将会在获取同步状态成功时将自己设置为首节点,该过程如下图:

技术分享图片

设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可。

源码阅读

队列构建:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { //初始化阶段
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

首先,我们从addWaiter方法开始看起,此方法是通过自旋的形式来实现无锁情况下并发处理问题。假设初始化时,有多个线程进入此方法,此时head和tail都尚未初始化,都为null,则第一个线程一定会进入到enq(node)方法里。在进入到enq(node)方法后,如果此时head和tail节点依旧尚未初始化,则会进入初始化head结点和tail节点的阶段,即t == null的if块里。此代码段是通过CAS的形式将head的值赋予new Node()返回的引用地址,再执行tail=head ,则此时同步队列的情况如下图:
技术分享图片

第一个线程进入到enq(final Node node)方法的线程执行完初始化操作后,后续的线程或者第一个线程的后续循环操作都会进入到enq方法的 else 代码段里。

  1. 执行完Node t = tail; node.prev = t FIFO队列的情况如下图
    技术分享图片
  2. 执行完compareAndSetTail(t, node)后 FIFO队列的情况如下图:
    技术分享图片
  3. 执行完t.next = node;后 FIFO队列的情况如下图:
    技术分享图片
  4. 如果是还有另一个线程进入到enq(final Node node)方法中时 FIFO队列的情况如下图:
    技术分享图片

同步队列中 节点唤醒逻辑

//1. 独占式
public final boolean release(int arg) {
    if (tryRelease(arg)) {  //tryRelease(arg) 调用子类的实现,让子类决定是否释放锁
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

//2. 共享式
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { //调用子类的实现,让子类决定是否释放锁
        doReleaseShared(); 
        return true;
    }
    return false;
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) { // 同步队列至少有两个节点(头节点和业务节点)
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { // 如果当前头节点状态为SIGNAL(暗示后续节点需要唤醒)
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h); // 唤醒头结点后第一个需要唤醒的节点
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}


// 唤醒当前节点(Node node)的后继节点
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)  
        /**
        * 1. 如果当前节点还处于有效的等待状态,则将其状态还原为初始化状态。
        * 2. 为啥要还原? 你都等到了,还等呀?肯定是从等待状态还原回去!
        * 3. 这里不care返回值,说明是允许失败的。
        * 4. 此段逻辑只有在cancelAcquire方法调用时才用到,其他入口传入的都是head节点,
        *    head节点的状态到这里都是0了,不信可以看看方法调用方。
        */
        compareAndSetWaitStatus(node, ws, 0);

    /**
    * 1. 此块逻辑的目的:找到第一个还处于有效等待状态的节点,然后将它唤醒。
    * 2. 先取头结点的下一个节点,如果该节点为null或者已经取消,则从尾节点开始向前遍历找到需要唤 
    *    醒的节点。
    */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) { 
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread); // 调用native的方法 执行唤醒线程的操作。
}

在AQS类中,tryRelease(int arg)tryReleaseShared((int arg))方法体中,都只有一行代码:throw new UnsupportedOperationException(); ,从这里,我们可以看出AQS其实是把何时释放锁的决定权交给了AQS具体的实现类来决定了,AQS在实现同步管理方面,更多的是扮演了工具类的角色。由于此处,我们关注的是同步队列争抢锁的操作,故对于其他具体细节暂时不关注。

从上面的代码中,我们可以看到在独占式和共享式两种情况下,释放锁之后都调用unparkSuccessor(h);方法都是传入的head节点,而unparkSuccessor方法的作用是释放当前节点的第一个有效后继节点,从这个方面也可以看出同步队列遵循了”先进先出“的原则。

同步队列的释放过程

// 以此举例说明
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// 同步队列 出队列和唤醒逻辑
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
  
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}


 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
}
    
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

同步队列的出队列操作入口其实并不止doAcquireSharedInterruptibly这一种,此处只以它举例,其他类似。当线程在获取共享式的同步状态失败时,会进入到doAcquireSharedInterruptibly方法内,当前获取同步状态失败的线程会被封装成一个node节点,然后检测自己的prev指向的是不是头节点,如果是头结点,则会调用setHeadAndPropagate方法,否则会进入到shouldParkAfterFailedAcquireparkAndCheckInterrupt两个方法内。

shouldParkAfterFailedAcquire方法首先会判断当前节点的前置节点是否是有效的等待状态;如果waitStatus> 0,即处于已取消状态,则需要将该前置节点移除队列。由于是do while 该方法会一直往前找,直到找到最近一个正常等待的节点,并排在它后面。

为什么一定要找个处于正常等待状态的节点才排在它后面?

因为从上面分析的unparkSuccessor方法,我们可以知道队列释放唤醒操作,是从head节点开始,释放的永远当前节点的后继节点,这样如果前置节点处于取消状态,后继节点永远不能被唤醒!!! 跟对老大很重要!!!

看到这里有小伙伴可能会问:那要是跟完老大,老大才被KO掉咋办??

以上是关于Java AQS源码阅读的主要内容,如果未能解决你的问题,请参考以下文章

jdk1.8 J.U.C并发源码阅读------AQS之conditionObject内部类分析

jdk1.8 J.U.C并发源码阅读------AQS之conditionObject内部类分析

AQS源码阅读笔记

AQS源码解析

Java多线程——ReentrantReadWriteLock源码阅读

深入java并发包源码简介