AQS原理初探

Posted 风在哪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS原理初探相关的知识,希望对你有一定的参考价值。

AQS原理初探

AQS全称为AbstractQueuedSynchronizer,如果直接按名字翻译的话就是抽象队列式同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类的实现都依赖于它,如ReentrantLock(可重入锁)、Semaphore(信号量)等等。它是构建锁或者其他同步组件的基础框架。

可重入锁

这里解释一下可重入锁:可重入锁就是如果某个线程已经获得某个锁,可以再次获取该锁而不会导致死锁。ReentrantLock以及synchronized都是可重入锁,其中ReentrantLock需要自己手动释放(如果获取次数和释放次数不一致会有问题),而synchronized会自动释放。

AQS成员变量


    private transient volatile Node head;		// 队列的头节点

    private transient volatile Node tail;		// 队列的尾节点

    private volatile int state;				// 标识同步状态

使用int类型的state是AQS定义线程获取锁的模式包括独占模式和共享模式,所以使用int可以标识这两种状态。

AQS如何实现同步

AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获得了锁,当state=0时表示释放了锁,它提供了三个方法getState(),setState(int newState),compareAndSetState(int expect, int update)来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。

state是volatile类型的变量,volatie保证了共享变量的可见性和有序性,但是不保证原子性,所以这里还有一个cas操作保证了设置state操作的原子性。

    private volatile int state;

    
    protected final int getState() {	// 返回当前状态的值
        return state;
    }

    
    protected final void setState(int newState) {	// 设置当前状态的值
        state = newState;
    }

    // 以cas的方式设置当前状态的值
    protected final boolean compareAndSetState(int expect, int update) {	
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

AQS定义了两种资源共享方式:Exclusive(独占方式,只有一个线程执行)和Share(共享式,多个线程可以同时执行)

CLH同步队列

CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其数据结构如下

image-20210316163015014

队列的node节点包含如下字段:

	/** 标识该节点在共享模式 */
        static final Node SHARED = new Node();
        /** 标识该节点在独占模式 */
        static final Node EXCLUSIVE = null;

	/** 标识此线程取消了争夺锁的操作 */
        static final int CANCELLED =  1;
        
	// 表示当前节点的后继节点对应的线程需要被唤醒
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        
	// 当前线程的状态
        volatile int waitStatus;

        /**
         * 当前结点的前置结点
         */
        volatile Node prev;

        /**
         * 当前结点的下一个结点
         */
        volatile Node next;

        /**
         * 使该结点排队的线程,在构造函数中初始化,并且用完以后置为null
         */
        volatile Thread thread;

        /**
         * 指向下一个等待某一条件的结点,或者处于SHARED状态的结点
         */
        Node nextWaiter;

如何自定义同步器

不同的自定义的同步器争用共享资源的方式不同,自定义同步器在实现时只需要实现共享资源的state的获取于释放方式即可,至于具体线程等待队列的维护AQS顶层已经为我们实现了,自定义同步器需要实现以下方法:

  • isHeldExclusively():该线程是否正在独占资源
  • tryAcquire(int):以独占的方式尝试获取资源,成功则返回true,失败则返回false,成功获取同步状态以后,其他线程需要等待该线程释放同步状态才能获取同步状态
  • tryRelease(int):以独占的方式去尝试释放资源,成功则返回true,失败返回false
  • tryAcquireShared(int):以共享方式尝试获取资源,其中返回负数表示失败;0表示成功,但没有剩余资源可用;整数表示成功,且有剩余资源
  • tryReleaseShared(int):以共享方式尝试释放资源,成功则返回true,失败则返回false

当然AQS还包含其他的已经给我们实现好的方法:

  • acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
  • acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
  • tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
  • acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
  • acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
  • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;
  • release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
  • releaseShared(int arg):共享式释放同步状态;

源码浅析

获得锁

如果想要获取锁的话,会调用tryAcquire方法,我们来看一下这个方法:

	/*
	通过这个源码可以看出这个方法会返回boolean值,判断是否成功获得锁,其中使用int型变量arg来修改state
	这个方法没有具体的实现,需要继承AQS的类自己去实现,给上层的调用开放空间,上层可以自由编写业务逻辑
	*/
	protected boolean tryAcquire(int arg) {
        	throw new UnsupportedOperationException();
    	}

我们来看一个具体的实现:

	// 这是ReentrantLock的内部类,FairSync继承自Sync,Sync是继承了AbstractQueuedSynchronizer
   static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

当获取锁成功,那么线程就可以对资源进行操作,使用完成之后再释放,如果获取锁失败呢?

当获取锁失败的时候,如果上层业务不想等待锁,那么就可以进行相应的处理(例如直接返回false);如果等待的话, 可以调用acquire方法,而不用自己去实现复杂的排队处理,这就是框架灵活的地方,我们来看看这个acquire方法:

// 使用关键字final修饰,所有的继承类不能改变此方法
public final void acquire(int arg) {
        /* 
        	如果tyrAcquire()获取锁成功,那么就直接返回,跳出这个方法
        	否则会调用acquireQueued方法,这个里面会
        */
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

上述的方法调用了addWaiter和acquireQueued方法,我们首先来看看addWaiter方法:

// 将当前线程封装为node节点加入等待队列
// 先尝试进行快速入队,如果失败则再进行完整的入队
// 注意这里返回的是尾节点
private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 当前尾节点为空,或者cas操作入队失败的话,会调用enq方法,进入完整的入队方法
        if (pred != null) {
            node.prev = pred;
            // if里面的内容会不会产生线程安全问题?
            if (compareAndSetTail(pred, node)) {
                // 这里只是将前置节点的指针指向了当前节点,即使这时候尾节点发生变化也不会产生线程安全问题
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

	
	private Node enq(final Node node) {
        // 自旋,采用尾插法
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

接下来就是acquireQueued方法:

  • 如果当前线程所在的节点为头节点的下一个节点,那么将会不断的尝试拿锁,直到拿锁成功,否则将会判断是否需要挂起
  • 如何判断是不是需要挂起,如果当前线程所在节点之前除了head还有其他节点,那这些节点的waitStatus为signal,那这些节点就需要挂起,保证head之后只有一个节点获取锁,其他节点都处于已挂起或正在被挂起状态,避免无用的自旋
  • 当一个线程使用完资源,并且尝试去释放锁的时候应该尝试去唤醒挂起的线程,
final boolean acquireQueued(final Node node, int arg) {
    	// 从finally方法块中,我们可以看出,当方法正常执行时,failed永远是为false,不会执行cancelAcquire方法,只有方法抛出异常,failed为true进入finally以后才会调用cancelAcquire方法
        boolean failed = true;
        try {
            // 
            boolean interrupted = false;
            // 自旋操作
            for (;;) {
                final Node p = node.predecessor();
                // 如果当前节点的前置结点为头节点,而且当前线程尝试获取锁成功了,那么直接返回即可
                // 在这个地方,头节点其实是一个空节点,只是充当一个标识符,头节点的next节点才是真正需要拿锁的节点,当他拿到锁以后,它就变成了头节点,并且会将p节点置为空,使它参与垃圾回收。
               	// 判断当前节点有没有资格拿锁
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 判断当前线程是否需要挂起,减少自旋等待的线程,减少cpu的压力,提升性能
                // shouldParkAfterFailedAcquire返回true,则当前线程需要挂起
                // 如果当前线程需要挂起,并且成功挂起,而且Thread.interrupted()的放回为true时,interrupted会被置为true
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // cancelAcquire就是将node.waitStatus置为cancel,并进行一定的清理操作
            if (failed)
                cancelAcquire(node);
        }
    }


	private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        // 当waitStatus>0时,只可能是cancel状态,此时直接将节点从等待队列删除
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            // 删除前置结点为cancel的node
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // cas操作将前置节点的waitStatus置为signal
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

/* 这里返回Thread.interrupted()意欲何为?若当前线程被挂起,
那么在此期间,很有可能有aqs以外的其他操作中断这个线程,
调用了该线程的interrupt方法,(如果线程调用wait或sleep等方法挂起,
那么interrupt方法可能会抛出异常),使用中断原语LockSupport.park挂起后,
再中断是不会抛出异常的,如果其他某个地方调用了该线程的interrupt方法,
只会改变线程内部的中断状态值,所以我们需要通过变量记录这个值。*/
/* 如果外部调用了线程的interrupt,那么该线程被唤醒的时候,
此时interrupted为true,会将中断信息带到外层,这样将会调用外层的selfInterrupt()方法*/
/* 当线程处于等待队列时,无法响应外部的中断请求,
只有当这个线程拿到锁之后再进行中断响应*/
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }


	static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

释放锁

tryRelease也是将实现交给继承它的子类,提高编程的自由度

protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

release中,如果尝试释放锁成功,那么将尝试唤醒队列中的其他节点

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
/* 传进来的是头节点,该方法是为了唤醒头节点后的node,
此时head节点其实已经完成了所有的操作,需要先将他的waitStatus置为0,
不影响其他函数的判断*/

/* 然后程序开始从这个先进先出的队列的尾节点开始搜索,
找到除了head节点以外最靠前的waitStatus小于等于0的节点,
然后唤醒该线程,当线程被唤醒后它会自行执行acquireQueued方法,
尝试自旋的获取锁。当他被唤醒之前还被阻塞在那里。*/
private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

为什么唤醒操作不直接从头开始搜索,而是从后往前搜索?

从后往前遍历是因为插入的时候,cas和next节点赋值的时候可能会有其他线程打断,导致从前往后遍历会出现null。

在enq方法里,在执行“if(compareAndSetTail(t, node)){//…}”时,cas是原子操作,但是当cas成功(tail指向当前node),执行if代码块里的内容时,此时不是同步操作。这个时刻,node与前一个节点t之间,node的prev指针在cas操作之前已经建立,而t的next指针还未建立。此时若其他线程调用了unpark操作,从头开始找就无法遍历完整的队列,而从后往前找就可以

参考

CodesGeek

石头StoneWang

AstrophelYang

bilibili 寒食君

以上是关于AQS原理初探的主要内容,如果未能解决你的问题,请参考以下文章

多线程(十AQS原理-ReentrantLock实现)

Java并发编程原理与实战十九:AQS 剖析

AQS 原理以及 AQS 同步组件总结

AQS源码解析

从ReentrantLock的实现看AQS的原理及应用

AQS 原理解析以及源码分析