AQS

Posted java_wxid

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS相关的知识,希望对你有一定的参考价值。

文章目录

什么是AQS?

AQS的全称是AbstractQueuedSynchronizer,也就是抽象队列同步器,它是在java.util.concurrent.locks包下的,也就是JUC并发包。java提供了synchronized关键字内置锁,还提供了显示锁,而大部分的显示锁的底层都用到了AQS,比如只有一个线程能执行ReentrantLock独占锁,又比如多个线程可以同时执行共享锁Semaphore、CountDownLatch、ReadWriteLock、CyclicBarrier。

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。

在锁的实现中,聚合同步器,就是利用同步器实现锁的语义。锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;同步器面向的是锁的实现者, 它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。

在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法(getState()、 setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。

同步器自身没有实现任何同步接口,它仅仅是定义了同步状态获取和释放的方法,提供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态。

AQS 是基于什么设计模式实现的?

AQS使用模板方法模式,使用者继承AbstractQueuedSynchronizer并重写指定的方法,重写的方法就是对于共享资源state的获取和释放,将AQS在自定义同步组件的实现中,调用它的模板方法,这些模板方法会调用使用者重写的方法,这是模板方法模式很经典的一个运用。

AQS 底层同步队列的原理

同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取 同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

同步队列中的节点有五个属性:

  • 等待状态
    值=1,在同步队列中等待的线程,等待超时或者被中断,需要从同步队列中取消等待,节点进入这个状态就不会发生变化
    值=-1,后续节点的线程处于等待状态,当前节点的线程如果释放了同步状态,或者被取消,就会通知后续节点,让后续节点的线程可以运行
    值=-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了singnal方法后,这个节点就会从等待队列中转移到同步队列中,加入到对同步状态的获取中
    值=-3,下一次共享式同步状态获取就会无条件的传递下去
    值=0,表示初始状态
  • 前驱节点:当前节点加入到同步队列时进行设置(尾部添加)
  • 后续节点
  • 等待队列中的后续节点:如果当前节点是共享的,那么这个字段就是一个shared常量,也就是节点类型(独占和共享)和等待队列中的后续节点共用同一个字段
  • 获取同步状态的线程

同步器拥有首节点(head) 和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部。
同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。 试想一下,当一个线程成功的获取了同步状态(或者锁),其他线程就无法获取到同步状态,转而被构造成为节点并加入到同步队列中, 而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法: compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态 时,将会唤醒后继节点, 而后继节点将会在获取同步状态成功时将自己设置为首节点。

设置首节点是通过获取同步状态成功的线程来完成的,因为只有一个线程能够成功获取到同步状态,所以设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点,并且断开原首节点的next引用就可以了。

AQS独占锁举例

拿ReentrantLock加锁举例,线程调用ReentrantLock的lock()方法进行加锁,这个加锁的过程,用CAS将state值从0变为1。一旦线程加锁成功了之后,就可以设置当前加锁线程是自己。ReentrantLock通过多次执行lock()加锁和unlock()释放锁,对一个锁加多次,从而实现可重入锁,每次线程可重入加锁一次,判断一下当前加锁线程是不是自己,如果是他自己就可以可重入多次加锁,每次加锁,就是把state的值给累加1。

当state=1时代表当前对象锁已经被占用,其他线程来加锁时则会失败,然后再去看加锁线程的变量里面是不是自己之前占用过这把锁,如果不是就说明有其他线程占用了这个锁,失败的线程被放入一个等待队列中,在等待唤醒的时候,经常会使用自旋(while(!cas()))的方式,不停地尝试获取锁,等待已经获得锁的线程,释放锁才能被唤醒。

当它释放锁的时候,将AQS内的state变量的值减1,如果state值为0,就彻底释放锁,会将“加锁线程”变量设置为null。这个时候,会从等待队列的队头唤醒其他线程重新尝试加锁,获得锁成功之后,会把“加锁线程”设置为线程自己,同时线程自己就从等待队列中出队。这个就是aqs实现自旋锁,可重入锁,独占锁的底层实现。


AQS独占锁实现原理

public final void acquice(int arg)
	//同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待
	if(!tryAcquirce(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
		selfInterrupt();
	

上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:
首先调用自定义同步器实现的tryAcquire(int arg)方法,保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node) 方法将该节点加入到同步队列的尾部, 最后调用acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出。

独占式同步状态获取流程,也就是acquire(int arg)方法调用流程:

AQS独占式超时获取锁和可中断获取锁

在Java 5之前,当一个线程获取不到锁而被阻塞在synchronized之外时,对该线程进行中断操作,此时这个线程的中断标志位会被修改,但线程依旧会阻塞在synchronized上,等待着获取锁。在Java 5中,同步器提供了acquireInterruptibly(int arg)方法,这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出InterruptedException。

超时获取同步状态过程,可以被当作响应中断,是获取同步状态过程的“增强版”, doAcquireNanos(int arg,long nanosTimeout)方法在支持响应中断的基础上,增加了超时获取的特性。针对超时获取,主要需要计算出需要睡眠的时间间隔nanosTimeout,为了防止过早通知, nanosTimeout计算公式为: nanosTimeout-=now-lastTime,其中now为当前唤醒时间,lastTime为上次唤醒时间,如果 nanosTimeout大于0则表示超时时间未到,需要继续睡眠nanosTimeout纳秒, 否则,表示已经超时。

/**
     * Acquires in exclusive timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return @code true if acquired
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException 
        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try 
            for (;;) 
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) 
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                
                if (nanosTimeout <= 0)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
				//计算时间,当前时间now减去睡眠之前的时间lastTime得到已经休眠的时间delta
				//然后被原有的超时时间nanosTimeout减去,得到了还应该休眠的时间
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    throw new InterruptedException();
            
         finally 
            if (failed)
                cancelAcquire(node);
        
    

如果nanosTimeout小于等于 spinForTimeoutThreshold(1000纳秒)时, 将不会使该线程进行超时等待,而是进入快速的自旋过程。原因在于,非常短的超时等待,无法做到十分精确,如果这时再进行超时等待,相反会让nanosTimeout的超时从整体上表现得反而不精确。因此,在超时非常短的场景下,同步器会进入无条件的快速自旋。

AQS共享锁举例

拿CountDownLatch举例,任务分为5个子线程去执行,state也初始化为5。这5个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1,等到所有子线程都执行完后,state=0,会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。


AQS共享锁实现原理

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状 态。以文件的读写为例,如果一个程序在对文件进行读操作,那么这一时刻对于该文件的写操作均被阻塞,而读操作能够同时进行。 通过调用同步器的 acquireShared(int arg) 方法可以共享式地获取同步状态,在acquireShared(int arg)方法中,同步器调用tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)方法返回值为int类型,当返回值大于等于0时,表示能够获取到同步状态。 因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是 tryAcquireShared(int arg)方法返回值大于等于0。 在doAcquireShared(int arg)方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示 该次获取同步状态成功 并从自旋过程中退出。

public final boolean releaseShared(int arg)
	if(tryReleaseShared(arg))
		doReleaseShared();
		return true;
	
	return false;

该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线程同时访问的并发组件,它和独占式主要区别在于 tryReleaseShared(int arg) 方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和CAS来保证的,因为释放同步状态的操作可能会同时来自多个线程。

以上是关于AQS的主要内容,如果未能解决你的问题,请参考以下文章

《并发系列一》AbstractQueuedSynchronizer(AQS)- 互斥锁源码剖析

AbstractQueuedSynchronizer中CAS的疑惑

AQS 详解

深入java并发包源码AQS的介绍与使用

AQS学习

Java AQS学习