并发和多线程--AbstractQueuedSynchronizer基本原理
Posted huigelaile
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发和多线程--AbstractQueuedSynchronizer基本原理相关的知识,希望对你有一定的参考价值。
AbstractQueuedSynchronizer简称为AQS,AQS是ReentrantLock、CountdownLatch、CycliBarrier等并发工具的原理/基础,所以了解AQS的原理对学习J.U.C包很重要。
基本原理:
1.AQS中包含两种队列(FIFO),同步队列+条件队列,底层都是双向链表,也就是通过其内部的Node实现。
2.AQS有排他锁和共享锁两种模式,子类可以实现内部类选择实现一种,当然也可以通过两个内部类定义两种锁,例如ReentrantReadWriteLock,一个读锁,一个写锁。
3.子类通过对volatile修饰的state字段赋值,判断当前是否能够获取锁。
4.通过new ConditionObject()获得条件队列。
5.AQS定义了锁的框架,但是如何获取锁,释放锁等需要子类实现,AQS中默认抛出UnsupportOperationException。
类定义:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { }
AQS继承了AbstractQueuedSynchronizer类,这个类只包含一个属性,以及其get、set方法,主要用来记录当前获取锁的线程。
private transient Thread exclusiveOwnerThread;
基本属性:
//同步队列的头结点 private transient volatile Node head; //同步队列的尾结点 private transient volatile Node tail; //当前锁的状态,需要子类去实现,例如0表示可以获取锁,1表示当前锁被持有,无法获取(排他),共享锁模式下,state>0表示持有锁线程的数量 private volatile int state; //旋转超时阈值,设置等待超时时间才生效 static final long spinForTimeoutThreshold = 1000L;
这几个属性中,最重要的就是state,子类通过CAS机制进行赋值,保证其原子性。
条件队列属性:
public class ConditionObject implements Condition, java.io.Serializable { //条件队列的头结点 private transient Node firstWaiter; //条件队列的尾结点 private transient Node lastWaiter; }
条件队列就是通过ConditionObject得到,其实现了Condition接口,Condition内部定义了一些抽象方法,如await()、signal()、signalAll(),相当于Object中的wait、notify、notifyAll。我们看到这两种队列使用的Node,所以下面一起看看Node的定义。
Node属性:
static final class Node { //标记当前node为共享模式 static final Node SHARED = new Node(); //标记当前node为独占模式 static final Node EXCLUSIVE = null; //当前节点的状态,通过waitStatus控制节点的行为,同步队列初始为0,而同步队列节点初始为-2,下面就是几种取值 volatile int waitStatus; //当前节点被取消,属于无效节点 static final int CANCELLED = 1; //当前节点的后继节点将要/已经被阻塞,在当前节点release的时候需要unpark后继节点 static final int SIGNAL = -1; //当前节点处于条件队列 static final int CONDITION = -2; //共享模式下释放锁,应该传播到其他节点 static final int PROPAGATE = -3; //当前节点的前一个节点 volatile Node prev; //当前节点的后一个节点 volatile Node next; //当前节点持有的线程,head不保存Thread,只是保存其后继节点的引用 volatile Thread thread; //等待队列的中表示下一个节点,如果是同步队列,只是表示当前节点处于共享模式还是独占模式 Node nextWaiter; }
我们通过Lock.lock()获取锁的时候,根据定义决定调用acquire()还是tryAcquire(),这两个方法都是获取排他锁的,下面看看具体实现细节。
acquire()获取排他锁:
public final void acquire(int arg) { //tryAcquire:在AQS中默认抛出异常,需要子类去实现,子类一般选择非public的内部类去实现AQS,表示当前是否获取到锁,如果获取到锁,直接结束执行 //addWaiter:当前没有获取到锁,将线程添加到同步队列尾部 //acquireQueued:阻塞当前节点 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //自我打断 selfInterrupt(); }
上面注释中对acquire()中每一个方法的基本描述,我们可以简单实现tryAcquire()。
tryAcquire():
// 尝试获取锁,如果状态为0,获取到锁,更新state为1(1代表exclusive模式下锁已经被持有) public boolean tryAcquire(int acquires) { //CAS实现赋值 if (compareAndSetState(0, acquires)) { //将当前线程set到AbstractOwnableSynchronizer中的exclusiveOwnerThread,为了方便跟踪获得锁的线程 // ,可以帮助监控工具识别哪些线程持有锁。 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
上面是一种tryAcquire的基本实现,通过CAS对state进行赋值,0表示当前可以获取锁,1表示当前锁被别的线程持有,无法获取。如果返回true,直接结束。如果返回false,会有后续流程。
addWaiter():
//acquire方法为EXCLUSIVE独占锁模式 addWaiter(Node.EXCLUSIVE); private Node addWaiter(Node mode) { //将当前线程,封装成一个EXCLUSIVE模式的节点 Node node = new Node(Thread.currentThread(), mode); //取出尾结点 Node pred = tail; // if (pred != null) { //tail赋值给当前节点的prev node.prev = pred; //如果CAS把当前node变成tail节点,返回当前节点 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //如果一次没有把当前节点放到队列中,进入自旋enq() enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; //如果tail 为null,通过CAS进行head、tail的初始化 if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else {//和之前的操作一样,只是外部是for(;;)保证入队成功 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; //注意这里return 的是当前节点的前一个节点,和上面不一样。 return t; } } } }
通过上面的代码将新线程封装成一个新的节点,进行入队操作,但是不知道为啥返回当前节点或者其前一个节点,我也不知道为啥这样,麻烦大佬留言告知。
acquireQueued():
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //p为当前节点的前一个节点 final Node p = node.predecessor(); //如果p为head,也就是当前节点为head的后继节点,就会尝试获取锁,如果成功,将node设置为head,直接返回 if (p == head && tryAcquire(arg)) { //把head节点的thread赋值为null,head节点永远是空节点 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //如果当前node不是head的后继节点,将前一个节点的waitStatus设置为signal,并且阻塞自己 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //如果上面失败,取消正在进行中的acquire if (failed) cancelAcquire(node); } }
acquireQueued方法从命名上看到,排队获取,主要逻辑就是讲当前节点的前一个节点waitStatus设置为signal,然后阻塞自己。通过parkAndCheckInterrupt()将自身阻塞的。
shouldParkAfterFailedAcquire():
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //ws为当前节点的前驱节点waitStatus int ws = pred.waitStatus; //如果ws已经是signal,直接返回 if (ws == Node.SIGNAL) return true; //如果ws>0,也就是canceld状态,往前面遍历,知道找到前面不是canceld状态的节点 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //通过CAS将ws设置为signal状态 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //通过LockSupport.park挂起线程,直到被唤醒,返回是否interrupt private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
parkAndCheckInterrupt()将自己阻塞,当被唤醒的时候,仍然在for(;;)内部自旋尝试获取锁。独占模式获取锁的流程,大概是这样。
release()释放排它锁:
public final boolean release(int arg) { //尝试去释放锁,方法由子类实现 if (tryRelease(arg)) { Node h = head; //当前队列head不为null,外套Status不是初始状态 if (h != null && h.waitStatus != 0) //唤醒head的后继节点 unparkSuccessor(h); return true; } return false; }
tryRelease()由子类实现,我们看一下ReentrantLock的乐观锁实现。
//unlock方法调用,参数为1 protected final boolean tryRelease(int releases) { int c = getState() - releases; //如果当前节点和AbstractOwnableSynchronizer保存的线程不相同,直接抛出异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //如果当前state状态就是1 if (c == 0) { free = true; //将独占线程设置为null,返回true setExclusiveOwnerThread(null); } //将c赋值state setState(c); return free; }
对于release来说,参数为1,只有当前state为1的状态,返回true,执行后续唤醒操作。
unparkSuccessor():
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
以上是关于并发和多线程--AbstractQueuedSynchronizer基本原理的主要内容,如果未能解决你的问题,请参考以下文章