Java 并发之AbstractQueuedSynchronizer(AQS)源码解析

Posted 后台开发学习

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 并发之AbstractQueuedSynchronizer(AQS)源码解析相关的知识,希望对你有一定的参考价值。

关键字CLH,Node,线程,waitStatus,CAS,中断

目录

图解AQS的操作细节
0、前言
1、基本概念
1.1、CAS自旋
1.2、Node
1.3、CLH & AQS
1.4、ReentrantLock
2、图解AQS
2.1、线程A单独运行
2.2、线程B开始运行
2.3、线程C开始运行
2.4、线程A停止运行,线程B继续运行
2.5.1、线程B停止运行,线程C继续运行
2.5.2、线程C放弃竞争
3、问题总结
3.1、为什么在unparkSuccessor操作中从尾节点开始扫描
3.2、公平锁中的hasQueuedPredecessors操作为什么从tail开始
3.3、当前运行的线程在哪里
3.4、节点的waitStatus和AQS的state的区别
4、参考链接

CountDownLatch ==> Java 并发之CountDownLatch 计数器 操作图解细节

0、前言

AQS 全称 AbstractQueuedSynchronizer,中文简称是同步器,是 jdk1.5开始放在JUC中的,是java中关于线程、同步操作的基础组件,JUC作者Doug Lea AQS简介论文,ReentrantLock,ReentrantReadWriteLock,CountDownLatch等都是基于AQS开发的复合特定场景的锁

先介绍AQS中的基本概念,然后具体使用3个线程的操作过程一步一步分析AQS中的操作细节,配合着分析AQS源码,了解其中的原理。本文只分析互斥、非公平的方式竞争的情况

1、基本概念

1.1、CAS自旋

CAS 是Compare And Swap的简称,具有单一变量的原子操作特性,对比成功后进行交换操作,他是乐观操作,期间会无限循环操作,直到对比成功,然后进行后续交互操作

CAS 包含了三个操作数据,内存位置V、预期值A、新预期值B,如果当前内存V存放的数据和A一样,就认为比较成功,然后把当前V所在的位置设置为B

因为会无限循环操作,所以可能导致CPU效率低下,而且运行中还会导致ABA问题,也就是A->B->A的问题,误以为此时数据未发生变化,其实中间已经发生变化。该问题在java中提供了类AtomicStampedReference解决该问题,先会查看当前引用值是否符合期望,ABA也会变成1A->2B->3A,这样很清楚的感知到发生变化了。

1.2、Node

Node 是AQS中的双向链表的节点信息,主要信息如下

// 共享模式
static final Node SHARED = new Node();
// 互斥模式(独斥)
static final Node EXCLUSIVE = null;

// 当前线程取消竞争
static final int CANCELLED =  1;
    
// 后置节点等待被唤醒
static final int SIGNAL    = -1;
    
// 暂时不明含义
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
  
// 和共享有关  
static final int PROPAGATE = -3;
    
// 最重要的信息,就是为上面的1,-1,-2,-3,以及0
volatile int waitStatus;
// 前置节点
volatile Node prev;
// 后置节点
volatile Node next;
// 节点存储的线程信息,会被挂起和唤醒
volatile Thread thread;

// 和condition 有关
Node nextWaiter;

1.3、CLH & AQS

在AQS中所有没有抢占到锁的线程最后都会被存储到CLH双向链表中,然后挂起等待被唤醒

// CLH队列的头结点,其不包含线程信息,head永远为null
private transient volatile Node head;
// CLH队列的尾节点,每次新加一个节点都会尾插到最后
private transient volatile Node tail;
// 当前锁被占据的次数,因为可以被一个线程重复占据,所以其值可以大于0
// 没有线程占据,其值就是0
private volatile int state;
// 当前运行的线程,也是占据锁的线程,注意和CLH中的线程无关
private transient Thread exclusiveOwnerThread; 

1.4、ReentrantLock

基于AQS开发的可重入、互斥锁,并且可以自行响应线程中断,而基于底层操作系统实现线程安全的synchronized关键字却不没有该功能

Lock lock = new ReentrantLock();
// 可以通过传入的boolean值设置其为公平方式还是不公平方式占据锁
lock.lock();
try { 
  // 业务代码,线程互斥
}
finally {
  lock.unlock(); 
}

2、图解AQS

现在待运行的三个线程ABC,将会按照如下情况依次执行,三个线程故意设置耗时很长从而出现互斥的情况,必然出现锁竞争,自旋的情况,根据其运行情况,分析AQS的运行过程和原理,

 
技术图片
image

2.1、线程A单独运行

不存在任何存在的竞争问题,A线程获取锁成功,A线程进入到了运行中的情况,此时的AQS执行时,tryAcquire直接成功,看看非公平方式的tryAcquire操作源码

ReentrantLock 类

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 获取当前线程,此处肯定是A线程了
    int c = getState();
    if (c == 0) {
        // 第一次运行,state=0
        if (compareAndSetState(0, acquires)) {
            // 1、利用CAS自旋方式,判断当前state确实为0,然后设置成acquire(1)
            // 这是原子性的操作,可以保证线程安全
            setExclusiveOwnerThread(current);
            // 设置当前执行的线程,直接返回为true
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
       // 2、当前的线程和执行中的线程是同一个,也就意味着可重入操作
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        // 表示当前锁被1个线程重复获取了nextc次
        return true;
    }
    // 否则就是返回false,表示没有尝试成功获取当前锁
    return false;
}

此时A线程直接在1处被允许,然后设置了当前锁被占据的线程,直接返回,A线程正常运行

2.2、线程B开始运行

 
技术图片
image

现在A线程一直在运行中,B线程开始运行,同样的B线程也要开始尝试获取当前锁,经过tryAcquire操作肯定返回false,获取当前锁失败,原因可同理分析上面1.1的代码块,现在的情形是

A线程:继续运行
B线程:获取锁失败,计划进入到CLH队列中

这时候B线程执行的线路是acquireQueued(addWaiter(Node.EXCLUSIVE), arg)),先添加一个互斥节点,然后唤醒可以执行的节点线程,这个时候的CLH队列是初始值状态,都指向null

 
技术图片
image

先看看添加互斥节点的代码

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 新建一个节点,包装的线程是当前线程,传入的mode是互斥的
    // 当前线程是B线程
    Node pred = tail;
    // 此时尾节点tail=null
    if (pred != null) {
        // 当尾节点不为null,也就是CLH存在节点数据时
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            // 又是CAS操作,判断当前pred节点是否是尾节点,如果是,设置node为尾节点,否则跳过
            pred.next = node;
            // 双向链表的基本操作,把当前节点当做尾部节点插入
            return node;
        }
    }
    // 其实上面这一段代码的重点是**快速**,假定当前不存在互斥或者冲突,能够高效的添加节点成功
    
    enq(node);
    // 这个时候B线程创建好一个节点,添加到尾部
    return node;
    // 然后返回新创建的节点
}

private Node enq(final Node node) {
    for (;;) {
        // 死循环,可用严格确保可用完整插入到队列尾部
        Node t = tail;
        if (t == null) { 
            // 1、如果当前尾部是null,这个地方就是初始化的CLH队列的地方
            // B线程肯定会执行到这个地方
            if (compareAndSetHead(new Node()))
                // CAS自旋方式,设置当前队列头部节点为一个new Node(不包含线程信息)
                tail = head;
                // 头部和尾部指向同一个新建的new Node 节点
                // 注意此时没有退出。。。。只是完成if操作,会继续for循环操作
        } else {
            // 2、第二次循环到这里,CLH队列只有一个节点new Node
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                // 利用CAS的方式,配合双写链表的连接方式,添加一个节点到尾部
                t.next = node;
                // 然后返回原尾部节点
                return t;
            }
        }
    }
}

经历过上面的代码块的1、2两个步骤时的CLH队列情况是

步骤1步骤2
 
技术图片
image
 
技术图片
image

需要注意到head指向的节点线程为null!

再来到acquireQueued操作,去唤醒可能执行的节点线程

final boolean acquireQueued(final Node node, int arg) {
    // 这个时候node节点是上图中的tail节点,也就是线程B所在的节点,arg=1
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 查看当前node节点的前置节点p
            if (p == head && tryAcquire(arg)) {
               // 如果前置节点是头部节点,B线程进行尝试获取锁
               // 如果!是如果!
               // 尝试获取成功了,也就是A线程执行完成,然后设置当前当前可执行的节点是B线程
                setHead(node);
                // 把node节点的线程设置为null,然后head指到node上
                p.next = null; // help GC
                // 回收操作
                failed = false;
                // 返回,然后将执行的将会是B线程
                return interrupted;
            }
            // 按照我们的线程ABC的样例,上面的if肯定不会执行
            // 1、因为即使node的前节点是head,但是也无法获取到锁,因为一直被A线程占有者
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 如果shouldParkAfterFailedAcquire返回true,则挂起,否则不挂起线程
                interrupted = true;
            // 2、第一次循环设置了p节点状态位-1,然后B线程循环至这里被挂起
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// 返回true,挂起,返回false则不挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    
    if (ws == Node.SIGNAL)
        // 前节点的状态=-1,返回true,直接挂起即可
        return true;
    if (ws > 0) {
        // 前置节点状态有>0,说明前驱节点取消了排队,移除掉该节点
        do {
            node.prev = pred = pred.prev;
            // 拆分为2步
            // pred = pred.prev
            // node.prev = pred
        } while (pred.waitStatus > 0);
        // 当前节点开始往前扫描,从双向队列中除去>0的节点,直到<=0
        pred.next = node;
    } else {
        // 利用CAS设置前节点状态位-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
// 总结一下该方法用途:当前节点的前驱节点状态位-1时,挂起当前线程
// 返回false时,也就是不需要挂起,返回true,则需要调用parkAndCheckInterrupt挂起当前线程
 
技术图片
image

运行到现在,B线程挂起,A线程继续在运行

2.3、线程C开始运行

线程C开始运行,如下图

 
技术图片
image

现在的线程情况是

线程A:运行中
线程B:挂起,暂停运行
线程C:开始运行

很明显,C线程会在tryacquire(1)方法中失败,然后同样通过addWaiter方法添加一个节点放在尾部,使得经过addWaiter方法之后的队列成为

 
技术图片
image

又需要经过acquireQueued方法的处理,但是此时此时的节点是线程C,其前置节点是线程B,不是head,直接运行到shouldParkAfterFailedAcquire方法中

此时B线程所在的节点waitStatus=0,只能通过CAS方法设置B节点waitStatus为SIGHNAL(-1),,并且之后又经过1次循环操作,最后返回true,此时队列为

 
技术图片
image

然后同理经过parkAndCheckInterrupt,线程C也被中断了

现在的情况是

线程A:运行中
线程B:挂起,暂停运行
线程C:挂起,暂停运行

2.4、线程A停止运行,线程B继续运行

 
技术图片
image

现在A运行完成了,需要释放占据的锁

ReentrantLock 类

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        // 当前运行的线程和锁住的线程不是同一个,抛出监控状态异常错误
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 如果为0,则意味着这时可以释放锁了
        free = true;
        setExclusiveOwnerThread(null);
        // 设置当前的占据锁的线程为null,给其他线程一个机会
    }
    setState(c);
    // 存在重入的机会,所以不一定就是释放该锁
    return free;
}

AbstractQueuedSynchronizer 类

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        // 返回true,则表示可以唤醒其他挂起的线程,通知他们可以干活了
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 如果头部节点有效,则开始唤醒CLH队列的节点线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    // node节点是head,在我们上面已经说明了head节点的waitStatus=-1
   
    int ws = node.waitStatus;
    if (ws < 0)
        // 1、设置head节点状态为0
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    // 从head下一个节点开始处理
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 如果下一个节点为null,或者下一个节点状态>0(为废弃节点)
        // 从尾节点开始扫描,直到扫描到距离head最近的一个<=0的节点信息
        // 在我们当前的例子中,肯定就是线程B
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 样例中,s节点就是B线程所在的节点
        // 2、唤醒s.thread ,会触发acquireQueued的循环继续执行
        LockSupport.unpark(s.thread);
}

经过上面步骤1之后,head节点就被设置为了0

 
技术图片
image

线程B经过LockSupport.unpark操作之后,原本挂起在acquireQueued的循环中,会继续执行

for (;;) {
            final Node p = node.predecessor();
            // 查看当前node节点的前置节点p
            if (p == head && tryAcquire(arg)) {
               // 如果前置节点是头部节点,B线程进行尝试获取锁
               // 如果!是如果!
               // 尝试获取成功了,也就是A线程执行完成,然后设置当前当前可执行的节点是B线程
                setHead(node);
                // 把node节点的线程设置为null,然后head指到node上
                p.next = null; // help GC
                // 回收操作
                failed = false;
                // 返回,然后将执行的将会是B线程
                return interrupted;
            }

恰好当前B线程的前一个节点p是head,而且线程A也已经释放了,没有占据锁,所以在tryAcquire操作时,可以顺利获取到锁,然后把B线程节点当做head,并且把node节点(原head节点)前置和后置都设置为null

 
技术图片
image

其中图中的双向链表的1、2、3依次断开,原head节点被GC回收,新的CLH队列为

 
技术图片
image

现在的情况是

线程A:运行结束
线程B:运行中
线程C:挂起,暂停运行

2.5.1、线程B停止运行,线程C继续运行

 
技术图片
image

现在的情况是

线程A:运行结束
线程B:运行结束
线程C:运行中

按照1.4小节的套路,最后来到了unparkSuccessor 方法中

设置当前头部节点head的waitStatus为0,然后唤醒线程C,把head节点数据全部清除掉,head和tail将会指向同一个节点,如下图,最后线程C在运行,CLH队列只剩下一个节点,head和tail同时指向该节点。

 
技术图片
image

直到线程C运行完,CLH队列依旧如此,相当于完成了CLH队列的初始化,当下一次又来互斥线程时,又会开始创建节点

2.5.2、线程C放弃竞争

现在模拟的是B线程继续运行,而在CLH中的线程C放弃了竞争这种场景

 
技术图片
image

在上述的操作中是无法直接放弃竞争的,接下来看看是为什么,直接来到线程挂起和唤醒的代码快

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
    // 利用另外的线程对当前线程触发中断请求,会直接返回当前中断状态,也就是true
    // 那么在acquireQueued 函数中 interrupted 的值被赋值为true
    // 其他有任何的改变...
}

只是用来标记曾经被中断过,但是未有任何实际上的放弃竞争的操作
不过上面的finally代码块中有采用cancelAcquire 方法,不过一般情况下这个并不会被执行,而是一种兜底方案
,除非出现了一些意外情况

那么我们该如何确保真的进行放弃竞争操作呢,不采用lock.lock方法,而是lock.lockInterruptibly()

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        // 有线程中断,直接抛出中断异常
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    // 尾插节点成功
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 重点在这里,一旦出现了异常,则立马抛出该异常情况
                // 此时failed 字段肯定是true了
                // 需要进入到下面的cancelAcquire 方法中去取消竞争了
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            // 开始取消竞争
            cancelAcquire(node);
    }
}

在上面的学习知道,设置节点状态为Node.CANCELLED,然后在后续的队列清理中会清除该节点的,那现在看看是如何主动取消竞争的

private void cancelAcquire(Node node) {
    // Ignore if node doesn‘t exist
    if (node == null)
        return;

    node.thread = null;
    // 节点线程数据设置为null

    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // 从当前节点的前置节点开始扫描去掉废弃的节点,这样操作不会出现并发问题

    Node predNext = pred.next;

    // 设置为取消状态!
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        // node是尾部节点,而且通过CAS自旋成功,设置pred为tail
        // pred的next为nul,可以彻底和node 节点切割,node节点等待GC回收
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred‘s next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            // 前节点不是头部节点,而且包含的线程不为null(上面就有操作设置线程为null的情况)
            // 或者节点状态可以设置为-1
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            // 这样说么下个节点是可以串在一起的!
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
                // CAS 操作,设置后置节点
             // 这个操作就相当去双写链表去掉node节点   
        } else {
            unparkSuccessor(node);
            // 唤醒node的后置节点
            // 注意!此时node节点还在链表中
            // 他的真正移除是依靠node后置节点的shouldParkAfterFailedAcquire 去移除节点操作完成的
        }
        node.next = node; // help GC
    }
}

总结一下

  • 如果尾部节点或者中间节点就依靠从链表中脱离操作实现放弃竞争
  • 如果是头部的下一个节点,是依靠唤醒下一个节点后进行的链表清楚操作完成

这样就很清楚线程是如何取消竞争操作的

3、问题总结

上面的case只是各种线程执行方式的一种,实际场景中包含了各种各样的运行方式,每个线程执行的时间也长短不一,现在讨论几个常见的问题

3.1、为什么在unparkSuccessor操作中从尾节点开始扫描

这个问题需要回头观察CLH队列是如何创建的,来到addWaiter方法

Node pred = tail;
if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) {
        // 通过CAS 操作后node节点为tail
        // 并且早就设置了node.prev = pred
        // 1、也就是前置的链表已经绑定完毕,后置的链表还未绑定
        pred.next = node;
        return node;
    }
}

在代码块1处执行完成之后,还未执行绑定后置链表操作,如果出现唤醒操作,从头部开始遍历存在一定几率使得无法获取到当前最新的节点(因为后置链表还未绑定,无法通过.next获取到下一个节点信息),而从尾部开始扫描则不会导致该问题

很巧妙的利用尾部扫描避免了可能的并发问题

3.2、公平锁中的hasQueuedPredecessors操作为什么从tail开始

public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    // 先读取tail节点,再读取head节点
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

在思考这个问题之前,需要时刻认为CLH队列一直都在动态的变化中,head和tail随时会发生变化
在构建CLH链表时,是先设置head,然后设置tail,那么如果有null的情况只有head!=null && tail==null
一旦在这个时刻添加尾节点成功,head.next 的值其实已经变化了,但是如果先获取head可能存在null的情况,所以先获取tail再获取head

3.3、当前运行的线程在哪里

不存放在CLH队列中,样例图中也并未展示出来,其一直处于运行中,主要CLH队列的head节点线程为null

3.4、节点的waitStatus和AQS的state的区别

两者表示的含义完全不一样,节点的waitStatus是表示节点状态,简洁的表达线程的状态,当其值大于0就认为该节点线程放弃竞争锁

AQS中的state表示该锁被占据的次数,某一时刻只能被一个线程一次或者多次占据,其他线程只能被挂起等待

4、参考链接


作者:jwfy
链接:https://www.jianshu.com/p/282bdb57e343

以上是关于Java 并发之AbstractQueuedSynchronizer(AQS)源码解析的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程:并发容器之ConcurrentHashMap(转载)

Java并发编程:并发容器之ConcurrentHashMap

Java并发编程:并发容器之CopyOnWriteArrayList

Java并发--并发容器之ConcurrentHashMap

java并发之CopyOnWriteArraySet

Java并发之CountDownLatch的使用