AQS解析与实战

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS解析与实战相关的知识,希望对你有一定的参考价值。

前言

前段时间在面试,发现面试官都有问到同步器AQS的相关问题。AQS为Java中几乎所有的锁和同步器提供一个基础框架,派生出如ReentrantLock、Semaphore、CountDownLatch等AQS全家桶。本文基于AQS原理的几个核心点,谈谈对AbstractQueuedSynchronizer的理解,并实现一个自定义同步器。

AQS原理面试题的核心回答要点

  1. state 状态的维护。
  2. CLH队列
  3. ConditionObject通知
  4. 模板方法设计模式
  5. 独占与共享模式。
  6. 自定义同步器。
  7. AQS全家桶的一些延伸,如:ReentrantLock等。

AQS的类图结构

AQS全称是AbstractQueuedSynchronizer,即抽象同步队列。下面看一下AQS的类图结构:
技术图片
为了方便下面几个关键点的理解,大家先熟悉一下AQS的类图结构。

state 状态的维护

在
AQS
中维持了一个单一的共享状态
state
,来实现同步器同步。看一下
state
的相关代码如下:

state源码

/**
     * The synchronization state.
     */

private

volatile

int
 state
;

/**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a {@code volatile} read.
     * @return current state value
     */

protected

final

int
 getState
()

{

return
 state
;

}

/**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a {@code volatile} write.
     * @param newState the new state value
     */

protected

final

void
 setState
(
int
 newState
)

{
        state 
=
 newState
;

}

/**
     * Atomically sets synchronization state to the given updated
     * value if the current state value equals the expected value.
     * This operation has memory semantics of a {@code volatile} read
     * and write.
     *
     * @param expect the expected value
     * @param update the new value
     * @return {@code true} if successful. False return indicates that the actual
     *         value was not equal to the expected value.
     */

protected

final

boolean
 compareAndSetState
(
int
 expect
,

int
 update
)

{

// See below for intrinsics setup to support this

return
 unsafe
.
compareAndSwapInt
(
this
,
 stateOffset
,
 expect
,
 update
);

}

state 源码设计几个回答要点:

  • state用volatile修饰,保证多线程中的可见性。
  • getState()和setState()方法采用final修饰,限制AQS的子类重写它们两。
  • compareAndSetState()方法采用乐观锁思想的CAS算法,也是采用final修饰的,不允许子类重写。

CLH队列

谈到CLH队列,我们结合以上state状态,先来看一下AQS原理图:

技术图片

CLH(Craig, Landin, and Hagersten locks) 同步队列 是一个FIFO双向队列,其内部通过节点head和tail记录队首和队尾元素,队列元素的类型为Node。AQS依赖它来完成同步状态state的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

Node节点

CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),condition队列的后续节点(nextWaiter)如下图:

技术图片

waitStatus几种状态状态:

技术图片

我们再看一下CLH队列入列以及出列的代码:

入列

CLH队列入列就是tail指向新节点、新节点的prev指向当前最后的节点,当前最后一个节点的next指向当前节点。addWaiter方法如下:

//构造Node
private

Node
 addWaiter
(
Node
 mode
)

{

Node
 node 
=

new

Node
(
Thread
.
currentThread
(),
 mode
);

// Try the fast path of enq; backup to full enq on failure(快速尝试添加尾节点)

Node
 pred 
=
 tail
;

if

(
pred 
!=

null
)

{
            node
.
prev 
=
 pred
;

//CAS设置尾节点

if

(
compareAndSetTail
(
pred
,
 node
))

{
                pred
.
next 
=
 node
;

return
 node
;

}

}

//多次尝试
        enq
(
node
);

return
 node
;

}

由以上代码可得,addWaiter设置尾节点失败的话,调用enq(Node node)方法设置尾节点,enq方法如下:


private

Node
 enq
(
final

Node
 node
)

{

//死循环尝试,知道成功为止

for

(;;)

{

Node
 t 
=
 tail
;

//tail 不存在,设置为首节点

if

(
t 
==

null
)

{

// Must initialize

if

(
compareAndSetHead
(
new

Node
()))
                    tail 
=
 head
;

}

else

{
                node
.
prev 
=
 t
;

if

(
compareAndSetTail
(
t
,
 node
))

{
                    t
.
next 
=
 node
;

return
 t
;

}

}

}

}

出列

首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点。可以看一下以下两段源码:


Node
 h 
=
 head
;

if

(
h 
!=

null

&&
 h
.
waitStatus 
!=

0
)
  unparkSuccessor
(
h
);

private

void
 unparkSuccessor
(
Node
 node
)

{

/*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */

int
 ws 
=
 node
.
waitStatus
;

if

(
ws 
<

0
)
            compareAndSetWaitStatus
(
node
,
 ws
,

0
);

/*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */

Node
 s 
=
 node
.
next
;

if

(
s 
==

null

||
 s
.
waitStatus 
>

0
)

{
            s 
=

null
;

for

(
Node
 t 
=
 tail
;
 t 
!=

null

&&
 t 
!=
 node
;
 t 
=
 t
.
prev
)

if

(
t
.
waitStatus 
<=

0
)
                    s 
=
 t
;

}

if

(
s 
!=

null
)

LockSupport
.
unpark
(
s
.
thread
);

}

CLH核心几个回答要点

  • 双向链表入列出列
  • CAS算法设置尾节点+死循环自旋。

CAS算法,可以看一下我工作实战中仿造CAS算法解决并发问题的实现https://juejin.im/post/5d0616ade51d457756536791

ConditionObject

ConditionObject简介

我们都知道,synchronized控制同步的时候,可以配合Object的wait()、notify(),notifyAll() 系列方法可以实现等待/通知模式。而Lock呢?它提供了条件Condition接口,配合await(),signal(),signalAll() 等方法也可以实现等待/通知机制。ConditionObject实现了Condition接口,给AQS提供条件变量的支持 。

Condition队列与CLH队列的那些事

我们先来看一下图:

技术图片

ConditionObject队列与CLH队列的爱恨情仇:

  • 调用了await()方法的线程,会被加入到conditionObject等待队列中,并且唤醒CLH队列中head节点的下一个节点。
  • 线程在某个ConditionObject对象上调用了singnal()方法后,等待队列中的firstWaiter会被加入到AQS的CLH队列中,等待被唤醒。
  • 当线程调用unLock()方法释放锁时,CLH队列中的head节点的下一个节点(在本例中是firtWaiter),会被唤醒。

区别:

  • ConditionObject对象都维护了一个单独的等待队列 ,AQS所维护的CLH队列是同步队列,它们节点类型相同,都是Node。

独占与共享模式。

AQS支持两种同步模式:独占式和共享式。

独占式

同一时刻仅有一个线程持有同步状态,如ReentrantLock。又可分为公平锁和非公平锁。

公平锁: 按照线程在队列中的排队顺序,有礼貌的,先到者先拿到锁。

非公平锁: 当线程要获取锁时,无视队列顺序直接去抢锁,不讲道理的,谁抢到就是谁的。

acquire(int arg)是独占式获取同步状态的方法,我们来看一下源码:

  • acquire(long arg)方法
public

final

void
 acquire
(
long
 arg
)

{

if

(!
tryAcquire
(
arg
)

&&
            acquireQueued
(
addWaiter
(
Node
.
EXCLUSIVE
),
 arg
))
            selfInterrupt
();

}
  • addWaiter方法
//构造Node
private

Node
 addWaiter
(
Node
 mode
)

{

Node
 node 
=

new

Node
(
Thread
.
currentThread
(),
 mode
);

// Try the fast path of enq; backup to full enq on failure(快速尝试添加尾节点)

Node
 pred 
=
 tail
;

if

(
pred 
!=

null
)

{
            node
.
prev 
=
 pred
;

//CAS设置尾节点

if

(
compareAndSetTail
(
pred
,
 node
))

{
                pred
.
next 
=
 node
;

return
 node
;

}

}

//多次尝试
        enq
(
node
);

return
 node
;

}
  • acquireQueued(final Node node, long arg)方法
final

boolean
 acquireQueued
(
final

Node
 node
,

long
 arg
)

{

boolean
 failed 
=

true
;

try

{

boolean
 interrupted 
=

false
;

for

(;;)

{

final

Node
 p 
=
 node
.
predecessor
();

if

(
p 
==
 head 
&&
 tryAcquire
(
arg
))

{
                    setHead
(
node
);
                    p
.
next 
=

null
;

// help GC
                    failed 
=

false
;

return
 interrupted
;

}

if

(
shouldParkAfterFailedAcquire
(
p
,
 node
)

&&
                    parkAndCheckInterrupt
())
                    interrupted 
=

true
;

}

}

finally

{

if

(
failed
)
                cancelAcquire
(
node
);

}

}
  • selfInterrupt()方法
static

void
 selfInterrupt
()

{

Thread
.
currentThread
().
interrupt
();

}

结合源代码,可得acquire(int arg)方法流程图,如下:
技术图片

共享式

多个线程可同时执行,如Semaphore/CountDownLatch等都是共享式的产物。

acquireShared(long arg)是共享式获取同步状态的方法,可以看一下源码:


public

final

void
 acquireShared
(
long
 arg
)

{

if

(
tryAcquireShared
(
arg
)

<

0
)
            doAcquireShared
(
arg
);

}

由上可得,先调用tryAcquireShared(int arg)方法尝试获取同步状态,如果获取失败,调用doAcquireShared(int arg)自旋方式获取同步状态,方法源码如下:


private

void
 doAcquireShared
(
long
 arg
)

{

final

Node
 node 
=
 addWaiter
(
Node
.
SHARED
);

boolean
 failed 
=

true
;

try

{

boolean
 interrupted 
=

false
;

for

(;;)

{

final

Node
 p 
=
 node
.
predecessor
();

if

(
p 
==
 head
)

{

long
 r 
=
 tryAcquireShared
(
arg
);

if

(
r 
>=

0
)

{
                        setHeadAndPropagate
(
node
,
 r
);
                        p
.
next

=

null
;

// help GC

if

(
interrupted
)
                            selfInterrupt
();
                        failed 
=

false
;

return
;

}

}

if

(
shouldParkAfterFailedAcquire
(
p
,
 node
)

&&
                    parkAndCheckInterrupt
())
                    interrupted 
=

true
;

}

}

finally

{

if

(
failed
)
                cancelAcquire
(
node
);

}

}

AQS的模板方法设计模式

模板方法模式

模板方法模式: 在一个方法中定义一个算法的骨架,而将一些步骤延迟到子类中。模板方法使得子类可以在不改变算法结构的情况下,重新定义算法中的某些步骤。

模板方法模式生活中的例子: 假设我们要去北京旅游,那么我们可以坐高铁或者飞机,或者火车,那么定义交通方式的抽象类,可以有以下模板:买票->安检->乘坐xx交通工具->到达北京。让子类继承该抽象类,实现对应的模板方法。
技术图片
AQS定义的一些模板方法如下:

isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

简言之,就是AQS提供tryAcquire,tryAcquireShared等模板方法,给子类实现自定义的同步器。

自定义同步器。

基于以上分析,我们都知道state,CLH队列,ConditionObject队列 等这些关键点,你要实现自定义锁的话,首先需要确定你要实现的是独占锁还是共享锁,定义原子变量state的含义,再定义一个内部类去继承AQS,重写对应的模板方法。

我们来看一下基于 AQS 实现的不可重入的独占锁的demo,来自《Java并发编程之美》:

public

class

NonReentrantLock

implements

Lock
,
Serializable
{

//内部类,自定义同步器

static

class

Sync

extends

AbstractQueuedSynchronizer

{

//是否锁已经被持有

public

boolean
 isHeldExclusively
()

{

return
 getState
()

==

1
;

}

//如果state为0 则尝试获取锁

public

boolean
 tryAcquire
(
int
 arg
)

{

assert
 arg
==

1

;

//CAS设置状态,能保证操作的原子性,当前为状态为0,操作成功状态改为1

if
(
compareAndSetState
(
0
,

1
)){

//设置当前独占的线程
                setExclusiveOwnerThread
(
Thread
.
currentThread
());

return

true
;

}

return

false
;

}

//尝试释放锁,设置state为0

public

boolean
 tryRelease
(
int
 arg
)

{

assert
 arg 
==
1
;

//如果同步器同步器状态等于0,则抛出监视器非法状态异常

if
(
getState
()

==

0
)

throw

new

IllegalMonitorStateException
();

//设置独占锁的线程为null
            setExclusiveOwnerThread
(
null
);

//设置同步状态为0
            setState
(
0
);

return

true
;

}

//返回Condition,每个Condition都包含了一个Condition队列

Condition
 newCondition
(){

return

new

ConditionObject
();

}

}

//创建一个Sync来做具体的工作

private

final

Sync
 sync
=

new

Sync

();

@Override

public

void
 lock
()

{
        sync
.
acquire
(
1
);

}

public

boolean
 isLocked
()

{

return
 sync
.
isHeldExclusively
();

}

@Override

public

void
 lockInterruptibly
()

throws

InterruptedException

{
        sync
.
acquireInterruptibly
(
1
);

}

@Override

public

boolean
 tryLock
()

{

return
 sync
.
tryAcquire
(
1
);

}

@Override

public

boolean
 tryLock
(
long
 time
,

TimeUnit
 unit
)

throws

InterruptedException

{

return
 sync
.
tryAcquireNanos
(
1
,
 unit
.
toNanos
(
time
));

}

@Override

public

void
 unlock
()

{
        sync
.
release
(
1
);

}

@Override

public

Condition
 newCondition
()

{

return
 sync
.
newCondition
();

}

}

NonReentrantLockDemoTest:

public

class

NonReentrantLockDemoTest

{

private

static

NonReentrantLock
 nonReentrantLock 
=

new

NonReentrantLock
();

public

static

void
 main
(
String
[]
 args
)

{

for

(
int
 i 
=

0
;
 i 
<

10
;
 i
++)

{

Thread
 thread 
=

new

Thread
(()

->

{
                nonReentrantLock
.
lock
();

try

{

System
.
out
.
println
(
Thread
.
currentThread
().
getName
());

Thread
.
sleep
(
3000
);

}

catch

(
InterruptedException
 e
)

{
                    e
.
printStackTrace
();

}

finally

{
                    nonReentrantLock
.
unlock
();

}

});
            thread
.
start
();

}

}
}

运行结果:

技术图片

AQS全家桶实战

AQS派生出如ReentrantLock、Semaphore等AQS全家桶,接下来可以看一下它们的使用案例。

ReentrantLock

ReentrantLock介绍

  • ReentrantLock为重入锁,能够对共享资源能够重复加锁,是实现Lock接口的一个类。
  • ReentrantLock支持公平锁和非公平锁两种方式

ReentrantLock案例

使用ReentrantLock来实现个简单线程安全的list,如下:

public

class

ReentrantLockList

{

// 线程不安全的list

private

ArrayList
<
String
>
 array 
=

new

ArrayList
<>();

//独占锁

private

volatile

ReentrantLock

lock

=

new

ReentrantLock
();

//添加元素

public

void
 add
(
String
 e
){

lock
.
lock
();

try

{
            array
.
add
(
e
);

}
finally

{

lock
.
unlock
();

}

}

//删除元素

public

void
 remove
(
String
 e
){

lock
.
lock
();

try

{
            array
.
remove
(
e
);

}
finally

{

lock
.
unlock
();

}

}

//获取元素

public

String

get
(
int
 index
){

lock
.
lock
();

try

{

return
 array
.
get
(
index
);

}
finally

{

lock
.
unlock
();

}

}
}

Semaphore

Semaphore介绍

  • Semaphore也叫信号量,可以用来控制资源并发访问的线程数量,通过协调各个线程,以保证合理的使用资源。

    Semaphore案例

Java多线程有一到比较经典的面试题:ABC三个线程顺序输出,循环10遍。

public

class

ABCSemaphore

{

private

static

Semaphore
 A 
=

new

Semaphore
(
1
);

private

static

Semaphore
 B 
=

new

Semaphore
(
1
);

private

static

Semaphore
 C 
=

new

Semaphore
(
1
);

static

class

ThreadA

extends

Thread

{

@Override

public

void
 run
()

{

try

{

for

(
int
 i 
=

0
;
 i 
<

10
;
 i
++)

{
                    A
.
acquire
();

System
.
out
.
print
(
"A"
);
                    B
.
release
();

}

}

catch

(
InterruptedException
 e
)

{
                e
.
printStackTrace
();

}

}

}

static

class

ThreadB

extends

Thread

{

@Override

public

void
 run
()

{

try

{

for

(
int
 i 
=

0
;
 i 
<

10
;
 i
++)

{
                    B
.
acquire
();

System
.
out
.
print
(
"B"
);
                    C
.
release
();

}

}

catch

(
InterruptedException
 e
)

{
                e
.
printStackTrace
();

}

}

}

static

class

ThreadC

extends

Thread

{

@Override

public

void
 run
()

{

try

{

for

(
int
 i 
=

0
;
 i 
<

10
;
 i
++)

{
                    C
.
acquire
();

System
.
out
.
print
(
"C"
);
                    A
.
release
();

}

}

catch

(
InterruptedException
 e
)

{
                e
.
printStackTrace
();

}

}

}

public

static

void
 main
(
String
[]
 args
)

throws

InterruptedException

{

// 开始只有A可以获取, BC都不可以获取, 保证了A最先执行
        B
.
acquire
();
        C
.
acquire
();

new

ThreadA
().
start
();

new

ThreadB
().
start
();

new

ThreadC
().
start
();

}

参考

  • 《Java并发编程之美》
  • 【死磕Java并发】—–J.U.C之AQS

以上是关于AQS解析与实战的主要内容,如果未能解决你的问题,请参考以下文章

AQS(AbstractQueuedSynchronizer)源码深度解析—AQS的设计与总体结构

Java高并发编程实战6,通过AQS源码分析lock()锁机制

Java高并发编程实战6,通过AQS源码分析lock()锁机制

Java并发编程实战—– AQS:CLH同步队列

AQS源码解析

AQS (Abstract Queued Synchronizer)源码解析 -- 独占锁与共享锁的加锁与解锁