JUC-AQS源码分析
Posted 玩葫芦的卷心菜
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC-AQS源码分析相关的知识,希望对你有一定的参考价值。
文章目录
前置知识
- CAS
- 可重入锁
- LockSupport
一、介绍
队列同步器(AbstractQueuedSynchronizer)是用来构建锁和其他同步组件的基础框架,他使用一个int成员变量表示同步状态,通过内置的FIFO来完成资源获取线程的排队工作。CountDownLatch、CyclicBarrier、Semaphore等常用api,涉及到了锁的控制,都继承了AQS。
同步器使用的主要方法是继承,子类通过继承同步器并实现他的抽象方法来管理状态,在此过程中会控制状态的改变
二、AQS的重要元素
state:锁的占用状态码,0表示未被占用
head和tail表示AQS队列的头和尾
其中,还有静态内部类Node,用来封装线程
三、场景:
银行办理该业务的只有一个窗口(一把锁),来了三个客户A、B、C来办理该业务,A先来窗口给A办理业务(需要很长时间),所以B和C在银行安排的等待区等待A办理完
ReentrantLock lock=new ReentrantLock();
new Thread(()->
lock.lock();
try
try
TimeUnit.MINUTES.sleep(20);
catch (InterruptedException e)
e.printStackTrace();
finally
lock.unlock();
,"A").start();
new Thread(()->
lock.lock();
try
finally
lock.unlock();
,"B").start();
new Thread(()->
lock.lock();
try
finally
lock.unlock();
,"C").start();
public void lock()
sync.lock();
四、源码分析
用的是非公平锁的获取锁和释放锁进行分析
1、NonfairSync实现sync的lock方法
final void lock()
if (compareAndSetState(0, 1))//如果锁未被占用,则更改占用状态1
setExclusiveOwnerThread(Thread.currentThread());//将当前线程设置为占用线程
else
acquire(1);
A第一次来窗口
compareAndSetState(0, 1),因为锁初始为未占用状态(state为0),比较相同后替换为1(表示锁被占用)
比较并替换成功执行setExclusiveOwnerThread(Thread.currentThread()),设置占用线程为当前线程(占用锁)
接着B来
compareAndSetState(0, 1),因为锁被占用,state为1,比较失败执行acquire(1)
public final void acquire(int arg)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
1.1、NonfairSync实现AbstractQueuedSynchronizer的tryAcquire方法
protected final boolean tryAcquire(int acquires)
return nonfairTryAcquire(acquires);
final boolean nonfairTryAcquire(int acquires)
final Thread current = Thread.currentThread();
int c = getState();//获取占用状态
//针对可能A刚刚结束的情况
if (c == 0) //没有被占用,尝试获取锁
if (compareAndSetState(0, acquires)) //替换占用状态码
setExclusiveOwnerThread(current);//设置占用线程
return true;//获取成功返回,失败返回最底的false
//已经被占用了,判断当前线程是否为锁占用线程
//如果是的话,可重入
else if (current == getExclusiveOwnerThread())
int nextc = c + acquires;//累加
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);//设置占用状态码
return true;
return false;
非公平锁首先尝试获取锁
获取成功替换状态码和设置占用线程,并且返回true
public final void acquire(int arg)
//该条件直接结束
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
线程B尝试获取锁成功,直接退出
获取失败,判断当前线程是否为占用线程
如果是,可重入状态码累加
不是,返回false
1.2、acquire的下一条件acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
如果尝试获取锁失败、可重入失败,判断下一条件acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
C也同样,因为是非公平锁,先尝试获取和判断可重入
public final void acquire(int arg)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
1.2.1、AbstractQueuedSynchronizer的addWaiter方法
private Node addWaiter(Node mode)
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null)
node.prev = pred;
if (compareAndSetTail(pred, node))
pred.next = node;
return node;
enq(node);
return node;
node封装B线程得B结点
Node pred = tail;当前AQS队列中为空
pred == null,所以执行enq(node),添加B结点
private Node enq(final Node node)
for (;;)
Node t = tail;
if (t == null) // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
else
node.prev = t;
if (compareAndSetTail(t, node))
t.next = node;
return t;
自旋
Node t = tail;获取尾结点为空,进入if,判断compareAndSetHead(new Node())
private final boolean compareAndSetHead(Node update)
return unsafe.compareAndSwapObject(this, headOffset, null, update);
比较AQS队列的头结点是否为空,是的话更新为new Node();
该结点为哨兵结点
比较并替换头结点成功,尾结点指向头结点
继续自旋
发现尾结点为哨兵结点,不为空,进入else
else
node.prev = t;
if (compareAndSetTail(t, node))
t.next = node;
return t;
node为B结点
B结点的前置指向尾结点(当前为哨兵)
compareAndSetTail(t, node)
private final boolean compareAndSetTail(Node expect, Node update)
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
比较尾结点是否为尾结点并替换为B结点
比较并替换成功t.next=node
C线程进入同样
最终返回哨兵结点并退出自旋
1.2.2、acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
addWaiter传回哨兵结点,赋给node,arg为1
final boolean acquireQueued(final Node node, int arg)
boolean failed = true;
try
boolean interrupted = false;
for (;;)
final Node p = node.predecessor();//获取前置结点
if (p == head && tryAcquire(arg)) //是否为头结点,并尝试获取锁
setHead(node);//回收掉哨兵结点,然后将哨兵结点的下一节点更改为哨兵
p.next = null; // help GC
failed = false;
return interrupted;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
finally
if (failed)
cancelAcquire(node);
for (;;)
final Node p = node.predecessor();
if (p == head && tryAcquire(arg))
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
自旋
哨兵结点的前置结点为头结点,尝试获取锁成功就直接结束自旋,返回false
public final void acquire(int arg)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
线程获取到锁直接结束
如果获取失败
1.2.2.1、shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()
p为头结点,node为哨兵
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
int ws = pred.waitStatus;//获取等待码,结点都默认为0
if (ws == Node.SIGNAL)//如果为等待资源-1
return true;
if (ws > 0)
do
node.prev = pred = pred.prev;
while (pred.waitStatus > 0);
pred.next = node;
else
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
return false;
结点waitStatus都默认为0,进入else执行compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update)
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
比较是否为哨兵结点,并将waitStatus替换为Node.SIGNAL(等待资源码,表示进入阻塞)
比较并替换后,结果返回为false,if结束,继续自旋
for (;;)
final Node p = node.predecessor();
if (p == head && tryAcquire(arg))
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
再次判断头结点和尝试获取锁
失败就继续shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
int ws = pred.waitStatus;//获取等待码,结点都默认为0
if (ws == Node.SIGNAL)//如果为等待资源-1
return true;
if (ws > 0)
do
node.prev = pred = pred.prev;
while (pred.waitStatus > 0);
pred.next = node;
else
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
return false;
但是这次进入后,waitStatus已经改变为Node.SIGNAL,返回true,进入下一判断parkAndCheckInterrupt()
阻塞park等待
private final boolean parkAndCheckInterrupt()
LockSupport.park(this);
return Thread.interrupted();
该方法正式将B线程阻塞,等待唤醒(unpark解除)或者中断,从park返回
如果没有唤醒,就一直在这等待
2、A线程执行完毕,释放锁
lock.unlock();
public void unlock()
sync.release(1);
public final boolean release(int arg)
if (tryRelease(arg))
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
return false;
2.1、sync的tryRelease尝试释放
protected final boolean tryRelease(int releases)
int c = getState() - releases;//占用状态码1-1=0
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0)
free = true;
setExclusiveOwnerThread(null);//释放成功,设置占用线程为空
setState(c);//并更改占用码为0(表示没有线程占用)
return free;
尝试释放
释放成功,设置占用线程未空,并更改占用码为0(表示没有线程占用锁)
public final boolean release(int arg)
if (tryRelease(arg))
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
return false;
尝试释放成功,继续执行
头结点为哨兵结点
哨兵结点waitStats=-1,执行unparkSuccessor(h)
private void unparkSuccessor(Node node)
int ws = node.waitStatus;//-1
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);//更改为0
Node s = node.next;
if (s == null || s.waitStatus > 0)
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
if (s != null)
LockSupport.unpark(s.thread);
哨兵waitStatus=-1,比较并替换为0
Node s = node.next=B结点
B!=null,B.waitStatus=0
if (s != null)
LockSupport以上是关于JUC-AQS源码分析的主要内容,如果未能解决你的问题,请参考以下文章