Java Review - 并发编程_抽象同步队列AQS
Posted 小小工匠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java Review - 并发编程_抽象同步队列AQS相关的知识,希望对你有一定的参考价值。
文章目录
- 概述 AQS——锁的底层支持
- state 的作用
- ConditionObject
- 独占 VS 共享
- Interruptibly
- 维护AQS提供的队列 - 入队操作
- AQS——条件变量的支持
- 基于AQS实现自定义同步器
概述 AQS——锁的底层支持
AbstractQueuedSynchronizer抽象同步队列简称AQS,它是实现同步器的基础组件,并发包中锁的底层就是使用AQS实现的。
另外,我们基本上直接使用AQS框架开发的机会很少,但是知道其原理对于架构设计还是很有帮助的。
-
AQS是一个FIFO的双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node。
-
其中Node中的thread变量用来存放进入AQS队列里面的线程
-
Node节点内部的SHARED用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的,EXCLUSIVE用来标记线程是获取独占资源时被挂起后放入AQS队列的
-
waitStatus记录当前线程等待状态,可以为CANCELLED(线程被取消了)、SIGNAL(线程需要被唤醒)、CONDITION(线程在条件队列里面等待)、PROPAGATE(释放共享资源时需要通知其他节点)
-
prev记录当前节点的前驱节点,next记录当前节点的后继节点。
state 的作用
在AQS中维持了一个单一的状态信息 state,可以通过getState、setState、compareAndSetState
方法修改其值。
-
对于ReentrantLock的实现来说,state可以用来表示当前线程获取锁的可重入次数;
-
对于读写锁ReentrantReadWriteLock来说,state的高16位表示读状态,也就是获取该读锁的次数,低16位表示获取到写锁的线程的可重入次数;
-
对于semaphore来说,state用来表示当前可用信号的个数
-
对于CountDownlatch来说,state用来表示计数器当前的值。
ConditionObject
AQS有个内部类ConditionObject,用来结合锁实现线程同步。
ConditionObject可以直接访问AQS对象内部的变量,比如state状态值和AQS队列。
ConditionObject是条件变量,每个条件变量对应一个条件队列(单向链表队列),其用来存放调用条件变量的await方法后被阻塞的线程, 这个条件队列的头、尾元素分别为firstWaiter和lastWaiter。
独占 VS 共享
对于AQS来说,线程同步的关键是对状态值state进行操作。根据state是否属于一个线程,操作state的方式分为独占方式和共享方式。
在独占方式下获取和释放资源使用的方法为
- void acquire(int arg)
- void acquireInterruptibly(int arg)
- boolean release(int arg)
在共享方式下获取和释放资源的方法为
- void acquireShared(int arg)
- void acquireSharedInterruptibly(int arg)
- boolean releaseShared(int arg)
使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记是这个线程获取到了,其他线程再尝试操作state获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。
比如独占锁ReentrantLock的实现,当一个线程获取了ReentrantLock的锁后,在AQS内部会首先使用CAS操作把state状态值从0变为1,然后设置当前锁的持有者为当前线程,当该线程再次获取锁时发现它就是锁的持有者,则会把状态值从1变为2,也就是设置可重入次数,而当另外一个线程获取锁时发现自己并不是该锁的持有者就会被放入AQS阻塞队列后挂起。
共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过CAS方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用CAS方式进行获取即可。
比如Semaphore信号量,当一个线程通过acquire()方法获取信号量时,会首先看当前信号量个数是否满足需要,不满足则把当前线程放入阻塞队列,如果满足则通过自旋CAS获取信号量。
独占方式下,获取与释放资源的流程
在独占方式下,获取与释放资源的流程如下
- (1)当一个线程调用
acquire(int arg)
方法获取独占资源时,会首先使用tryAcquire
方法尝试获取资源,具体是设置状态变量state的值,成功则直接返回,失败则将当前线程封装为类型为Node.EXCLUSIVE
的Node节点后插入到AQS阻塞队列的尾部,并调用LockSupport.park(this)
方法挂起自己。
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once @link #tryAcquire,
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking @link
* #tryAcquire until success. This method can be used
* to implement method @link Lock#lock.
*
* @param arg the acquire argument. This value is conveyed to
* @link #tryAcquire but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
- (2)当一个线程调用
release(int arg)
方法时会尝试使用tryRelease操作释放资源,这里是设置状态变量state的值,然后调用LockSupport.unpark(thread)
方法激活AQS队列里面被阻塞的一个线程(thread)。被激活的线程则使用tryAcquire
尝试,看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起。
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if @link #tryRelease returns true.
* This method can be used to implement method @link Lock#unlock.
*
* @param arg the release argument. This value is conveyed to
* @link #tryRelease but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from @link #tryRelease
*/
public final boolean release(int arg)
if (tryRelease(arg))
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
return false;
需要注意的是,AQS类并没有提供可用的tryAcquire和tryRelease方法,正如AQS是锁阻塞和同步器的基础框架一样,tryAcquire和tryRelease需要由具体的子类来实现。
子类在实现tryAcquire和tryRelease时要根据具体场景使用CAS算法尝试修改state状态值,成功则返回true,否则返回false。
子类还需要定义,在调用acquire和release方法时state状态值的增减代表什么含义。
比如继承自AQS实现的独占锁ReentrantLock,定义当status为0时表示锁空闲,为1时表示锁已经被占用。在重写tryAcquire时,在内部需要使用CAS算法查看当前state是否为0,如果为0则使用CAS设置为1,并设置当前锁的持有者为当前线程,而后返回true,如果CAS失败则返回false。
比如继承自AQS实现的独占锁在实现tryRelease时,在内部需要使用CAS算法把当前state的值从1修改为0,并设置当前锁的持有者为null,然后返回true,如果CAS失败则返回false。
共享方式下,获取与释放资源的流程
在共享方式下,获取与释放资源的流程如下:
- (1)当线程调用
acquireShared(int arg)
获取共享资源时,会首先使用tryAcquireShared尝试获取资源,具体是设置状态变量state的值,成功则直接返回,失败则将当前线程封装为类型为Node.SHARED的Node节点后插入到AQS阻塞队列的尾部,并使用LockSupport.park(this)
方法挂起自己。
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once @link #tryAcquireShared,
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking @link
* #tryAcquireShared until success.
*
* @param arg the acquire argument. This value is conveyed to
* @link #tryAcquireShared but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg)
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
- (2)当一个线程调用releaseShared(int arg) 时会尝试使用tryReleaseShared操作释放资源,这里是设置状态变量state的值,然后使用LockSupport.unpark(thread)激活AQS队列里面被阻塞的一个线程(thread)。被激活的线程则使用tryReleaseShared查看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起。
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if @link #tryReleaseShared returns true.
*
* @param arg the release argument. This value is conveyed to
* @link #tryReleaseShared but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from @link #tryReleaseShared
*/
public final boolean releaseShared(int arg)
if (tryReleaseShared(arg))
doReleaseShared();
return true;
return false;
同样需要注意的是,AQS类并没有提供可用的tryAcquireShared和tryReleaseShared方法,正如AQS是锁阻塞和同步器的基础框架一样,tryAcquireShared和tryReleaseShared需要由具体的子类来实现。
子类在实现tryAcquireShared和tryReleaseShared时要根据具体场景使用CAS算法尝试修改state状态值,成功则返回true,否则返回false。
比如继承自AQS实现的读写锁ReentrantReadWriteLock里面的读锁在重写tryAcquireShared时,首先查看写锁是否被其他线程持有,如果是则直接返回false,否则使用CAS递增state的高16位(在ReentrantReadWriteLock中,state的高16位为获取读锁的次数)。
比如继承自AQS实现的读写锁ReentrantReadWriteLock里面的读锁在重写tryReleaseShared时,在内部需要使用CAS算法把当前state值的高16位减1,然后返回true,如果CAS失败则返回false。
基于AQS实现的锁除了需要重写上面介绍的方法外 ,还需要重写isHeldExclusively方法,来判断锁是被当前线程独占还是被共享。
Interruptibly
独占方式下的 void acquire(int arg)
和void acquireInterruptibly(int arg)
,与共享方式下的 void acquireShared(int arg)
和void acquireSharedInterruptibly(int arg)
,这两套函数中都有一个带有Interruptibly
关键字的函数,那么带这个关键字和不带有什么区别呢?
-
不带Interruptibly关键字的方法的意思是不对中断进行响应,也就是线程在调用不带Interruptibly关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,那么该线程不会因为被中断而抛出异常,它还是继续获取资源或者被挂起,也就是说不对中断进行响应,忽略中断。
-
带Interruptibly关键字的方法要对中断进行响应,也就是线程在调用带Interruptibly关键字的方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程,那么该线程会抛出InterruptedException异常而返回。
维护AQS提供的队列 - 入队操作
最后,我们来看看如何维护AQS提供的队列,主要看入队操作。
入队操作: 当一个线程获取锁失败后该线程会被转换为Node节点,然后就会使用enq(final Node node)方法将该节点插入到AQS的阻塞队列.
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node)
for (;;)
Node t = tail; // 1
if (t == null) // Must initialize
if (compareAndSetHead(new Node()))// 2
tail = head;
else
node.prev = t; // 3
if (compareAndSetTail(t, node)) // 4
t.next = node;
return t;
下面结合代码和节点图来讲解入队的过程。
【第一次循环】
- 如上代码在第一次循环中,当要在AQS队列尾部插入元素时,AQS队列状态如下所示
也就是队列头、尾节点都指向null;
-
当执行代码(1)后节点t指向了尾部节点,这时候队列状态如下图所示。
-
这时候t为null,故执行代码(2),使用CAS算法设置一个哨兵节点为头节点,如果CAS设置成功,则让尾部节点也指向哨兵节点,这时候队列状态如下图所示
【第二次循环】
-
到现在为止只插入了一个哨兵节点,还需要插入node节点,所以在第二次循环后执行到代码(1),这时候队列状态如下图所示
-
然后执行代码(3)设置node(入参)的前驱节点为尾部节点,这时候队列状态如下图所示
- 然后通过CAS算法设置node节点为尾部节点,CAS成功后队列状态如下图所示
- CAS成功后再设置原来的尾部节点的后驱节点为node,这时候就完成了双向链表的插入,此时队列状态如下图所示。
AQS——条件变量的支持
我们知道notify和wait,是配合synchronized内置锁实现线程间同步的基础设施一样,条件变量的signal和await方法也是用来配合锁(使用AQS实现的锁)实现线程间同步的基础设施。
它们的不同在于,synchronized同时只能与一个共享变量的notify或wait方法实现同步,而AQS的一个锁可以对应多个条件变量。
在调用共享变量的notify和wait方法前必须先获取该共享变量的内置锁,同理,在调用条件变量的signal和await方法前也必须先获取条件变量对应的锁。
那么,到底什么是条件变量呢?如何使用呢?不急,下面看一个例子。
// 1
ReentrantLock lock = new ReentrantLock();
// 2
Condition condition = lock.newCondition();
// 3
lock.lock();
try
System.out.println("begin wait");
// 4
condition.await();
System.out.println("end wait");
catch (Exception e)
e.printStackTrace();
finally
// 5
lock.unlock();
// 6
lock.lock();
try
System.out.println("begin single");
// 7
condition.signal();
System.out.println("end single");
catch (Exception e)
e.printStackTrace();
finally
// 8
lock.unlock();
-
代码(1)创建了一个独占锁ReentrantLock对象,ReentrantLock是基于AQS实现的锁。
-
代码(2)使用创建的Lock对象的
newCondition()
方法创建了一个ConditionObject变量,这个变量就是Lock锁对应的一个条件变量。需要注意的是,一个Lock对象可以创建多个条件变量。 -
代码(3)首先获取了独占锁
-
代码(4)则调用了条件变量的await()方法阻塞挂起了当前线程。 当其他线程调用条件变量的signal方法时,被阻塞的线程才会从await处返回。需要注意的是,和调用Object的wait方法一样,如果在没有获取到锁前调用了条件变量的await方法则会抛出 java.lang.IllegalMonitorStateException异常。
-
代码(5)则释放了获取的锁。
其实这里的Lock对象等价于synchronized加上共享变量,调用lock.lock()方法就相当于进入了synchronized块(获取了共享变量的内置锁),调用lock.unLock()方法就相当于退出synchronized块。 调用条件变量的await()方法就相当于调用共享变量的wait()方法,调用条件变量的signal方法就相当于调用共享变量的notify()方法。调用条件变量的signalAll()方法就相当于调用共享变量的notifyAll()方法。
经过上面解释,知道条件变量是什么,它是用来做什么的了。
在上面代码中,lock.newCondition()的作用其实是new了一个在AQS内部声明的ConditionObject对象,ConditionObject是AQS的内部类,可以访问AQS内部的变量(例如状态变量state)和方法。在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()方法时被阻塞的线程。注意这个条件队列和AQS队列不是一回事。
在如下代码中,当线程调用条件变量的await()方法时(必须先调用锁的lock()方法获取锁),在内部会构造一个类型为Node.CONDITION的node节点,然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁(也就是会操作锁对应的state变量的值),并被阻塞挂起。
这时候如果有其他线程调用lock.lock()尝试获取锁,就会有一个线程获取到锁,如果获取到锁的线程调用了条件变量的await()方法,则该线程也会被放入条件变量的阻塞队列,然后释放获取到的锁,在await()方法处阻塞。
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by @link #getState.
* <li> Invoke @link #release with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* @link #acquire with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 9 创建新的node节点,并插入到条件队列的对尾
Node node = addConditionWaiter();
// 10 释放当前线程获取的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 11 调用park方法阻塞挂起当前线程
while (!isOnSyncQueue(node))
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
在如下代码中,当另外一个线程调用条件变量的signal方法时(必须先调用锁的lock()方法获取锁),在内部会把条件队列里面队头的一个线程节点从条件队列里面移除并放入AQS的阻塞队列里面,然后激活这个线程。
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if @link #isHeldExclusively
* returns @code false
*/
public final void signal()
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 将条件队列的队首移动到AQS队列
doSignal(first);
需要注意的是,AQS只提供了ConditionObject的实现,并没有提供newCondition函数,该函数用来new一个ConditionObject对象。需要由AQS的子类来提供newCondition函数。
下面来看当一个线程调用条件变量的await()方法而被阻塞后,如何将其放入条件队列。
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter()
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION)
unlinkCancelledWaiters();
t = lastWaiter;
// 1
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 2
if (t == null)
firstWaiter = node;
else
// 3
t.nextWaiter = node;
// 4
lastWaiter = node;
return node;
代码(1)首先根据当前线程创建一个类型为Node.CONDITION的节点,然后通过代码(2)(3)(4)在单向条件队列尾部插入一个元素。
注意:当多个线程同时调用lock.lock()方法获取锁时,只有一个线程获取到了锁,其他线程会被转换为Node节点插入到lock锁对应的AQS阻塞队列里面,并做自旋CAS尝试获取锁。
如果获取到锁的线程又调用了对应的条件变量的await()方法,则该线程会释放获取到的锁,并被转换为Node节点插入到条件变量对应的条件队列里面。
这时候因为调用lock.lock()方法被阻塞到AQS队列里面的一个线程会获取到被释放的锁,如果该线程也调用了条件变量的await()方法则该线程也会被放入条件变量的条件队列里面。
当另外一个线程调用条件变量的signal()或者signalAll()方法时,会把条件队列里面的一个或者全部Node节点移动到AQS的阻塞队列里面,等待时机获取锁。
最后使用一个图总结如下:一个锁对应一个AQS阻塞队列,对应多个条件变量,每个条件变量有自己的一个条件队列。
基于AQS实现自定义同步器
我们基于AQS实现一个不可重入的独占锁,。
自定义AQS需要重写一系列函数,还需要定义原子变量state的含义。这里我们定义,state为0表示目前锁没有被线程持有,state为1表示锁已经被某一个线程持有,由于是不可重入锁,所以不需要记录持有锁的线程获取锁的次数。另外,我们自定义的锁支持条件变量。
【基于AQS实现的不可重入的独占锁】
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/12/5 22:35
* @mark: show me the code , change the world
*/
public class NonReentrantLock implements Lock, Serializable
//静态内部类,用于辅助
private static class Sync extends AbstractQueuedSynchronizer
@Override
protected boolean tryAcquire(int arg)
assert arg == 1;//如果state为0,则尝试获取锁
if (compareAndSetState(0,1))
setExclusiveOwnerThread(Thread.currentThread());
return true;
return false;
@Override
protected boolean tryRelease(int arg)
assert arg == 1;//如果state为0,则尝试获取锁
if (getState()==0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
@Override
protected boolean isHeldExclusively()
// 是否锁已经被持有
return getState()==1;
//提供条件变量接口
public Condition newCondition()
return new ConditionObject();
Sync sync = new Sync();
@Override
public void lock()
sync.acquire(1);
@Override
public void lockInterruptibly() throws InterruptedException
sync.acquireInterruptibly(1);
@Override
public boolean tryLock()
return sync.tryAcquire(1);
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException
return sync.tryAcquireNanos(1, unit.toNanos(time));
@Override
public void unlock()
sync.release(1);
@Override
public Condition newCondition()
return sync.newCondition();
在如上代码中,NonReentrantLock定义了一个内部类Sync用来实现具体的锁的操作,Sync则继承了AQS。由于我们实现的是独占模式的锁,所以Sync重写了tryAcquire、tryRelease和isHeldExclusively 3个方法。另外,Sync提供了newCondition这个方法用来支持条件变量。
【使用自定义锁实现生产—消费模型】
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/12/5 22:59
* @mark: show me the code , change the world
*/
public class NonReentrantLockTest
static NonReentrantLock lock = new NonReentrantLock();
static Condition notFull = lock.newCondition();
static Condition notEmpty = lock.newCondition();
static Queue<String> queue = new LinkedBlockingQueue<>();
static int queueSize = 以上是关于Java Review - 并发编程_抽象同步队列AQS的主要内容,如果未能解决你的问题,请参考以下文章
Java Review - 并发编程_DelayQueue原理&源码剖析
Java Review - 并发编程_ConcurrentLinkedQueue原理&源码剖析
Java Review - 并发编程_ConcurrentLinkedQueue原理&源码剖析
Java Review - 并发编程_DelayQueue原理&源码剖析