多线程-AbstractQueuedSynchronizer(AQS)
Posted 小路不懂2
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程-AbstractQueuedSynchronizer(AQS)相关的知识,希望对你有一定的参考价值。
概述
从使用者的角度,AQS的功能可分为两类:独占功能和共享功能。它的子类中,要么实现并使用了它独占功能的API,要么使用了共享锁的功能,而不会同时使用两套API,即使是它的子类ReentrantReadWriteLock,也是通过两个内部类:读锁和写锁,分别实现的两套API来实现的。
// AQS主要的属性,接口
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
// 同步状态值 private volatile int state;
// 队列头结点,一般获取锁的线程在头结点 private transient volatile Node head;
// 队尾节点 private transient volatile Node tail;
// 队列缓存对象 static final class Node {}
// 获取独占资源,模板方法 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
// 子类实现 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
// 释放独占资源,模板方法 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
// 释放资源,子类实现 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
// 获取共享资源,模板方法 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
// 获取共享资源,子类实现 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
// 释放共享资源,模板方法 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
// 释放共享资源,子类实现 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
// 是否独占?子类实现 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); } }
AQS是一个典型的模板方法模式:AQS定义模板方法,子类实现具体的方法。
模板方法模式:定义一个操作中算法的框架,而将一些步骤延迟到子类中。模板方法模式使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。
模板方法模式是一种基于继承的代码复用技术,它是一种类行为型模式。
类图
本文从AQS两个典型的应用类ReeantrantLock和CountDownLatch分别介绍独占和共享两个模式。
CAS操作
JDK中很多的线程安全操作依赖于CAS + 循环的无锁模式,AQS中大量使用。
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } /** * 设置头结点,期望值为null */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } /** * 设置尾节点 */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } /** * 设置一个节点的waitStatus */ private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } /** * 设置一个节点的next */ private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); }
ReentrantLock
ReentrantLock会保证在同一时间只有一个线程在执行这段代码,或者说同一时刻只有一个线程的lock方法会返回,其他线程会被挂起,直到获取锁。其实ReentrantLock实现的就是一个独占锁的功能。
ReentranctLock内部有一个抽象静态类Sync,继承自AQS,重写了tryRelease和isHeldExclusively方法。
ReentranctLock内部还有FairSync和NonfairSync两个类分别代表公平锁和非公平锁。
公平锁:每个线程抢占锁的顺序为先后调用lock方法的顺序依次获取锁,类似于排队。
非公平锁:每个线程抢占锁的顺序不定,谁运气好,谁就获取到锁。
公平锁
调用lock方法,由子类调用对应的lock方法,这个lock为Sync类里面的抽象方法,由子类FairSync和NonfairSync实现。
对于FairSync实现如下:
acquire方法其实是AQS里面的模板方法:
tryAcquire是子类需要实现的方法,addWaiter方法是将没有获取到锁的线程包装成Node对象入队列。
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 如果state == 0 标示锁未被占用 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 如果队列中没有其他线程,说明没有线程正在占用锁,此处acquire=1 // 通过CAS操作将状态更新,如果成功标示获取锁,后续再有调用都会因为期望值不为0而失败。
setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 可重入锁,如果当前线程已经获取了锁 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
如果成功获取锁返回true,否则返回false并将当前的线程放到队列中调用addWaiter(Node.EXCLUSIVE), arg),在此之前,先看看Node数据结构:
static final class Node { // 节点类型:EXCLUSIVE和SHARED // 标示是共享节点 static final Node SHARED = new Node(); // 标示以独占节点等待 static final Node EXCLUSIVE = null; // 节点状态: SIGNAL, CANCELLED, CONDITION, PROPAGATE // 线程(节点)已经取消,由于超时或interrupt,节点到此状态就不在变化,同时线程也不会再次被阻塞。 static final int CANCELLED = 1; // 后续线程需要唤醒,当前节点的后继节点已经阻塞(或即将阻塞),当前节点在释放或取消之前必须唤醒后续节点 static final int SIGNAL = -1; // 线程在等待条件 static final int CONDITION = -2; // 标示无条件传播。releaseShared应该被传播给其他节点,这个只允许设置头结点 static final int PROPAGATE = -3; // 节点状态;可取值:SIGNAL, CANCELLED, CONDITION, PROPAGATE, 0;非负值标示不需要signal,初始化为0.
// 描述节点的状态,AQS队列中,在有并发时,肯定会保存一定数量的节点,每个节点代表了一个线程的状态,有的线程可能等不急获取锁
// 需要放弃竞争,退出队列,有些线程在等待一些条件满足后恢复执行。 volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; // 线程 Node nextWaiter;// 类型 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } }
private Node addWaiter(Node mode) {
// 创建节点 Node node = new Node(Thread.currentThread(), mode); // 如果队列不空,则CAS直接入队列,如果失败了,则自旋入队列直到成功。 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node);// 入队列 return node; }
将节点放入队列之后,还需要将线程挂起,这个由acquireQueued来完成。
AQS的队列结构如下,队列由Node类型的节点组成,其中至少有两个变量:一个封装线程,一个封装节点类型。第一次插入节点时,第一个节点是一个空节点,代表有一个线程已经获取锁,事实上,队列的第一个节点就是代表持有锁的节点。
// 自旋入队列
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize,如果队列为空,则创建一个空节点 if (compareAndSetHead(new Node())) tail = head; } else { // 入队列,不成功,则在循环,成功则退出 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();
// 如果当前节点的pre节点是头节点,则说明它是第一个等待锁的节点(head是当前持有锁的节点)因此尝试获取锁 if (p == head && tryAcquire(arg)) { setHead(node); // 如果获取锁成功,则移除head节点,当前节点变成头节点 p.next = null; // help GC failed = false; return interrupted; }
// 检测前一个节点的状态,看当前线程是否需要挂起;只有当前节点的前一个节点为SIGNAL时,当前节点才能挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// 挂起 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
至此,一个线程对于锁的一次竞争结束:(1)成功(2)进入队列被挂起。
如果线程被挂起,则等待下次被唤醒后继续循环尝试获取锁,AQS队列为FIFO队列。
下面看看释放锁流程:
调用AQS的release:
对于ReentrantLock调用的tryRelease方法都是在Sync里面实现的:
protected final boolean tryRelease(int releases) { int c = getState() - releases;
// 如果释放锁的线程和占用锁的线程不是同一个,抛出非负监视器异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false;
// 因为可重入锁的原因,不是每次释放锁c都等于零,直到最后一次释放锁时,才通知AQS不需要记录占用锁的线程 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
如果队列不空,则需要唤醒后续的等待节点:
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 设置waitStatus为0,以后该状态就不会改变了 /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 如果头结点的后继节点被取消或为null,则反向遍历,寻找未取消的节点作为即将唤醒线程 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
非公平锁
与公平锁唯一的区别就是获取锁的方式不同,非公平锁是抢占式的获取锁。
直接CAS修改一次state(线程抢占获取锁)如果成功返回,否则排队。
CountDownLatch
从CountDownLatch角度看看AQS的共享功能。在开始解读AQS的共享功能前,我们再重温一下CountDownLatch,CountDownLatch为java.util.concurrent包下的计数器工具类,常被用在多线程环境下,它在初始时需要指定一个计数器的大小,然后可被多个线程并发的实现减1操作,并在计数器为0后调用await方法的线程被唤醒,从而实现多线程间的协作。
// CountDownLatch的全部源代码 public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } // 重写AQS protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 重写AQS protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; // 构造函数,设置计数器个数 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); // 调用AQS的releaseShared方法 } public long getCount() { return sync.getCount(); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
使用步骤
(1)一般通过new CountDownLatch(count)创建CountDownLatch对象,count为计数器个数
然后调用Sync构造函数:
setState方法为AQS中设置state值得方法,而status是同步状态值:
(2)主线程调用await方法等待计数器为零,这一步相当于申请一个锁。
acquireSharedInterruptibly是AQS定义的模板方法,从方法名上看,这个方法的调用是响应线程的打断的,所以在前两行会检测线程是否被打断,接着尝试获取共享锁。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) // 如果-1标示获取失败,则需要阻塞,否则直接退出方法 doAcquireSharedInterruptibly(arg); } // 如果state为0,即计数器为0,则返回1,否则-1 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 在队列中增加共享模式的节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); // 如果当前节点的pre节点是head节点,则当前节点是第一个等待获取锁的节点,则试着获取 if (p == head) { int r = tryAcquireShared(arg); // 返回1或-1 if (r >= 0) { // 获取成功,设置头结点,并且传播(有可能很多线程调用await方法阻塞,需要一个个通知) setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 设置signal,阻塞线程,与独占模式相同 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus;
// 如果当前节点是SIGNAL意味着他正在等待一个信号,因此做2件事:(1)重置waitState(2)唤醒下一个节点 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); }
// 如果本身是重置状态,则设置其状态为PROPAGATE,意味着需要将状态向后一个节点传播 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
(3)各个线程调用countDown方法减一,相对于释放一个锁:
releaseShared方法为AQS实现的模板方法
tryReleaseShared由CountDownLatch.Sync类实现:
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) // 如果已经释放了,则返回失败 return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
如果state的值为0,在CountDownLatch中意味:所有的子线程已经执行完毕,这个时候可以唤醒调用await()方法的线程了,而这些线程正在AQS的队列中,并被挂起的,所以下一步应该去唤醒AQS队列中的头结点了(AQS的队列为FIFO队列),然后由头节点去依次唤醒AQS队列中的其他共享节点。如果tryReleaseShared返回true,进入doReleaseShared()。
当线程被唤醒后,会重新尝试获取共享锁,而对于CountDownLatch线程获取共享锁判断依据是state是否为0,而这个时候显然state已经变成了0,因此可以顺利获取共享锁并且依次唤醒AQS队里中后面的节点及对应的线程。
总结
以上的分析都是从AQS子类的角度去看待AQS的部分功能的,而如果直接看待AQS,或许可以这么去解读:
首先,AQS并不关心“是什么锁”,对于AQS来说它只是实现了一系列的用于判断“资源”是否可以访问的API,并且封装了在“访问资源”受限时将请求访问的线程的加入队列、挂起、唤醒等操作, AQS只关心“资源不可以访问时,怎么处理?”、“资源是可以被同时访问,还是在同一时间只能被一个线程访问?”、“如果有线程等不及资源了,怎么从AQS的队列中退出?”等一系列围绕资源访问的问题,而至于“资源是否可以被访问?”这个问题则交给AQS的子类去实现。
当AQS的子类是实现独占功能时,例如ReentrantLock,“资源是否可以被访问”被定义为只要AQS的state变量不为0,并且持有锁的线程不是当前线程,则代表资源不能访问。
当AQS的子类是实现共享功能时,例如:CountDownLatch,“资源是否可以被访问”被定义为只要AQS的state变量不为0,说明资源不能访问。这是典型的将规则和操作分开的设计思路:规则子类定义,操作逻辑因为具有公用性,放在父类中去封装。当然,正式因为AQS只是关心“资源在什么条件下可被访问”,所以子类还可以同时使用AQS的共享功能和独占功能的API以实现更为复杂的功能。
比如:ReentrantReadWriteLock,我们知道ReentrantReadWriteLock的中也有一个叫Sync的内部类继承了AQS,而AQS的队列可以同时存放共享锁和独占锁,对于ReentrantReadWriteLock来说分别代表读锁和写锁,当队列中的头节点为读锁时,代表读操作可以执行,而写操作不能执行,因此请求写操作的线程会被挂起,当读操作依次推出后,写锁成为头节点,请求写操作的线程被唤醒,可以执行写操作,而此时的读请求将被封装成Node放入AQS的队列中。如此往复,实现读写锁的读写交替进行。
综上所述,本系列文章从AQS独占锁和共享锁两个方面深入分析了AQS的实现方式和独特的设计思路。
以上是关于多线程-AbstractQueuedSynchronizer(AQS)的主要内容,如果未能解决你的问题,请参考以下文章