源码剖析:AQS-AbstractQueuedSynchronizer

Posted talk.push

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码剖析:AQS-AbstractQueuedSynchronizer相关的知识,希望对你有一定的参考价值。

文章目录


有的时候在一段代码上花费大量时间,在一段代码上反复花时间,都是值得的。本文的jdk版本是1.8.
本文将演示AQS如何在多线程场景下进行入队和出队操作。

从一段代码说起

这段代码中通过ReentranLock来演示在多线程环境下,ReentranLock是如何通过底层的AQS对象来实现同步的。

package com.jeff.study.concurrent.aqs;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Description AQS的抢锁和入队过程
 * @Date 2021/1/31 7:36 下午
 * @Author jeff.sheng
 * @see AbstractQueuedSynchronizer
 * @see ReentrantLock
 */
public class AQSDemo 

    public static void main(String[] args) throws InterruptedException 
        ReentrantLock lock = new ReentrantLock();
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        Runnable runnable = () -> 
            System.out.println("当前线程: " + Thread.currentThread().getName() + " 请输入:");
            lock.lock();
            try 
                System.in.read();
             catch (IOException e) 
                e.printStackTrace();
             finally 
                lock.unlock();
            
        ;
        executorService.execute(runnable);
        executorService.execute(runnable);
        executorService.awaitTermination(200, TimeUnit.SECONDS);
        executorService.shutdown();
    


AbstractQueuedSynchronizer

AbstractQueuedSynchronizer,抽象队列同步器。包含了state、head和tail两个Node类型的变量。
Node是AbstractQueuedSynchronizer的内部类,提供属性如下:

  • prev:类型为Node,表示当前节点的前一个节点。
  • next:类型为Node,表示当前节点的后一个节点。
  • waitStatus:表示当前节点线程的状态。比如CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3).
  • thread:表示当前AQS队列中当前节点的线程对象。
  • nextWaiter:AQS队列中下一个等待节点。

ReentranLock

ReentranLock跟AbstractQueuedSynchronizer的关系

ReentranLock类内部定义了一个Sync类型的变量,而Sync则是ReentranLock的一个抽象的静态内部类,继承了AbstractQueuedSynchronizer抽象类。

 abstract static class Sync extends AbstractQueuedSynchronizer
 ...
 

而AbstractQueuedSynchronizer又继承了AbstractOwnableSynchronizer类,这个类需要重点看下,因为里边有一个AQS的独占线程对象方法setExclusiveOwnerThread

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements Serializable 
.....



package java.util.concurrent.locks;

import java.io.Serializable;

public abstract class AbstractOwnableSynchronizer implements Serializable 
    private static final long serialVersionUID = 3737899427754241961L;
    private transient Thread exclusiveOwnerThread;

    protected AbstractOwnableSynchronizer() 
    
    // 设置AQS的独占线程对象
    protected final void setExclusiveOwnerThread(Thread thread) 
        this.exclusiveOwnerThread = thread;
    

    protected final Thread getExclusiveOwnerThread() 
        return this.exclusiveOwnerThread;
    


Sync则继续提供了两种实现,也就是FairSync和NonfailSync.也就是我们常说的公平锁和非公平锁。ReentranLock则提供了非公平Sync的默认构造器,且提供了可选的构造器。

/**
     * Creates an instance of @code ReentrantLock.
     * This is equivalent to using @code ReentrantLock(false).
     */
    public ReentrantLock() 
        sync = new NonfairSync();
    

    /**
     * Creates an instance of @code ReentrantLock with the
     * given fairness policy.
     *
     * @param fair @code true if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) 
        sync = fair ? new FairSync() : new NonfairSync();
    

NonfairSync

ReentranLock的内部非公平实现NonfairSync代码如下:

 /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync 
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() 
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        

        protected final boolean tryAcquire(int acquires) 
            return nonfairTryAcquire(acquires);
        
    

lock方法

我们先看下lock方法:

final void lock() 
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);

当线程T1进入时,通过CAS操作修改state变量的状态.

  • 如果此时没有其他线程一起竞争,则返回true。然后设置当前独占锁的线程对象为当前线程(Thread.currentThread()).
  • 如果有线程T2一起竞争,则进入acquire(1)方法。

acquire方法

acquire方法由NonfairSync的父类AbstractQueuedSynchronizer提供。

 public final void acquire(int arg) 
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    

我们先看tryAcquire方法,这个方法由NonfairSync实现。

 protected final boolean tryAcquire(int acquires) 
            return nonfairTryAcquire(acquires);
 

nonfairTryAcquire方法由ReentranLock提供。

   final boolean nonfairTryAcquire(int acquires) 
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) 
                if (compareAndSetState(0, acquires)) 
                    setExclusiveOwnerThread(current);
                    return true;
                
            
            else if (current == getExclusiveOwnerThread()) 
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            
            return false;
        

当线程T1和T2都过来时,先获取AQS的state值,state值是一个volitale变量。

  • 如果T1和T2都发现state值等于0则进行CAS操作设置state的值为1,但最终只有一个线程设置成功,并设置独占锁的线程为当前线程。假如T1抢占锁成功则返回true。整个流程结束。那么T2则返回false,抢锁失败!
  • 如果T1先进来抢占锁成功了,那么state值就会通过CAS操作设置为1.由于volitale变量的可见性,T2进来时发现state不是0,而且发现当前抢锁成功的线程是T1,就直接返回false了,抢锁失败!
  • 如果T1先抢锁成功且未释放锁,然后又再次进入此方法,那么发现自己就是拥有锁的线程,则给state值加1,锁重入。

以上我们说明了T1线程运气好抢锁成功的场景,而T2线程运气差总是抢锁失败。那么T2抢占锁失败之后返回了false,会如何呢?可以从tryAcquire方法看到,抢锁失败之后,会进一步执行下边的判断:

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

我们来看下addWaiter方法。

addWaiter方法

  Node(Thread thread, Node mode)      // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
 
/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) 
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) 
            node.prev = pred;
            if (compareAndSetTail(pred, node)) 
                pred.next = node;
                return node;
            
        
        enq(node);
        return node;
    

addWaiter方法传进来的参数是Node.EXCLUSIVE,看一下Node节点中对它的解释:

 static final class Node 
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** 标记Node节点是独占模式等待状态 */
        static final Node EXCLUSIVE = null;
        ..........

T2虽然抢锁失败了,但是T2将会以独占模式节点的状态继续等待。
T2进来addWaiter方法时,会包装为一个Node节点。假设T2进入时,此时AQS的tail节点尚未初始化。也就是说AQS对象的tail和head两个头尾Node节点都是NULL。
那么就会进入enq方法先初始化:

  /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) 
        for (;;) 
            Node t = tail;
            if (t == null)  // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
             else 
                node.prev = t;
                if (compareAndSetTail(t, node)) 
                    t.next = node;
                    return t;
                
            
        
    

可以看到tail赋值给t,t此时为null’就进行头结点的初始化了,然后把初始化的head节点引用给了tail。

继续for循环,tail就不为空了,tail赋值给t也不为空,就是初始化的那个节点。将其赋值给T2独占等待状态节点的prev节点。

然后将t,也就是尾结点TAIL更新为T2独占等待节点。

这样以来T2独占等待节点就成了AQS队列中的一个尾节点。接下来最重要的获取AQS队列节点的操作来了。

acquireQueued方法

 /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return @code true if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) 
        boolean failed = true;
        try 
            boolean interrupted = false;
            for (;;) 
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) 
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            
         finally 
            if (failed)
                cancelAcquire(node);
        
    

方法传递进来的参数就是T2独占等待节点,根据前边的描述,它的前一个节点就是哨兵节点也就是head节点,此时T2虽然抢锁失败而进入队列,但是它并不死心,它觉得此时T1可能已经释放锁了,所以想再次tryAcquire一次,如果真的如T2所愿,T1执行结束真的unlock的话,就会执行以下代码并返回interrupted值为false。

 if (p == head && tryAcquire(arg)) 
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
 
 /**
 */将T2独占等待节点指针设置给head节点,也就是T2节点将从队列中出队。设置thread和prev都为null。
 */
 private void setHead(Node node) 
        head = node;
        node.thread = null;
        node.prev = null;
    

但如果是T2想的美呢?也就是T1根本就没结束还在持有锁,那么tryAcquire获取的state值就不是0而且持有锁的线程还是T1.那么不好意思,T2线程Acquire失败了,此时将尝试park住T2线程。

  /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return @code true if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) 
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) 
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do 
                node.prev = pred = pred.prev;
             while (pred.waitStatus > 0);
            pred.next = node;
         else 
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        
        return false;
    

通过debug来走读下这段代码吧!
T2进来之后,先判断pred节点,也就是T2独占等待节点的prev节点的waitStatus状态,可以看到值为0,回顾下开始时提到的waitStatus:

waitStatus:表示当前节点线程的状态。比如CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3).

所以都不满足,于是就使用CAS更新prev节点waitStatus的状态(更新为SIGNAL=-1):

 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);


然后继续执行acquireQueued方法中的下一次for循环,仍然进入了shouldParkAfterFailedAcquire方法中,但这一次由于waitStatus=-1满足第一个if条件,于是返回true。再继续判断parkAndCheckInterrupt方法是否满足条件:

 private final boolean parkAndCheckInterrupt() 
        LockSupport.park(this);
        return Thread.interrupted();
    

当T2线程进入调用LockSupport.park方法时,T2线程阻塞在这里了。此时此刻,T1线程阻塞在:

System.in.read()

T2线程阻塞在LockSupport.park方法。通过调用jstack方法可以观察到此时的线程stack信息:

而T1线程则是RUNABLE状态:

当在控制台输入一行字符串时,T2线程返回了。因为T1线程此时unlock了。


然后T2线程调用Thread.interruped方法检测线程T2是否被中断,本次返回了false。(
这里如果检测到T2被中断则清除中断标识返回true,如果返回了true则会调用Thread.currentThread().interrupt();重新还原T2的中断标识。
但是由于此时T1线程已经unlock,AQS的state变量值变为0了。T2线程成功的获取到了锁,返回true。

unlock方法

执行完业务逻辑之后,在finally中通常会调用比如ReentranLock的unlock方法,这里我们就看下ReentranLock的unlock方法。

/**
     * Attempts to release this lock.
     *
     * <p>If the current thread is the holder of this lock then the hold
     * count is decremented.  If the hold count is now zero then the lock
     * is released.  If the current thread is not the holder of this
     * lock then @link IllegalMonitorStateException is thrown.
     *
     * @throws IllegalMonitorStateException if the current thread does not
     *         hold this lock
     */
    public void unlock() 
        sync.release(1);
    

同样是调用了AQS的release方法,其中可以看到tryRelease这个方法,这个方法是ReentranLock提供的。

/**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if @link #tryRelease returns true.
     * This method can be used to implement method @link Lock#unlock.
     *
     * @param arg the release argument.  This value is conveyed to
     *        @link #tryRelease but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from @link #tryRelease
     */
    public final boolean release(int arg) 
        if (tryRelease(arg)) 
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        
        return false;
    

确切的说,实现了AQS的对象很多都提供了这个方法。而AQS抽象类中提供了一个抛出异常的实现:

   protected boolean tryRelease(int arg) 
        throw new UnsupportedOperationException();
    

//ReentranLock提供
 protected final boolean tryRelease(int releases) 
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) 
                free = true;
                setExclusiveOwnerThread(null);
            
            setState(c);
            return free;
        

这里的逻辑很明显了,就是要释放state和独占线程这两个资源给其他线程用了。回到AQS的release方法,如果tryRelease执行成功,则会对AQS队列中下一个Node进行unparkSuccessor操作。AQS提供了实现:

 /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) 
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) 
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        
        if (s != null)
            LockSupport.unpark(s.thread);
    

可以看到,在找到了后继者之后唤醒了这个节点的线程:

LockSupport.unpark(s.thread);

公平锁和非公平锁的区别

代码一贴,疑惑全解。


    /**

以上是关于源码剖析:AQS-AbstractQueuedSynchronizer的主要内容,如果未能解决你的问题,请参考以下文章

java集合框架源码剖析系列java源码剖析之TreeMap

Mybatis源码剖析:二级缓存源码剖析

Mybatis源码剖析:延迟加载源码剖析

SpringBoot整合SSM三大框架源码剖析之SpringBoot源码剖析

转:Java集合源码剖析LinkedHashmap源码剖析

Android 高级进阶(源码剖析篇)