AQS源码解析-CLH

Posted heliner

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS源码解析-CLH相关的知识,希望对你有一定的参考价值。

AQS解析

一、简介

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState(), setState(int) and compareAndSetState(int, int) is tracked with respect to synchronization.

AQS使用一个先进先出的同步队列实现,为相关的锁和同步锁提供基本框架。它使用整数进行状态的表示以此来为同步器提供可用的基本骨架。子类通过重写AQS中受保护的方法进而实现锁的释放和获取,通过这种形式子类就能具有完整的入队和锁机制。

不得不提出的AQS中使用的先进先出的同步队列实现的理论来自于CLH,也就是说将AQS基本就是在讲CLH的实现方式

The wait queue is a variant of a "CLH" (Craig, Landin, and* Hagersten) lock queue. CLH locks are normally used for* spinlocks.

二、同步的状态和基本属性

static final class Node 
    /** 共享模式 */
    static final Node SHARED = new Node();
    /** 独占模式 */
    static final Node EXCLUSIVE = null;

    /** 标明当前线程已经被取消 */
    static final int CANCELLED =  1;
    /** 线程的下一个等待线程需要被唤醒 */
    static final int SIGNAL    = -1;
    /** 当前线程正在等待中 */
    static final int CONDITION = -2;
    /** 下一次的acquire方法应该被无条件的传播*/
    static final int PROPAGATE = -3;

    /** 当前等待状态*/
    volatile int waitStatus;

    /** 前驱节点*/
    volatile Node prev;

    /** 与上面类似         */
    volatile Node next;

    /** 当前node持有的线程,在构造器中初始化,在退出队列后被置为null*/
    volatile Thread thread;

    /** 指向当前节点的后面第一个处于CODITION状态的节点,或者为SHARED,只有对于独占式才会有CODTION节点的存         *在,对于共享式的其nextWaiter为SHARED(变量)
         */
    Node nextWaiter;

    /**
         * Returns true if node is waiting in shared mode.
         */
    final boolean isShared() 
        return nextWaiter == SHARED;
    

    /** 返回前驱节点,添加一层封装*/
    final Node predecessor() throws NullPointerException 
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    

    Node()     // 用于创建出事头结点和SHARED标志的构造器
    

    Node(Thread thread, Node mode)      // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    

    Node(Thread thread, int waitStatus)  // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    

同步的状态分为**5种,0为INITIAL,1为CANCELLED,-1为SINGAL,-2为CONDITION,-3为PROPAGETE**,这五种状态的作用和状态信息如下

  • SIGNAL:该节点的后继结点已经通过LockSupport.part()方法阻塞,当前节点在被释放或者被删除后需要唤醒它的后继节点,为避免线程之间的竞争,获取资源acquire的所有方法都应该设置SIGNAL标志,然后重新进行原子性的获取操作,如果获取失败,就阻塞
  • CANCELLED:节点因为超时或者中断进入CANCELLED状态,节点如果进入该状态就不会再转为其他状态,该状态的线程不会被阻塞
  • CONDITION:该节点处于等待队列中,他不会作为同步队列中的普通节点使用(也就是不会被前驱节点唤醒或unpark()),除非他的状态被设置为0INITIAL
  • PROPAGETE:共享锁的释放(releaseShared)应该被传递到其他节点。在doReleaseShared中用来保证头结点一定会继续传播信息
  • INITIAL:初始状态或者说是中间状态

这几类状态可以用更简单的被区分:如果>0就是CANCELLED<=0就是可以使用的状态

CLH同步队列,结构图如下

技术图片

  • prev为node的前驱节点,next为node的后驱节点

nextWaiter字段,保存的是同步状态的模式(Mode),tryAcquire(int)tryAcquireShared(int)方法通过独占方式或者共享方式进行状态获取,如果失败就调用addWaiter(Node mode)的方式进行入队。nextWaiter用于表示当前处于那种形式

  • SHARED 枚举共享模式,值为new Node(),这个值是唯一的,使用static进行修饰
  • EXCLUSIVE 枚举独占模式,值为null

#predecessor() 方法,获得 Node 节点的前一个 Node 节点。在方法的内部,Node p = prev 的本地拷贝,是为了避免并发情况下,prev 判断完 == null 时,恰好被修改,从而保证线程安全。

三、入队 addWaiter

3.1 基本步骤介绍

  1. 生成新的节点node
  2. 将新节点node的前驱指向原来的尾节点tail技术图片

    1. 通过UNSAFE设置尾节点tail为新的节点node技术图片

    2. 设置倒数第二个节点也就是原来的尾节点old tail的后驱节点next为新的尾节点node技术图片

  3. 如果失败通过enq再次进行重试
    1. 如果头结点为空,将新节点node设置到头结点
    2. 否者进行类似第2步的操作

使用图形化的形式来描述入队的问题

3.2 addWaiter()

深入到addWaiter()源码进行查看

private Node addWaiter(Node mode) 
        Node node = new Node(Thread.currentThread(), mode);
        // 先进行一次简单的入队尝试
        Node pred = tail;
        if (pred != null) 
            node.prev = pred;
            if (compareAndSetTail(pred, node)) 
                pred.next = node;
                return node;
            
        
        enq(node);
        return node;
    

3.3 enq(Node node)

enq方法部分方法和addWaiter中一致,主要不同是当考虑到节点未进行初始化时需要将当前节点设置为初始化节点head node

private Node enq(final Node node) 
    for (;;) 
        Node t = tail;
        if (t == null)  // 如果头部不存在就进行初始化
            if (compareAndSetHead(new Node()))
                tail = head;
         else 
            node.prev = t;//该步骤和3.2中的步骤类似
            if (compareAndSetTail(t, node)) 
                t.next = node;
                return t;
            
        
    

四、出队

CLH 同步队列遵循 FIFO,首节点的线程释放同步状态后,将会唤醒它的下一个节点(Node.next)。而后继节点将会在获取同步状态成功时,将自己设置为首节点( head )。

这个过程非常简单,head 执行该节点并断开原首节点的 next 和当前节点的 prev 即可。注意,在这个过程是不需要使用 CAS 来保证的,因为只有一个线程,能够成功获取到同步状态。

过程图如下:

技术图片

#setHead(Node node) 方法,实现上述的出列逻辑。代码如下:

private void setHead(Node node)    
    head = node;    node.thread = null;    node.prev = null;
                                

参考

以上是关于AQS源码解析-CLH的主要内容,如果未能解决你的问题,请参考以下文章

Java 并发之AbstractQueuedSynchronizer(AQS)源码解析

AQS源码剖析第一篇---全貌概览

源码解析之AQS源码解析

AQS(AbstractQueuedSynchronizer)源码深度解析—AQS的设计与总体结构

AQS源码解析-AtomicBoolean源码解析

AQS源码解析