JDK类库源码分析系列-AbstractQueuedSynchronizer

Posted _微风轻起

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDK类库源码分析系列-AbstractQueuedSynchronizer相关的知识,希望对你有一定的参考价值。

这一篇我们来分析一个Java中的一个线程安全&共享等相关的抽象类AbstractQueuedSynchronizer

在这里插入图片描述

​ 通过这个图片我们可以看到,我们使用的与共享资源并发控制&锁相关类如ReentrantLockReentrantReadWriteLockCountDownLatch这些都是继承的这个类,下面我们就来分析下这个抽象类

一、结构基本介绍

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

​ 这里我们可以看到其只是简单的继承了抽象类AbstractOwnableSynchronizer,以及实现系列化接口。以下可能会用AQS代替这个类名

1、AbstractOwnableSynchronizer

​ 这个类是用来表示当前锁的拥有者的

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
	..........

    protected AbstractOwnableSynchronizer() { }

    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

​ 这个我们可以看到就是一个get&set方法,用来设置&获取占有该锁的对应线程。

二、变量

1、head

private transient volatile Node head;

​ 这个就是表示等待获取锁的队列的头节点(因为如果是独占锁的话,一时间可能就需要存储还没有获取到锁的节点,用来后续获取),可以看到其是使用了volatile来处理其多线程情况下的可见性,同时加了transient,表示不需要对这个字段进行序列化存储,因为这个字段的初始化是懒加载的。

2、tail

private transient volatile Node tail;

​ 与上面类似的,这个是表示等待队列的尾结点。

3、state

private volatile int state;

​ 这个是用来表示这个类当前的同步状态。例如我们使用CountDownLatch计数等待的时候,我们对CountDownLatch初始话的时候设置的入参,就是设置的这个值,而我们使用锁ReetrantLock,当获取锁的时候默认设置的就是将这个status设置为1。我们进行释放锁的时候,就是将这个值一步一步设置到0,当为0的时候,就将占有独占锁的线程置为null

4、STATE&HEAD&TAIL

// VarHandle mechanics
private static final VarHandle STATE;
private static final VarHandle HEAD;
private static final VarHandle TAIL;

static {
    try {
        MethodHandles.Lookup l = MethodHandles.lookup();
        STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);
        HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class);
        TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class);
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }

    // Reduce the risk of rare disastrous classloading in first call to
    // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
    Class<?> ensureLoaded = LockSupport.class;
}

​ 这个就是处理并发的,由于我这里的是JDK9,所以使用的是新的并发控制类VarHandle,其如果是JDK8这里就是用的UnSafe,我们可以简单的将VarHandle当为UnSafe

三、Node子类

1、结构介绍

static final class Node {

​ 这个类就是AbstractQueuedSynchronizer类主要操作的类了

2、变量

1)、SHARED

static final Node SHARED = new Node();

​ 这个就是表明这个节点是等待在共享模式。

2)、EXCLUSIVE

static final Node EXCLUSIVE = null;

​ 表明这个节点等待在独占模式,其是获取的独占锁。

3)、waitStatus

volatile int waitStatus;

​ 这个是用来表明这个节点的等待状态的,用来与下面的值配合,同时该值如果>0,一般表示该节点被取消了。如果<0表示节点是需要被唤醒的。

4)、CANCELLED

static final int CANCELLED =  1;

​ 设置前面的waitStatus,表示这个节点已经取消获取锁了。

5)、SIGNAL

static final int SIGNAL    = -1;

​ 表明该节点的后继节点在阻塞状态,等待被唤醒。

6)、CONDITION

static final int CONDITION = -2;

​ 这个是与Condition配合使用,表示节点在条件队列,用来配合Conditionsignal()await()方法,也就是能自己去主动唤醒线程。

7)、PROPAGATE

static final int PROPAGATE = -3;

​ 这种waitStatus是表示无限制的往下传播下一个节点,一般是用于共享节点。

8)、prev

volatile Node prev;

​ 当前节点的前置节点

9)、next

volatile Node next;

​ 当前节点的后继节点

10)、thread

volatile Thread thread;

​ 表示占有当前节点的线程,在竞争获取锁的时候,AbstractQueuedSynchronizer会将每个线程描叙为Node

11)、nextWaiter

Node nextWaiter;

​ 该节点的下一个等待节点。

3、方法

1)、isShared()

final boolean isShared() {
    return nextWaiter == SHARED;
}

​ 表示下一个nextWaiter节点是不是共享模式。

2)、predecessor()

final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}

​ 获取当前节点的前置节点

3)、Node()

/** Establishes initial head or SHARED marker. */
Node() {}

​ 初始化构建方法,一般共享模式节点就是用的这个构造方法,就如同我们前面的变量SHARED一样,同时这里还可能整个获取锁的链条节点的头节点也是使用这种构造方法。

4)、Node(Node nextWaiter)

Node(Node nextWaiter) {
    this.nextWaiter = nextWaiter;
    THREAD.set(this, Thread.currentThread());
}

​ 这个构造方法,有设置两个值nextWaiterthread(设置为当前线程)。

5)、Node(int waitStatus)

Node(int waitStatus) {
    WAITSTATUS.set(this, waitStatus);
    THREAD.set(this, Thread.currentThread());
}

​ 这个是设置waitStatus&thread

6)、compareAndSetWaitStatus(int expect, int update)

final boolean compareAndSetWaitStatus(int expect, int update) {
    return WAITSTATUS.compareAndSet(this, expect, update);
}

​ CAS设置waitStatus

7)、compareAndSetNext & setPrevRelaxed

final boolean compareAndSetNext(Node expect, Node update) {
    return NEXT.compareAndSet(this, expect, update);
}

final void setPrevRelaxed(Node p) {
    PREV.set(this, p);
}

​ 这个与前面类似。

8)、VarHandle初始化

// VarHandle mechanics
private static final VarHandle NEXT;
private static final VarHandle PREV;
private static final VarHandle THREAD;
private static final VarHandle WAITSTATUS;
static {
    try {
        MethodHandles.Lookup l = MethodHandles.lookup();
        NEXT = l.findVarHandle(Node.class, "next", Node.class);
        PREV = l.findVarHandle(Node.class, "prev", Node.class);
        THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
        WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }
}

​ 这个就是CAS线程安全控制的变量,这个前面有介绍。

四、方法

1、getState()&setState(int newState)

protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}

​ 获取&设置AQS的同步状态。

2、compareAndSetState(int expect, int update)

protected final boolean compareAndSetState(int expect, int update) {
    return STATE.compareAndSet(this, expect, update);
}

​ 同样是CAS设置变量

3、compareAndSetTail(Node expect, Node update)

private final boolean compareAndSetTail(Node expect, Node update) {
    return TAIL.compareAndSet(this, expect, update);
}

​ 通过CAS设置尾结点。

4、enq(Node node)

private Node enq(Node node) {
    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return oldTail;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

​ 将这个节点入队列,也就是设置Node节点的prednext由此就建立了一个队列,同时这个入参节点的前置节点。

1)、逻辑分析

​ 这里我们可以看到是一个 for (;;),也就是说这个循环依赖return才能跳出。然后是判断有没有尾结点(也就是同步队列有没有初始化),如果为空,同步队列没有初始化就先通过initializeSyncQueue()方法去初始化同步队列。初始化后下次循环就能将这个节点添加到同步队列中了,通过compareAndSetTail(oldTail, node)node.setPrevRelaxed(oldTail)设置前置节点。

5、initializeSyncQueue()

private final void initializeSyncQueue() {
    Node h;
    if (HEAD.compareAndSet(this, null, (h = new Node())))
        tail = h;
}

​ 同步队列,可以看到这里是通过CAS设置创建头节点,成功后再将其引用到tail尾结点。这里还有一个信息点就是,最初的HEAD头节点Node是空的,并没有设置线程。

Node() {}

6、addWaiter(Node mode)

private Node addWaiter(Node mode) {
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

​ 这个是新建一个节点node,然后将入参mode设置为该节点的nextWaiter(nextWaiter一般是配合Condition),再将节点node通过CAS添加到队尾。

7、setHead(Node node)

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

​ 将节点node设置为头节点,要注意head节点的thread是为null的。

8、acquire(int arg)

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

​ 这个就是获取锁。首先是尝试获取锁,如果获取成功就成功了,如果获取失败,则将其添加到获取队列中。入参arg是我们前面提到的用来设置status的,如果使用的例如ReentrantLock,则入参是默认的1。如果是CountDownLatch,就会将status设置为入参:

public class ReentrantLock implements Lock, java.io.Serializable {
    ..................
	public void lock() {
        sync.acquire(1);
    }
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }

9、tryAcquire(int arg)

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

​ 尝试获取锁,可以看到这个目前是UnsupportedOperationException,交由子类来实现。我们找一个子类,非公平锁

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

​ 这里首先是判断当前AQSstatus

​ 如果为0,就表示其是正常状态,没有被线程占用,就可以通过compareAndSetState(0, acquires)设置status状态了,如果竞争成功设置,就再通过setExclusiveOwnerThread(current)设置占用线程为当前线程thread,然后就返回true表示回去锁成功。

​ 如果不为0就表示当前AQS还是被占用的状态,就通过current == getExclusiveOwnerThread()判断独占的线程是不是当前线程,如果不是,就返回false表示获取失败。如果是,就将状态累加int nextc = c + acquires修改status,然后返回true

10、acquireQueued(final Node node, int arg)

final boolean acquireQueued(final Node node, int arg) {
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

​ 这个就是将节点node添加到获取队列中,同时返回是否需要中断interrupted,可以看到这里是一个for (;;)

​ 获取节点node的前驱节点node.predecessor(),判断前驱节点是不是为head头节点,如果适合就再尝试tryAcquire(arg)获取锁,如果成功了就表示当前节点是head节点 了,就将该节点node通过setHead(node)设置为头节点。

​ 如果其潜在节点不为head节点或者tryAcquire(args)获取失败,就调用shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt(),看是否需要设置为interrupted中断状态。

11、shouldParkAfterFailedAcquire(Node pred, Node node)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}

​ 这个方法是判断需不需要调用LockSupport.park(this)方法来暂停当前线程,入参prednode的前驱节点。

​ 首先是ws == Node.SIGNAL,即判断前置节点是否在Node.SIGNAL状态等待,如果是,表示其后继节点node也需要等待,就直接返回true,然后就会调用parkAndCheckInterrupt()方法来park当前对象。

​ 如果ws > 0表示该前置节点pred已经被取消了,所以就需要再通过while循环,建立node的最新前置节点pred,再进行赋值pred.next = node,也就是重新整理Node链,去掉已经取消的节点。

​ 如果<=0则将前置节点设置为Node.SIGNAL

​ 下面这个if&else都是返回false,表示等待下次for(;;),下次由于已经设置了predNode.SIGNAL,所以就可以去调用parkAndCheckInterrupt()park当前对象了LockSupport.park(this)

12、parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

​ 这个方法我们通过acquireQueued方法可以知道其是在shouldParkAfterFailedAcquire之后被调用的,用来阻塞当前线程,然后判断其是不是被中断了。

13、cancelAcquire(Node node)

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
    node.thread = null;
    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    Node predNext = pred.next;

    node.waitStatus = Node.CANCELLED;
    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        pred.compareAndSetNext(predNext, null);
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                pred.compareAndSetNext(predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}以上是关于JDK类库源码分析系列-AbstractQueuedSynchronizer的主要内容,如果未能解决你的问题,请参考以下文章

JDK类库源码分析系列2-AbstractQueuedSynchronizer-CountDownLatch

JDK类库源码分析系列2-AbstractQueuedSynchronizer-ReentrantReadWriteLock

JDK类库源码分析系列2-AbstractQueuedSynchronizer-Semaphore

JDK源码分析实战系列-ThreadLocal

源码分析系列1:HashMap源码分析(基于JDK1.8)

《JDK源码分析》相关系列目录(JAVA 小虚竹)