AQS之lock源码分析

Posted 码农后勤部部子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS之lock源码分析相关的知识,希望对你有一定的参考价值。

文章来源:若水


AQS(AbstractQueuedSynchronizer)是JAVA中众多锁 以及并发⼯具的基础,其底层采⽤ 乐观锁,⼤量使⽤了CAS操作,并且在冲突时,采⽤⾃旋⽅式重试。AQS 虽然被定义为抽象类,但事实上它并不包含任何抽象⽅法,AQS将⼀些需要⼦类覆写的⽅法都设计成protect⽅法(即当前类和⼦类可以调⽤),将其默认实现抛出UnsupportedOperationException异常,如果⼦类使⽤这些⽅法,但是没有覆写,则会抛出异常。如果⼦类没有使⽤到这些⽅法,则不需要任何操作。


并发⼯具套路:状态,队列,CAS状态:由于状态是全局共享,⼀般会被设置成volatile 类型,以保证其修改的可⻅性。队列:队列通常⼀个等待的集合,⼤多数以链表的形式表现,队列采⽤是悲观锁的思想,表示当前所等待的资源、状态或条件短时间内⽆法满⾜,因此,会将当前线程包装成某类型的数据结构,扔到⼀个等待队列中,当⼀定条件满⾜后,再从等待队列中取出。CAScas操作是最轻量的并发处理,通常我们对于状态的修改都会⽤到CAS操作,因为状态可能被多个线程同时修改,CAS操作保证同⼀时刻,只有⼀个 线程能修改成功,保证线程安全,CAS操作基本由Unsafe⼯具类的compareAndSwapXXX来实现的,cas采⽤ 乐观锁的思想,因此伴随⾃旋,如果发现⽆法成功,则不断重试,直到成功,⾃旋的表现形式通常 是死循环 for( ;; )


AQS - 状态

AQS - 状态/** * The synchronization state. */private volatile int state;

独占锁 同⼀时刻,锁只能被⼀个线程所持有,通过state变量是否为0,判断当前锁是否被占⽤。

在监视器锁中,我们⽤ObjectMonitor对象的_owner属性记录了当前拥有监视器锁的线程。

AQS中,我们通过 exclusiveOwnerThread 属性:

/** * The current owner of exclusive mode synchronization. */private transient Thread exclusiveOwnerThread;

// 继承⾃ AbstractOwnableSynchronizer

setExclusiveOwnerThread(Thread.currentThread());

exclusiveOwnerThread 属性值即为当前持有锁的线程。

队列

AQS中,队列的实现是⼀个双向链表,被称为sync queue表示所有等待锁的线程集合,有点类似于synchronized 原理的wait set

并发编程中使⽤队列 将当前线程包装成某类型的数据结构 扔到等待队列中,

static final class Node {……….volatile Thread thread;// 节点所代表的线程// 双向链表,每个节点需要保存⾃⼰的前驱节点和后继节点的引⽤volatile Node prev;volatile Node next;// 线程所处的等待锁的状态, 初始化,该值为0volatile int waitStatus;/** waitStatus value to indicate thread has cancelled */static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needsunparking */static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;/** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */static final int PROPAGATE = -3;// 该属性⽤于条件队列或者共享锁Node nextWaiter; …………. }

说明:Node类中有⼀个状态变量waitStatus,表示当前Node所代表的线程的等待锁状态。在独占锁模式下,我们只需要关注CANCELLED SIGNAL 两种状态(CANCELLED 线程被取消了,SIGNAL 线程需要被唤醒,CONDITION 线程在条件队列⾥⾯等待,PROPAGATE 释放共享资源时需要通知其他节点)。nextWaiter 属性,它在独占锁模式下永远为nullAQS队列 是双向链表,需要头节点和尾节点:

// 头节点,不代表任何线程,是⼀个哑节点private transient volatile Node head; // 尾节点,每⼀个请求锁的线程会加到队尾private transient volatile Node tail;

AQS之lock源码分析

AQS中队列是⼀个CLH队列,它的head节点永远是⼀个哑节点(dummy node),不代表任何线程(某些情况下可以看作代表了当前持有锁的线程),因此head所指向的Nodethread属性永远是null

private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }}

说明:

        ⼊队操作:当⼀个线程获取锁失败后该线程会被转换成Node 节点,使⽤enq(fifinal Node node) ⽅法将该节点插⼊到AQS阻塞队列⾥。

compareAndSetHead(new Node()) 初始化head (哑节点,不代表任何线程即thread属性永远是null 

当前 线程没有抢到锁被封装成Node扔到队列中,即使队列是空的,也不会排在第⼆个。

总结

thread: 表示当前Node所表示的线程

waitStatus:表示节点所处的等待状态,共享锁模式只需关注三种状态:SIGNAL=-1 

       CANCELLED=1 初始态(0)

prev next: 节点的前驱和后继

nextWaiter:进作为标记,值永远null, 表示当前处于独占锁模式


CAS 操作

CAS⽤来改变状态,⼀般 在静态代码块中初始化需要CAS操作的属性的偏移量

private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long stateOffset;private static final long headOffset;private static final long tailOffset;private static final long waitStatusOffset;private static final long nextOffset;static { try { stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } }

以上代码可以看出,CAS操作主要针对5个属性,包括AQS三个属性 statusheadtail,以及node对象的两个属性waitStatusnext


cas 操作

/** * CAS head field. Used only by enq. */private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null,update); }/** * CAS tail field. Used only by enq. */private final boolean compareAndSetTail(Node expect, Node update){ return unsafe.compareAndSwapObject(this, tailOffset, expect,update); }/** * CAS waitStatus field of a node. */private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); }/** * CAS next field of a node. */private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect,update); }


AQS 核⼼属性

1)锁相关的属性有两个:

/** * The synchronization state. */private volatile int state; // 锁的状态/** * The current owner of exclusive mode synchronization. */private transient Thread exclusiveOwnerThread; // 当前持有锁的线程,这个属性继承 AbstractOwnableSynchronizer

2) sync queue相关属性, 两个

private transient volatile Node head; // 队头 为dummy nodeprivate transient volatile Node tail; // 队尾 新⼊队的节点

3)队列中的Node中需要关注属性有三组:

volatile Thread thread;// 节点所代表线程// 双向链表,每个节点需要保存⾃⼰的前驱节点和后继节点的s引⽤volatile Node prev;volatile Node next;// 线程所处的等待锁的状态,初始化时,值为0volatile int waitStatus;/** waitStatus value to indicate thread has cancelled */static final int CANCELLED = 1;/** waitStatus value to indicate successor's thread needsunparking */static final int SIGNAL = -1;/** waitStatus value to indicate thread is waiting on condition */static final int CONDITION = -2;

获取锁

ReentrantLock lock = new ReentrantLock();lock.lock();//3

获取 ⾮公平锁

static final class NonfairSync extends Sync { private static final long serialVersionUID =7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }}

代码说明:

如果cas状态status 0->1 成功,则设置当前持有锁的线程。

如果cas失败,则

*/public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

代码说明:如果获取锁失败,添加Nodesync queue 队列

tryAcquire 由⼦类实现,获取锁的 具体逻辑

final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;}

代码说明:

获取当前锁状态

若锁状态=0 

CAS锁状态status 0-> 参数acquires = 1,若true,则设置当前持有锁线程,返回true

若锁 状态 != 0

判断 当前线程 == 当前持有锁线哼,若true,则 判断最新锁状态值 = 锁状态 + 参 数acquires 值与0的⼤⼩,若 ⼩于,则抛出错误 “超过最⼤锁计数

设置新锁状态值 返回true

其他返回false,即 没有获取到锁


addWaiter(Node node)

该⽅法由AQS实现,负责在获取锁失败后调⽤,将当前请求锁的线程包装成Node扔 到sync queue 中,并返回Node

private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) {// 如果队列不为空,则⽤CAS⽅式将当前节点设为尾节点 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }

代码说明:

              创建Node实例(当前线程,mode = Node.EXCLUSIVE ) 

newNode

               获取队列最后⼀个 pred

               如果队列最后 ⼀个不为空,则 newNode的 前驱节点为predcas队列最后Node,成功后pred 的后驱节点为newNode,即 ⽤CAS⽅式将当前节点设为尾节点,返回 newNode

pred == null 或者 casTail 失败后,enq(node) 将节点插⼊队列,返回节点newNode可⻅ 每⼀个处于独占模式下的节点,它的nextWaiter ⼀定是null

enq

能执⾏这个⽅法,说明当前线程获取锁已经失败了,封装成Node 放进等待队列中,

private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; // 赋值后没有返回 } else { // 尾分叉 到这⾥说明队列已经不为空,再继续尝试将节点加到队 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t;       } } }}

代码说明: 

              for(;;) 是死循环

              获取队尾节点

              若为空,则CAS设置head,哑节点没有thread属性赋值,head节点赋值给尾节点,并没有⽴即返回

尾分叉

若不为空,参数node节点的前驱指向尾节点, 将参数node节点cas设置尾节点 直到成功, 原尾节点的后继指向参数节点node

当⼤量线程同时⼊队,同⼀时刻,只有⼀个线程完成,所以出现尾分叉将⼀个节点添加到sync queue的末尾也需要 三步:

1 node的前驱节点为当前的尾节点:node.prev = t

修改tail属性,使它指向当前节点

修改原来的尾节点,使它的next指向当前节点


因为这三部不是原⼦操作,第⼀步容易成功,第⼆步由⼀个cas操作,并发条件下会失败,很有可能会出现完成第⼆步 没有完成第三步时 即 将新的节点设置成尾节点,此时原来旧的尾节点的next值可能还是null如果此时有线程恰巧从头节点开始向后遍历链表,则遍历不到新加尾节点。从尾节点遍历有值。

⾄于分叉⼊队失败的节点,继续尝试CAS操作,最终都会通过⾃旋尝试⼊队

addWaiter总结

addWaiter(Node.EXCLUSIVE)⽅法最终返回了代表当前线程的Node节点,在返回的那⼀刻,这个节点必然是当时sync queue的尾节点


acquireQueued

1)能执⾏到该⽅法,说明addWaiter⽅法已经成功将封装了当前的Thread节点添加到等待队列的队尾

2)该⽅法中再次尝试获取锁

3)在再次尝试获取锁失败后,判断是否需要把当前线程挂起

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}

代码说明:

死循环 for(;;)

获取队列的当前驱节点。

如果 前驱节点是head 且 尝试获取锁 ,若成功,则 设置head 将当前 赋值给head,线程置为空,节点的前驱置为null,节点的后继也置为null,返回中断表示false

为什么前⾯获取锁失败,这⾥还要再次尝试获取锁?

再次尝试获取锁是基于⼀定条件:当前节点的前驱节点是HEAD节点。head 节点是个哑节点,不代表任何线程,或者代表持有锁的线程,如果当前节点的前驱节点是head节点,说明当亲节点已经排在整个队列最前⾯


shouldParkAfterFailedAcquire

该⽅法⽤于决定在获取锁失败后,

是否将线程挂起判断依据是前驱节点的waitStatus值


waitStatus值的状态:

static final int CANCELLED = 1;static final int SIGNAL = -1;static final int CONDITION = -2;static final int PROPAGATE = -3;

⼀共初始化值是0,不属于上⾯任何⼀种状态。


CANCEL四种状态,独占锁的获取操作,⽤到CANCELLED  SIGNALLED 状态 ,表示Noded 的当前线程已经取消了排队,即放弃获取锁。


SIGNAL 状态 不是表征当前节点状态,⽽是当前节点的下⼀个节点的状态。当⼀个节点的waitStatus被置为SIGNAL, 说明下⼀个节点(即后继节点已经被挂起了(或者),因此当前节点释放或者放弃取锁时,如果它的waitStatus属性为SIGNAL,还有完成⼀个额外操作-唤起 它的后继节点。signal这个状态的设置常常不是 节点⾃⼰给⾃⼰设的。是后继节点设置的。


private static boolean shouldParkAfterFailedAcquire(Node pred,Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}

代码说明:

获取 前驱节点的waitStatus的状态 ws

ws等于Node.SIGNAL 即 前驱节点的状态已经是SIGNAL,说明闹钟已经没了,可以直接睡了返回。

ws⼤于0, 即状态为CANCELLED,说明前驱节点已经取消了等待锁(由于超时或者中断等原因)

既然前驱节点不等了,继续往前找,直到找到⼀个还在等待锁的节点。

然后我们跨过这些不等待的节点,直接排到等待锁的节点后⾯


else 前驱节点状态不是SIGNAL,也不是CANCELLED,⽤CAS设置前驱节点的wsNode.SIGNAL, 给⾃⼰设定⼀个闹钟


parkAndCheckInterrupt

将线程挂起,等待被唤醒

private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//线程被挂起,停下不往下执⾏ return Thread.interrupted(); }

注意:LockSupport.park 执⾏完成后线程就被挂起,除⾮其他线程unpark了当前线程,或者当前线程被中断,,否则代码是不会往下执⾏的。


总结:

AQS 中⽤statuss 属性表示锁,如果能成功将state属性通过CAS操作从0->1 获取锁获取锁的线程才能将exclusiveOwnerThreads 设置⾃⼰

addWaiter负责将当前等待锁的线程包装 成Node,并成功地添加到队尾,这⼀点由它调⽤的enq⽅法保证,enq⽅法同时还负责在队列

acquireQueued⽅法⽤于Node成功⼊队,继续尝试获取锁(取决于Node的前驱节点是不是head),或者将线程挂起

shouldParkAfterFailedAcquire ⽅法⽤于保证当前线程的前驱节点的waitStatus属性为SIGNAL,从⽽保证⾃⼰挂起后,前驱节点会负责在合适的时候唤醒⾃⼰

parkAndCheckInterrupt⽅法⽤于挂起当前线程,并检查中断状态如果最终成功获取锁,线程会从lock() ⽅法返回,继续往下执⾏,否则 线程会阻塞等待


往期推荐



END



更多程序员职场内容、面经分享、大厂信息
持续关注部部子......