JAVA进阶之路-AbstractQueuedSynchronizer(AQS)源码走读
Posted LuckyZhouStar
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JAVA进阶之路-AbstractQueuedSynchronizer(AQS)源码走读相关的知识,希望对你有一定的参考价值。
1.AQS介绍
AbstractQueuedSynchronizer是JUC的lock包实现的基石,内部通过一个int成员变量表示同步状态,也有一个是通过long实现的版本,但是int基本够用。这个state被volatile修饰保证了各个线程之间的可见性。内部通过一个内置的FIFO队列来完成资源获取线程的排队工作。
内部状态state和等待队列的head和尾节点tail通过使用volatile关键字修饰,保证了内存之间的可见性。
子类通过重写tryAcquire和tryRelease抑或tryAcquireShared和tryReleaseShared,通过CAS修改状态state。(CAS是由unsafe提供的硬件级别支持的原子操作,compareAndSet,通过lock前缀指令,cmpxchg指令实现)。
内部的CLH队列由定义Node构成,如下
Node
int waitStatus;//节点状态,状态如下:
//CANCELLED,值为1,表示当前的线程被取消
//SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行
//CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
//PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
//值为0,表示当前节点在sync队列中,等待着获取锁。
Node prev;//前驱节点
Node next;//后继节点
Node nextWaiter;//条件队列里的节点
Thread thread;//当前线程
上面就是大致node节点的属性特征,下面来大概描述下整个的交互过程。
2.AQS获取锁的过程
先看一个代码执行的实例
public class AbstractQueuedSynchronizerTest
private static final Lock lock = new ReentrantLock();
public static void main(String[] args) throws Exception
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(AbstractQueuedSynchronizerTest::action);
executorService.submit(AbstractQueuedSynchronizerTest::action);
executorService.awaitTermination(200, TimeUnit.SECONDS);
executorService.shutdown();
private static void action()
System.out.printf("当前线程[%s],正在等待您的输入",Thread.currentThread().getName());
lock.lock();
try
System.in.read();
System.out.printf("当前线程[%s],执行完毕",Thread.currentThread().getName());
catch (IOException e)
e.printStackTrace();
finally
lock.unlock();
上面的流程是定义了一个固定大小为2的线程池,然后提交了两个执行任务,等待读取控制台,然后最后释放锁。下面来进行编号,加入两个线程的名称分别是Thread1和Thread2.
lock获取锁的过程
public void lock()
//每次获取锁都+1的方式来竞争
sync.acquire(1);
//获取锁的过程
public final void acquire(int arg)
//首先尝试获取锁
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
//加入上述走的是非公平锁的流程
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires)
//获取当前线程
final Thread current = Thread.currentThread();
//获取state的状态,初始值是0,0代表就可以进行竞争锁
int c = getState();
//如果state等于0,就可以尝试获取锁
if (c == 0)
//cas的方式把stage更新设置为1
if (compareAndSetState(0, acquires))
//如果设置成功后,设置当前线程标识,代表当前线程已经拿到锁了,可重入锁的时候需要用到
setExclusiveOwnerThread(current);
//直接返回true
return true;
//如果state不等于0,判断当前的线程是不是又要再次获取锁,如果当前线程持有该锁了,要再次获取的话,就不需要获取了,可重入锁的概念
else if (current == getExclusiveOwnerThread())
//可重入锁也对stage+1,这块是线程安全的,该线程已经持有了锁,所以可以随意操作state
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//设置当前的状态值,这块是线程安全的,该线程已经持有了锁,所以可以随意操作state
setState(nextc);
return true;
//如果多个线程竞争,既没有更新state,也不是可重入的线程,直接返回false
return false;
//自旋的方式来获取锁
final boolean acquireQueued(final Node node, int arg)
//中断响应是false
boolean interrupted = false;
try
//自旋的方式来获取锁
for (;;)
//获取当前节点的前一个节点
final Node p = node.predecessor();
//如果前一个节点是头节点的话,就先尝试获取锁一次,这里因为,如果之前的线程执行完了的话,也就是头节点的线程执行完毕的话,这里是可以直接获取到锁的,做了一点点的优化
if (p == head && tryAcquire(arg))
//如果前一个节点是头节点的话,获取到锁了,代表头节点执行完毕释放锁了
//重新设置当前节点是head节点
setHead(node);
//去除调用链,help GC
p.next = null; // help GC
//没有被中断,所以返回false
return interrupted;
//
//获取锁失败后,并不是立即阻塞,需要检查该线程的状态,该方法主要依靠前驱节点判断当前线程是否应该被阻塞
if (shouldParkAfterFailedAcquire(p, node))
//如果是阻塞的话,直接调用阻塞方法阻塞
interrupted |= parkAndCheckInterrupt();
catch (Throwable t)
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
//建立CLH队列,通过自旋的方式,也就是一个for(;;)的方式
private Node addWaiter(Node mode)
Node node = new Node(mode);
for (;;)
//如果是第一个线程进入的,那么tail是null,就会进行初始化操作
//如果已经初始化了,那么就向尾部添加
Node oldTail = tail;
if (oldTail != null)
//设置当前节点的前一个节点是尾部节点
node.setPrevRelaxed(oldTail);
//cas原理更新当前节点是尾部节点,如果更新成功
if (compareAndSetTail(oldTail, node))
//如果更新成功,由于是双向链表,把之前尾部的下一个节点设置为新加的节点
oldTail.next = node;
return node;
else
//如果是首次的话,就直接初始化
initializeSyncQueue();
//首次初始化头节点和尾部节点
private final void initializeSyncQueue()
Node h;
//cas原理设置头部节点和尾部节点
if (HEAD.compareAndSet(this, null, (h = new Node())))
tail = h;
//这段代码主要检查当前线程是否需要被阻塞,具体规则如下:
/*如果当前线程的前驱节点状态为SINNAL,则表明当前线程需要被阻塞,调用unpark()方法唤醒,直接返回true,当前线程阻塞
如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false
如果前驱节点非SINNAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SINNAL,返回false */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
//前驱节点
int ws = pred.waitStatus;
//状态为signal,表示当前线程处于等待状态,直接放回true
if (ws == Node.SIGNAL)
return true;
//前驱节点状态 > 0 ,则为Cancelled,表明该节点已经超时或者被中断了,需要从同步队列中取消
if (ws > 0)
do
node.prev = pred = pred.prev;
while (pred.waitStatus > 0);
pred.next = node;
//前驱节点状态为Condition、propagate
else
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
return false;
上述就是整个获取锁的过程,总之就是通过自旋获取锁,直至异常退出或获取锁成功,但是并不是busy acquire,因为当获取失败后会被挂起,由前驱节点释放锁时将其唤醒。同时由于唤醒的时候可能有其他线程竞争,所以还需要进行尝试获取锁,体现的非公平锁的精髓。
unlock释放锁的过程
//释放锁,对state进行减1操作
public void unlock()
sync.release(1);
//释放锁的过程
public final boolean release(int arg)
if (tryRelease(arg))
//如果释放锁成功
Node h = head;
//判断当前头节点的状态
if (h != null && h.waitStatus != 0)
//唤醒下一个节点
unparkSuccessor(h);
return true;
return false;
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node)
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0)
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
if (s != null)
LockSupport.unpark(s.thread);
//尝试释放锁的流程
protected final boolean tryRelease(int releases)
//对当前的stage进行减1操作
int c = getState() - releases;
//如果unlock的时候,当前线程不是占有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
//定义变量是否释放锁标识
boolean free = false;
//如果c已经变为0了,代表锁已经被释放掉了
if (c == 0)
//free变为true
free = true;
//释放当前占有锁的变量
setExclusiveOwnerThread(null);
//如果c不等于0,可能是有可重入锁的场景,可重入锁,当前线程占有了锁,再次获取锁的时候,进行了加1操作,所以减少完后可能还是有可重入锁的存在
setState(c);
return free;
总结
1.整个获取锁的过程,通过控制state变量,如果是0,则代表当前已经没有锁,如果是1则代表,获取锁变量成功,后续没有竞争获取锁的线程,会包装成node节点加入到CLH的阻塞队列中,并通过自旋的方式,一直判断自己的前一个节点是否是头节点,是的话就尝试获取锁,否则就阻塞。
2.释放锁的过程,直接对state进行减1操作,然后判断是否是0,如果是0的话,就释放当前的线程标识即可,释放成功后,就唤醒阻塞中的后续节点
以上是关于JAVA进阶之路-AbstractQueuedSynchronizer(AQS)源码走读的主要内容,如果未能解决你的问题,请参考以下文章