同步队列器AQS的实现原理

Posted yxz1025

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了同步队列器AQS的实现原理相关的知识,希望对你有一定的参考价值。

 

 

Java中的锁一般分为两种,一种是synchronized关键字,它是基于底层CPU指令实现的锁,另外一种是Lock锁,是基于jdk实现的一种锁,传统的synchronized是一个重量级锁,缺点是有多个线程获取锁时,获取失败的线程会进入阻塞状态,成功获取锁的线程在处理完逻辑后会通知阻塞态的线程,这时候是随机挑选的属于非公平锁,而lock锁,比如ReentrantLock在默认情况下唤醒被阻塞的线程的方式也是属于非公平锁,但是可动态调整锁的方式,当构造参数传为fair=true表示在公平环境下获取锁,言归正传,所有的jdk实现的锁ReentrantLock、CountDownLatch、Semaphore通过内部匿名类继承了AbstractQueuedSynchronizer,然后重写AQS提供的部分方法
1、原理及架构
AQS内部维护一个同步队列,所有的同步队列中的线程通过自旋方式不断获取同步状态,当一个线程无法获取同步状态时,通过创建一个Node节点并将此节点加入队列的尾部,此队列严格按照FIFO方式出入队列,只有当前线程是头结点并且成功获取到同步状态,此时的线程就可出队列;state在AQS中被定义为一个volatile类型的的变量通过提供两种方法来让子类设置同步状态分别是compareAndSetState和setState,一种是通过原子CAS的方式,一种是普通方式

首先看看Node节点的属性:

  • SHARED、EXCLUSIVE表示节点的模式,在创建节点是指定一种模式,一般情况比如对于文件系统的读操作,可以设置为共享模式,对于文件的写要设置为独占式保证线程安全的执行
  • waitStatus:节点的状态为三种
  1. CANCELLED表示线程处于超时或者中断状态,值为1
  2. SIGNAL表示后继节点被唤醒,值为-1
  3. CONDITION表示线程处于阻塞队列中,无法在同步队列中使用,值为-2,直到调用signal方法后将其转移到同步队列中
  4. PROPAGATE表示下一个共享模式下获取同步状态会被持续传播下去,值为-3

获取同步状态的方式:共享模式、独占模式,下面先分析一下独占模式下AQS是如何获取一个同步状态的:

  • 独占模式下同步状态的获取
    // 独占模式下获取同步状态
    public final void acquire(int arg) 
        //尝试获取同步状态,如果获取失败,创建节点并且将节点加入到同步队列的尾部,并进行自旋状态,同时设置为阻塞状态
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    

tryAcquire方法需要子类去实现,先分析一下这里的addWaiter:

    //入参Mode为节点的模式,这里将获取不到状态的线程加入到同步队列中
    private Node addWaiter(Node mode) 
        //首先创建一个线程节点
        Node node = new Node(Thread.currentThread(), mode);
        //获取尾节点赋值当做新增节点的前驱
        Node pred = tail;
        if (pred != null) 
            node.prev = pred;
            //进行cas原子操作将新增节点加入到尾部,同时将前驱节点的后继节点设置为当前节点并返回
            if (compareAndSetTail(pred, node)) 
                pred.next = node;
                return node;
            
        
        //如果不存在同步队列或者尾部不存在
        enq(node);
        return node;
    

end方法通过cas方式先去找尾节点,若为null,则初始化同步队列 这时,tail=head,否则按照正常的将新节点加入到尾部

    //通过cas死循环
    private Node enq(final Node node) 
        for (;;) 
            Node t = tail;
            if (t == null)  // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
             else 
                node.prev = t;
                if (compareAndSetTail(t, node)) 
                    t.next = node;
                    return t;
                
            
        
    

这里分析一下acquireQueued方法,当新的线程未获取到同步状态后则加入到同步队列中,此时通过此方法进行自旋

    final boolean acquireQueued(final Node node, int arg) 
        boolean failed = true;
        try 
            boolean interrupted = false;
            for (;;) 
                //获取当前线程节点的前驱节点,如果是head节点则再次尝试获取同步状态,获取成功,将自身设置为头节点,前驱节点出队列
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) 
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                
                //获取同步状态失败则设置该节点的状态,并且阻塞该节点
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            
         finally 
            if (failed)
                cancelAcquire(node);
        
    
进行自旋时一旦发现前驱节点为head节点并且尝试获取同步状态,获取成功则设置当前节点为头节点

shouldParkAfterFailedAcquire方法为当节点未获取同步状态时设置该节点的waitingStatus的状态,如果是被阻塞了直接返回true:
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) 
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //如果前驱节点已经被唤醒,则它是安全的被阻塞了,直接返回true
            return true;
        if (ws > 0) 
            //如果当前节点的前驱节点状态为cancelled,则进入循环直到找到不为cancelled的节点,把此节点设置为当前节点的前驱节点
            do 
                node.prev = pred = pred.prev;
             while (pred.waitStatus > 0);
            pred.next = node;
         else 
            //将前驱节点设置为signal
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        
        return false;
    
  • 独占模式同步状态的释放流程release方法
    public final boolean release(int arg) 
        if (tryRelease(arg)) 
            Node h = head;
            //获取头部节点不为Null,则唤醒后继节点
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        
        return false;
    

这里重点看看unparkSuccessor方法,当唤醒后继节点后,由于进入同步队列的线程都处于不停地自旋状态,一旦符合前驱是head并且获取到同步节点则进行出队列

    private void unparkSuccessor(Node node) 
        //获取head节点的状态如果状态不是初始状态或cancelled,则设置为初始态
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        //如果后继节点是取消状态或者为Null,则循环从尾部节点开始往前找
        Node s = node.next;
        if (s == null || s.waitStatus > 0) 
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        
        //唤醒后继节点
        if (s != null)
            LockSupport.unpark(s.thread);
    
  • 共享模式下同步状态的获取

共享模式下同步状态的获取会忽略线程的中断,一旦获取成功则返回true,否则进入阻塞队列

    public final void acquireShared(int arg) 
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    

doAcquireShared方法,首先创建一个共享模式的节点加入到同步队列中,进行死循环,如果当前的节点前驱为head节点,再次尝试获取同步状态,获取成功则设置自身为head节点

    private void doAcquireShared(int arg) 
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try 
            boolean interrupted = false;
            for (;;) 
                //获取前驱节点
                final Node p = node.predecessor();
                if (p == head) 
                    //如果节点为head,再次获取同步状态
                    int r = tryAcquireShared(arg);
                    if (r >= 0) 
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            
         finally 
            if (failed)
                cancelAcquire(node);
        
    

这里看看setHeadAndPropagate方法的实现细节

    private void setHeadAndPropagate(Node node, int propagate) 
        Node h = head;
        //设置head节点
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) 
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        
    
  • 共享模式同步状态释放流程
    public final boolean releaseShared(int arg) 
        if (tryReleaseShared(arg)) 
            doReleaseShared();
            return true;
        
        return false;
    

对于共享式释放来自多个线程的释放操作,tryReleaseShared方法需要保证原子操作,一般采用CAS方式

整个AQS的获取及释放源码分析到这里就结束了,下一章节继续分享AQS的子类ReentrantLock、CountDownLatch、Semaphore的源码(关注公众号“聊点源码”获取更多资讯)

以上是关于同步队列器AQS的实现原理的主要内容,如果未能解决你的问题,请参考以下文章

同步队列器AQS之condition等待队列的实现分析

同步队列器AQS的实现原理

AQS

AQS-ReentrantLock实现原理

JUC回顾之-AQS同步器的实现原理

原来 AQS实现原理还能如此总结