AQS源码剖析第三篇--共享模式

Posted 热爱编程的大忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS源码剖析第三篇--共享模式相关的知识,希望对你有一定的参考价值。

AQS源码剖析第三篇--共享模式


系列文章:

AQS源码剖析第一篇—全貌概览

AQS源码剖析第二篇–公平与非公平,条件队列和线程中断


这篇,我们的关注点是 AQS 最后的部分,AQS 共享模式的使用。

本文先用 CountDownLatch 将共享模式说清楚,然后顺着把其他 AQS 相关的类 CyclicBarrier、Semaphore 的源码一起过一下。


CountDownLatch

CountDownLatch 这个类是比较典型的 AQS 的共享模式的使用,这是一个高频使用的类。latch 的中文意思是门栓、栅栏,具体怎么解释我就不废话了,大家随意,看两个例子就知道在哪里用、怎么用了。

假设我们有3个任务,那么我们会用 3 来初始化一个 CountDownLatch,然后将这个 latch 的引用传递到各个线程中,在每个线程完成了任务后,调用 latch.countDown() 代表完成了一个任务。

调用 latch.await() 的方法的线程会阻塞,直到所有的任务完成。

package com.aqs;

import java.util.concurrent.CountDownLatch;

/**
 * @author 大忽悠
 * @create 2022/10/2 16:56
 */
public class AqsShare 
    /**
     * 开始信号
     */
    private static final CountDownLatch START_SIGNAL=new CountDownLatch(1);
    /**
     * 结束信号
     */
    private static final CountDownLatch END_SIGNAL=new CountDownLatch(3);


    public static void main(String[] args) throws InterruptedException 
        for (int i = 0; i < 3; ++i) 
            new Thread(new Worker(START_SIGNAL, END_SIGNAL)).start();
        

        // 这边插入一些代码,确保上面的每个线程先启动起来,才执行下面的代码。
        doSomethingElse();
        // 因为这里 N == 1,所以,只要调用一次,那么所有的 await 方法都可以通过
        START_SIGNAL.countDown();
        doSomethingElse();
        // 等待所有任务结束
        END_SIGNAL.await();
        System.out.println("所有任务执行结束");
    

    private static void doSomethingElse() 
        try 
            Thread.sleep(1000L);
            System.out.println("Main线程休息好了...");
         catch (InterruptedException e) 
            e.printStackTrace();
        
    


    public static class Worker implements Runnable
        /**
         * 开始信号
         */
        private final CountDownLatch startSignal;
        /**
         * 结束信号
         */
        private final CountDownLatch endSignal;

        public Worker(CountDownLatch startSignal, CountDownLatch endSignal) 
            this.startSignal = startSignal;
            this.endSignal = endSignal;
        


        @Override
        public void run() 
            try 
                // 为了让所有线程同时开始任务,我们让所有线程先阻塞在这里
                // 等大家都准备好了,再打开这个门栓
                startSignal.await();
                doWork();
                endSignal.countDown();
             catch (InterruptedException e) 
                e.printStackTrace();
            
        

        private void doWork() throws InterruptedException 
              System.out.printf("当前线程[%s],努力工作中...\\r\\n",Thread.currentThread().getName());
              Thread.sleep(3000L);
            System.out.printf("当前线程[%s],工作结束\\r\\n",Thread.currentThread().getName());
        
    


3个新开启的线程都调用了startSignal.await() 进行阻塞等待,它们阻塞在栅栏上,只有当条件满足的时候(startSignal.countDown()),它们才能同时通过这个栅栏,目的是让所有的线程站在一个起跑线上。

如果始终只有一个线程调用 await 方法等待任务完成,那么 CountDownLatch 就会简单很多,所以之后的源码分析读者一定要在脑海中构建出这么一个场景:有 m 个线程是做任务的,有 n 个线程在某个栅栏上等待这 m 个线程做完任务,直到所有 m 个任务完成后,n 个线程同时通过栅栏。

CountDownLatch 非常实用,我们常常会将一个比较大的任务进行拆分,然后开启多个线程来执行,等所有线程都执行完了以后,再往下执行其他操作。


源码分析

构造方法,需要传入一个不小于 0 的整数:

    public CountDownLatch(int count) 
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    

CountDownLatch内部提供的同步器实现如下:

private static final class Sync extends AbstractQueuedSynchronizer 
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) 
            setState(count);
        

        int getCount() 
            return getState();
        

       //count计数为0,任务都完成了,就返回1,否则返回-1
        protected int tryAcquireShared(int acquires) 
            return (getState() == 0) ? 1 : -1;
        

        //count计数自旋减一,如果减去一后为0,则返回true,否则返回false  
        protected boolean tryReleaseShared(int releases) 
            // Decrement count; signal when transition to zero
            for (;;) 
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            
        
    

代码都是套路,先分析套路:AQS 里面的 state 是一个整数值,这边用一个 int count 参数其实初始化就是设置了这个值,所有调用了 await 方法的等待线程会挂起,然后有其他一些线程会做 state = state - 1 操作,当 state 减到 0 的同时,那个将 state 减为 0 的线程会负责唤醒 所有调用了 await 方法的线程。


对于 CountDownLatch,我们仅仅需要关心两个方法,一个是 countDown() 方法,另一个是 await() 方法。

countDown() 方法每次调用都会将 state 减 1,直到 state 的值为 0;而 await 是一个阻塞方法,当 state 减为 0 的时候,await 方法才会返回。await 可以被多个线程调用,读者这个时候脑子里要有个图:所有调用了 await 方法的线程阻塞在 AQS 的阻塞队列中,等待条件满足(state == 0),将线程从队列中一个个唤醒过来。


await源码分析

首先,我们来看 await() 方法,它代表线程阻塞,等待 state 的值减为 0。

public void await() throws InterruptedException 
    //共享模式下响应中断的获取资源获取
    sync.acquireSharedInterruptibly(1);

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException 
    //如果获取资源过程中发生了中断,会给予响应
    if (Thread.interrupted())
        throw new InterruptedException();

    //尝试在共享模式下获取资源
    if (tryAcquireShared(arg) < 0)
    //资源未能获取成功,则准备进入阻塞队列
        doAcquireSharedInterruptibly(arg);


//尝试在共享模式下去获取资源
//对于CountDownLatch来说,资源是否获取成功,等价于count是否为0
// 只有当 state == 0 的时候,这个方法才会返回 1,表示资源获取成功,那就不需要将当前线程弄到阻塞队列等着了
//如果资源获取失败,则返回-1,下一步就是让当前线程去阻塞队列等着了
protected int tryAcquireShared(int acquires) 
    return (getState() == 0) ? 1 : -1;


doAcquireSharedInterruptibly 从方法名我们就可以看出,这个方法是获取共享锁,并且此方法是可中断的(中断的时候抛出 InterruptedException 退出这个方法)。

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException 
        //将当前线程以共享模式加入阻塞队列
        final Node node = addWaiter(Node.SHARED);
        try 
           //死轮询直到获取到共享锁
            for (;;) 
                //只有前驱节点是head哑结点的情况下,才有机会去抢共享锁
                final Node p = node.predecessor();
                if (p == head) 
                    //抢锁成功对于CountDownLatch意味着count=0
                    int r = tryAcquireShared(arg);
                    //返回值大于0,表示count=0了,那么需要链式唤醒后续其他阻塞线程
                    if (r >= 0) 
                        //设置当前节点为头结点,并且唤醒阻塞队列上模式为shared的后续节点 
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    
                
                //抢锁失败就挂起,和之前一样---这里抢锁失败意味着count!=0
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            
         catch (Throwable t) 
            cancelAcquire(node);
            throw t;
        
    

如果当前阻塞队列中第一个节点绑定的线程阻塞被唤醒后,发现此时count=0了,那么会去链式唤醒在阻塞队列上的后续模式为shared的节点。

链式唤醒的逻辑存在于setHeadAndPropagate中:

private void setHeadAndPropagate(Node node, int propagate) 
        Node h = head;
        //设置当前节点为新的头结点
        setHead(node);
        //如果当前被唤醒线程的node节点的下一个节点是共享模式,那么会通过doReleaseShared唤醒后继节点
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) 
            Node s = node.next;
            //链式唤醒的核心,后面分析doReleaseShared方法的时候就会明白
            if (s == null || s.isShared())
                doReleaseShared();
        
    

下面再通过流程图的方式把上面的流程过一遍:

  • 线程 t3 经过第 1 步 addWaiter 入队以后,我们应该可以得到这个:


由于 tryAcquireShared 这个方法会返回 -1,所以 if (r >= 0) 这个分支不会进去。到 shouldParkAfterFailedAcquire 的时候,t3 将 head 的 waitStatus 值设置为 -1,如下:


然后进入到 parkAndCheckInterrupt 的时候,t3 挂起。

我们再分析 t4 入队,t4 会将前驱节点 t3 所在节点的 waitStatus 设置为 -1,t4 入队后,应该是这样的:


然后,t4 也挂起。接下来,t3 和 t4 就等待唤醒了。

接下来,我们来看唤醒的流程。为了让下面的示意图更丰富些,我们假设用 10 初始化 CountDownLatch。


countDown源码分析

    public void countDown() 
        sync.releaseShared(1);
    
    public final boolean releaseShared(int arg) 
        if (tryReleaseShared(arg)) 
            doReleaseShared();
            return true;
        
        return false;
    

releaseShared是由AQS框架提供的模板方法,其中tryReleaseShared如何释放共享锁的逻辑应该由子类实现,因此我们来看看countDownLatch中是如何实现的吧。

        protected boolean tryReleaseShared(int releases) 
            // Decrement count; signal when transition to zero
            for (;;) 
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            
        

这个方法很简单,用自旋的方法实现 state 减 1,如果减去一后,state为0,返回true。

state为0,对于CountDownLatch来说,即count为0,即计数为0了,那么就需要唤醒所有因为await阻塞的线程了。


doReleaseShared是由AQS框架提供的,共享模式下唤醒逻辑:

// 调用这个方法的时候,state == 0
private void doReleaseShared() 
    for (;;) 
        Node h = head;
        //阻塞队列不为空
        if (h != null && h != tail) 
            int ws = h.waitStatus;
            //说明头结点后面存在节点需要被唤醒
            if (ws == Node.SIGNAL) 
                // 将 head 的 waitStatue 设置为 0---因为唤醒头结点的后继节点后,后继节点后将自己作为新的头结点
                //旧的头结点会从阻塞队列中移除
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒 head 的后继节点,也就是阻塞队列中靠前面满足条件的节点
                unparkSuccessor(h);
            
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo
                continue;                // loop on failed CAS
        
        if (h == head)                   // loop if head changed
            break;
    

这段代码先不看完,就看到unparkSuccessor(h),然后结束。

当unparkSuccessor(h)唤醒头节点的第一个后继节点后,因为此时count=0,所以:


setHeadAndPropagate开始了链式唤醒:


唤醒第一个后继节点后,还存在下面几种可能情况:

  1. 被唤醒的后继节点所在线程将自己设置为新的头结点时,调用CountDown的线程已经执行完了,到了 if (h == head)判断逻辑
// 调用这个方法的时候,state == 0
private void doReleaseShared() 
    for (;;) 
        Node h = head;
        //阻塞队列不为空
        if (h != null && h != tail) 
            int ws = h.waitStatus;
            //说明头结点后面存在节点需要被唤醒
            if (ws == Node.SIGNAL) 
                // 将 head 的 waitStatue 设置为 0---因为唤醒头结点的后继节点后,后继节点后将自己作为新的头结点
                //旧的头结点会从阻塞队列中移除
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
            // 唤醒 head 的后继节点,也就是阻塞队列中靠前面满足条件的节点
                unparkSuccessor(h);`在这里插入代码片`
            
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo
                continue;                // loop on failed CAS
        
        //此时头结点还没有被后继节点重新设置,因此满足条件,跳出循环
        if (h == head)                   // loop if head changed
            break;
    

  1. 被唤醒的后继节点所在线程将自己设置为新的头结点时,调用CountDown的线程还没执行完 if (h == head)判断逻辑
// 调用这个方法的时候,state == 0
private void doReleaseShared() 
    for (;;) 
        Node h = head;
        //阻塞队列不为空
        if (h != null && h != tail) 
            int ws = h.waitStatus;
            //说明头结点后面存在节点需要被唤醒
            if (ws == Node.SIGNAL) 
                // 将 head 的 waitStatue 设置为 0---因为唤醒头结点的后继节点后,后继节点后将自己作为新的头结点
                //旧的头结点会从阻塞队列中移除
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
         // 唤醒 head 的后继节点,也就是阻塞队列中靠前面满足条件的节点
                unparkSuccessor(h);
            
            //当唤醒到阻塞队列中最后一个节点时,自然此时ws为0了
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo
                continue;                // loop on failed CAS
        
        //此时头结已经被后继节点重新设置,因此不满足条件,继续循环
        if (h == head)                   // loop if head changed
            break;
    


CyclicBarrier

字面意思是“可重复使用的栅栏”或“周期性的栅栏”,总之不是用了一次就没用了的,CyclicBarrier 相比 CountDownLatch 来说,要简单很多,其源码没有什么高深的地方,它是 ReentrantLock 和 Condition 的组合使用。看如下示意图,CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。


首先,CyclicBarrier 的源码实现和 CountDownLatch 大相径庭,CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier 基于 Condition 来实现。

因为 CyclicBarrier 的源码相对来说简单许多,读者只要熟悉了前面关于 Condition 的分析,那么这里的源码是毫无压力的,就是几个特殊概念罢了。

先用一张图来描绘下 CyclicBarrier 里面的一些概念,和它的基本使用流程:


看图我们也知道了,CyclicBarrier 的源码最重要的就是 await() 方法了。

简单使用演示如下:

package com.aqs;

import lombok.SneakyThrows;


import java.util.concurrent.*;

/**
 * @author 大忽悠
 * @create 2022/10/2 16:56
 */
public class AqsShare 
    public static void main(String[] args) throws InterruptedException 
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () -> 
            System.out.println("所有线程已经到达栅栏处,准备放行");
        );

        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) 
            executorService.execute(()->
                try 
                    while(true)
                        System.out.println("马上到达栅栏处...");
                        sleepRandom();
                        cyclicBarrier.await();
                        System.out.println("栅栏放开...");
                    
                 catch (InterruptedException |以上是关于AQS源码剖析第三篇--共享模式的主要内容,如果未能解决你的问题,请参考以下文章

《并发系列一》AbstractQueuedSynchronizer(AQS)- 互斥锁源码剖析

AQS 详解

AQS源码探究_08 CyclicBarrier源码分析

AQS源码探究_08 CyclicBarrier源码分析

Java并发:深入浅出AQS之共享锁模式源码分析

AQS源码剖析第一篇---全貌概览