JUC系列LOCK框架系列之五 核心锁类AbstractQueuedSynchonizer

Posted 顧棟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC系列LOCK框架系列之五 核心锁类AbstractQueuedSynchonizer相关的知识,希望对你有一定的参考价值。

初步认识AbstractQueuedSynchonizer

文章目录

源码采用JDK8

什么是AQS

AQS是用来构建锁或者其他同步组件的基础框架,它使用了一个int的变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

它的主要使用方式是继承,子类通过继承AQS并实现他的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行修改,AQS为此提供了三个方法(getState()setState(int newState)compareAndSetState(int expect, int update))来操进行作操作,它们可以保证修改是安全的。使用自定义同步组件的静态内部类来实现子类,同步器自身没有实现任何同步接口,它仅仅定义了一些关于同步状态获取和释放的方法来使用。AQS既支持抢占式的获取同步状态,也支持共享式的获取同步状态。

AQS的接口和示例

AQS的设计基于模板方法模式的,这就需要我们继承AQS并重写指定的方法,随后AQS组合在自定义的组件实现中,并调用同步器提供的模板方法,而这些模板方法会调用我们重写的方法。子类需要重写的三个方法:

  • getState() :获取当前同步状态
  • setState(int newState):设置当前同步状态
  • compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性

AQS可以被重写的方法

方法名称描述
protected boolean tryAcquire(int arg)独占式获取同步状态,实现的时候需要查询当前状态并判断是否符合预期,再使用CAS设置同步状态
protected boolean tryRelease(int arg)独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
protected int tryAcquireShared(int arg)共享式获取同步状态,返回大于等于0的值,表示获取成功,否知失败
protected boolean tryReleaseShared(int arg)共享式释放同步状态
protected boolean isHeldExclusively()当前AQS是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占

AQS提供的模板方法基本分为3类

  1. 独占式获取与释放同步状态
  2. 共享式获取与释放同步状态
  3. 查询同步队列中的等待线程情况
方法名称描述
void acquire(int arg)独占式获取同步状态,如果当前线程获取同步状态成功则返回,否则进入同步队列等待,该发放会去调用重写的tryAcquire(int arg)
void acquireInterruptibly(int arg) throws InterruptedException与acquire(int arg)的差异是 这个方法响应中断,当前线程在等待队列中,如果被中断,那么该方法会抛出异常并返回
boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException在acquireInterruptibly(int arg)基础上增加了超时机制,在规定时间内取不到同步状态,也会以false返回。
boolean release(int arg)独占式释放同步状态,在释放同步状态后,会将等待队列的第一个节点对应的线程唤醒
void acquireShared(int arg)共享式获取同步状态,如果当前线程获取同步状态成功则返回,否则进入同步队列等待,该发放会去调用重写的tryAcquire(int arg),与独占式不同,可以有多个线程获取到同步状态。
void acquireSharedInterruptibly(int arg) throws InterruptedException与acquireShared(int arg)的差异是 这个方法响应中断,当前线程在等待队列中,如果被中断,那么该方法会抛出异常并返回
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException在acquireSharedInterruptibly(int arg)基础上增加了超时机制,在规定时间内取不到同步状态,也会以false返回。
boolean releaseShared(int arg)共享式释放同步状态,
Collection getQueuedThreads()获取等待队列中的线程集合

官网例子

在JDK1.8源码里面有提供了一个独占锁的实现例子Mutex自定义同步组件,它在同一时刻只能有一个线程能获取到锁。在Mutex中定义一个静态内部类Sync。

Sync继承了AQS,并使用getState() setState(int newState)compareAndSetState(int expect, int update)重写了isHeldExclusively() tryAcquire(int acquires)tryRelease(int releases)方法。

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class Mutex implements Lock, java.io.Serializable 

    // 继承AQS的子类,静态内部类形式
    private static class Sync extends AbstractQueuedSynchronizer 
        // 是否是独占的
        @Override
        protected boolean isHeldExclusively() 
            return getState() == 1;
        

        // 当同步状态是0时,获取锁
        @Override
        public boolean tryAcquire(int acquires) 
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) 
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            
            return false;
        

        // 释放锁将 同步状态改为0
        @Override
        protected boolean tryRelease(int releases) 
            assert releases == 1; // Otherwise unused
            if (getState() == 0) 
                throw new IllegalMonitorStateException();
            
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        

        // 提供一个ConditionObject
        Condition newCondition() 
            return new ConditionObject();
        

        // Deserializes properly
        private void readObject(ObjectInputStream s)
                throws IOException, ClassNotFoundException 
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        
    

    // sync完成所有艰苦的工作。 我们是需要将操作代理到sync上。
    private final Sync sync = new Sync();

    @Override
    public void lock() 
        sync.acquire(1);
    

    @Override
    public void lockInterruptibly() throws InterruptedException 
        sync.acquireInterruptibly(1);
    

    @Override
    public boolean tryLock() 
        return sync.tryAcquire(1);
    

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException 
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    

    @Override
    public void unlock() 
        sync.release(1);
    
    
    @Override
    public Condition newCondition() 
        return sync.newCondition();
    


编写测试MutexTest类,会使用AQS的模板方法进行操作。通过lock()进行获取锁(内部调用了Sync的tryAcquire),unlock()(Sync的tryRelease)解锁。

public class MutexTest 

    public static void main(String[] args) 
        Mutex lock = new Mutex();
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) 
            executorService.execute(() -> 
                lock.lock();
                try 
                    System.out.println(Thread.currentThread() + " get lock");
                    Thread.sleep(1000);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                 finally 
                    System.out.println(Thread.currentThread() + " release lock");
                    lock.unlock();
                
            );
        
        executorService.shutdown();
    

通过结果,可以看出一次只有一个线程获取到锁。

Thread[pool-1-thread-1,5,main] get lock. 17:21:23
Thread[pool-1-thread-1,5,main] release lock. 17:21:24
Thread[pool-1-thread-2,5,main] get lock. 17:21:24
Thread[pool-1-thread-2,5,main] release lock. 17:21:25
Thread[pool-1-thread-3,5,main] get lock. 17:21:25
Thread[pool-1-thread-3,5,main] release lock. 17:21:26
Thread[pool-1-thread-4,5,main] get lock. 17:21:26
Thread[pool-1-thread-4,5,main] release lock. 17:21:27
Thread[pool-1-thread-5,5,main] get lock. 17:21:27
Thread[pool-1-thread-5,5,main] release lock. 17:21:28

AQS实现分析

核心思想

如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制在AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。队列有同步队列(sync queue)和条件队列(condition queue)。

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列,即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。

我的理解就是没有用类似QUEUE那样的队列的实例,而是通过NODE中存放前后结点PreNode和NextNode形成一种双向链表似的关系

sync queue 同步队列

AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS操作对该同步状态进行原子操作实现对其值的修改。CAS操作主要借助sun.misc.Unsafed类来实现。

// 代表同步状态的变量
private volatile int state;

同步队列中的结点用来保存获取同步状态失败的线程引用等待状态以及前驱和后继结点结点的属性类型与名称, 源码如下

    static final class Node 
        // 模式,分为共享与独占
        // 共享模式
        static final Node SHARED = new Node();
        // 独占模式
        static final Node EXCLUSIVE = null;
        
        // 结点状态的值
        // CANCELLED,值为1,表示当前的线程被取消(由于等待超时或者中断)
        // SIGNAL,值为-1,表示当前结点的后继结点包含的线程需要唤醒(处于等待状态),也就是unpark。当前结点的线程释放同步状态或者取消了,将唤醒后继结点的线程
        // CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中。
        // PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行。共享式同步状态获取的时候将会无条件传播下去
        // 值为0,初始值。表示当前节点在sync队列中,等待着获取锁
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;

        // 结点状态
        volatile int waitStatus;

        /**
         * 前驱结点
         */
        volatile Node prev;

        /**
         * 后继结点
         */
        volatile Node next;

        /**
         * 结点所对应的线程
         */
        volatile Thread thread;

        /**
         *  下一个等待者 后继结点
         */
        Node nextWaiter;

        /**
         * 节点是否在共享模式下等待 当Returns true if node is waiting in shared mode.
         */
        final boolean isShared() 
            return nextWaiter == SHARED;
        

        /**
         *  获取前驱结点,若前驱结点为空,抛出异常
         */
        final Node predecessor() throws NullPointerException 
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        

        //无参构造方法
        Node()     // 用于建立初始头部或共享标记
        

        Node(Thread thread, Node mode)      // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        

        Node(Thread thread, int waitStatus)  // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        
    
同步队列的数据结构

在AQS中有两个结点类型的引用,head是指向头结点(状态值不会是1),tail是指向尾结点。

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
	private transient volatile Node head;
	/**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
	private transient volatile Node tail;

独占式同步状态获取与释放

以上述官网独占式获取同步状态为例,在使用sync.acquire(1);获取同步状态失败的时候,会执行addWaiter(Node.EXCLUSIVE), arg),先通过compareAndSetTail(pred, node)尝试快速填充队尾,如果填充失败或者当没有尾结点时,去调用enq(final Node node)进行队列初始化,通过compareAndSetHead(Node update)compareAndSetTail(Node expect, Node update)来设置AQS的head结点和tail结点。在完成addWaiter之后,继续执行acquireQueued(final Node node, int arg),如果当前结点的前驱结点是首结点,再次尝试获同步状态,若成功,将当前结点更新head结点,若失败,进行线程park,等待前驱结点释放锁唤醒当前线程,若期间当前线程中断,也会被唤醒。若发生异常情况, 会通过cancelAcquire(Node node) 取消继续获取(资源)。

锁获取的主要流程图如下:

独占锁的释放主要通过调用release(int arg)来释放锁。

源码部分

addWaiter(Node mode)

private Node addWaiter(Node mode) 
    // 为当前线程构建一个Node,独占模式
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 当队列尾结点不为空,快速填充队尾
    if (pred != null) 
        node.prev = pred;
        // 比较pred是否为尾结点,是则将尾结点设置为node 
        if (compareAndSetTail(pred, node)) 
            pred.next = node;
            return node;
        
    
    // 尾结点为空(即还没有被初始化过),或者是compareAndSetTail操作失败,则入队列
    enq(node);
    return node;

enq(final Node node)

    private Node enq(final Node node) 
        // 无限循环 势必将结点加入队列中
        for (;;) 
            // 获取AQS当前尾结点
            Node t = tail;
            // 如果尾结点是null,则进行初始化,新建一个空Node同时作为head结点和tail结点
            if (t == null)  // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
             else 
                // 尾结点不为空,即已经被初始化过
                // 将当前尾结点作为node结点的前置结点
                node.prev = t;
                // 比较结点t是否为尾结点,若是则将尾结点设置为node
                if (compareAndSetTail(t, node)) 
                    t.next = node;
                    return t;
                
            
        
    

acquireQueued(final Node node, int arg)

    // 以独占不间断模式获取已在队列中的线程。
	final boolean acquireQueued(final Node node, int arg) 
        boolean failed = true;
        try 
            // 中断标志
            boolean interrupted = false;
            // 无限循环
            for (;;) 
                // 获取前置结点
                final Node p = node.predecessor();
                // 如果前置结点是head且当前线程成功获取到同步状态,将自身结点变为head结点,返回中断标记
                if (p == head && tryAcquire(arg)) 
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                
                // 当获取资源失败,更新结点状态并阻塞线程,返回其中断标识
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            
         finally 
            // 取消加入队列失败的节点的资源获取
            if (failed)
                cancelAcquire(node);
        
    

shouldParkAfterFailedAcquire(Node pred, Node node)

    // 当获取(资源)失败后,检查并且更新结点状态--只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。否则,将不能进行park操作。
	private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) 
        // 前置结点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 前置节点已经设置了使后置结点阻塞等待的信号,因此它可以安全地park。
             */
            return true;
        if (ws > 0) 
            /*
             * 前置结点已经取消了等待该锁,从前置结点向前遍历,找到未取消的节点,设置为当前节点的前置结点
             */
            do 
                node.prev = pred = pred.prev;
             while (pred.waitStatus > 0);
            pred.next = node;
         else 
            /*
             * waitStatus必须为0或PROPAGATE。我们需要信号,不是立即park。调用者将需要重试,以确保在park前。它不能获得同步状态。
             * 尝试将前驱结点的信号变为SIGNAL
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        
        return false;
    

parkAndCheckInterrupt()

    private final boolean parkAndCheckInterrupt() 
        // 将其线程阻塞--线程被唤醒后或中断后会在此后继续执行
        LockSupport.park(this);
        // 返回当前线程是否已被中断,并对中断标识位进行复位
        return Thread.interrupted();
    

cancelAcquire(Node node)

    // 取消继续获取(资源)
	private void cancelAcquire(Node node) 
        // 忽略结点已经不存在的情况
        if (node == null)
            return;
        
        // 清空node结点的thread
        node.thread = null;

        // Skip cancelled predecessors
        // 保存node的前驱结点,如果前驱节点已经是取消的状态,则一直向前遍历,取不是取消状态的结点作为当前结点的前驱结点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent no

以上是关于JUC系列LOCK框架系列之五 核心锁类AbstractQueuedSynchonizer的主要内容,如果未能解决你的问题,请参考以下文章

JUC系列LOCK框架系列之六 核心锁类之ReentrantLock

JUC系列LOCK框架系列之三 同步工具类 CountDownLatch

JUC系列LOCK框架系列之四 同步工具类Semaphore

JUC的LOCK框架系列一LOCK框架预览

JUC的LOCK框架系列二LOCK框架之LockSupport

Java并发编程系列之三JUC概述