AQS源码的简单理解
Posted expiator
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS源码的简单理解相关的知识,希望对你有一定的参考价值。
概念
AQS全称 AbstractQueuedSynchronizer。
AQS是一个并发包的基础组件,用来实现各种锁,各种同步组件的。它包含了state变量、加锁线程、等待队列等并发中的核心组件。
ReentrantLock、Semaphore、CountDownLatch、FutrueTask,这些都是基于AQS构建的。
而AQS是基于volatile变量的读/写和CAS( 也就是compareAndSet()方法 )实现的。
volatile可以保证并发中的可见性,还可以禁止指令重排序。CAS,用于管理对共享数据的并发访问。
ReentrantLock和AQS的关系
在ReentrantLock中,state代表了加锁的状态。初始状态下,这个state的值是0。
另外,这个AQS内部还有一个关键变量,用来记录当前加锁的是哪个线程,初始化状态下,这个变量是null。
线程A跑过来调用ReentrantLock的lock()方法尝试进行加锁,这个加锁的过程,直接就是用CAS操作将state值从0变为1。
如果之前没人加过锁,那么state的值肯定是0,此时线程A就可以加锁成功。
一旦线程1加锁成功了之后,就可以设置当前加锁线程是自己。
主要流程
重要的变量
- state
状态变量。
/**
* The synchronization state.
*/
private volatile int state;
等待队列的节点Node
"CLH" (Craig, Landin, and Hagersten) lock queue。
CLH lock queue通常被用来处理并发的情况,它通过双向队列(FIFO)来完成同步状态。每个线程都会被封装成一个Node节点放到同步队列中。
队列的每个Node节点保存了当前线程的同步状态,等待状态,前驱和后继节点等。
static final class Node {
//共享模式
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
//独占模式
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
// 线程的等待状态 表示线程已经被取消
static final int CANCELLED = 1;
// 线程的等待状态 表示后继线程需要被唤醒
static final int SIGNAL = -1;
// 线程的等待状态 表示线程在Condtion上
static final int CONDITION = -2;
// 表示下一个acquireShared需要无条件的传播
static final int PROPAGATE = -3;
/**
* 等待状态有以下几种:
*
* SIGNAL: 当前节点的后继节点处于等待状态时,如果当前节点的同步状态被释放或者取消,
* 必须唤起它的后继节点
*
* CANCELLED: 一个节点由于超时或者中断需要在CLH队列中取消等待状态,被取消的节点不会再次等待
*
* CONDITION: 当前节点在等待队列中,只有当节点的状态设为0的时候该节点才会被转移到同步队列
*
* PROPAGATE: 下一次的共享模式同步状态的获取将会无条件的传播
* waitStatus的初始值时0,使用CAS来修改节点的状态
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
acquire()
线程在独占模式下获取state。
子类继承AQS后,经常会用到acquire() 、 release()。
acquire()获取状态、维护状态。
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
release()
线程在独占模式下释放state。
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor()
唤醒Node的后继节点。
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
compareAndSetState()
通过CAS设置状态变量。当state值为指定的参数expect时,将其修改为另一个指定的值。
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
ConditionObject
而另一个内部类ConditionObject实现了Condition接口,并且实现了其中的await(),signal(),signalALL()等方法。
ConditionObject主要是为并发编程中的同步提供了等待通知的实现方式,可以在不满足某个条件的时候挂起线程等待。直到满足某个条件的时候在唤醒线程。
参考资料:
https://juejin.im/post/5c07e59cf265da617464a09c (大白话聊聊对AQS的理解)
http://ifeve.com/introduce-abstractqueuedsynchronizer/
https://blog.csdn.net/qq_30572275/article/details/80297047
以上是关于AQS源码的简单理解的主要内容,如果未能解决你的问题,请参考以下文章