java semaphore 实现等待队列问题为啥死锁
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java semaphore 实现等待队列问题为啥死锁相关的知识,希望对你有一定的参考价值。
两个生产者,每个都生产三十次。
两个消费者,每个都消费三十次。
篮子初始化四个苹果,总容量十个。
full 为 semaphone(4)
empty 为 semaphore(6)
mutex 为全局锁 semaphore(1)
消费者:
full.Wait();
mutex.Wait();
消费....
mutex.Signal();
empty.Signal();
生产者:
empty.Wait();
mutex.Wait();
生产....
mutex.Signal();
full.Signal
Semaphore 同步的 Wait()
this.value--;
if(value<0) wait; // 省略 try catch
.
常规的 Semaphore 使用 while(value < 0) 控制 wait(). 但是上述的 Semaphore 使用 while() 就死锁,使用 if 没有任何问题。
我的问题:
问题1:
两个消费者都 wait() 等待生产者时,生产者生产完 mutex.Signal() 中有 notify(), full.Signal() 中又有 notify(), 这里唤醒了大家几次?
问题2:
两个消费者都 wait() 等待生产者时,生产者生产完一个通知大家,如果等待的消费者1抢到机会开始消费,之后释放mutex锁时 notify() 通知大家,如果等待的消费者2抢到机会消费,但是却是没有可以消费的... 这里是怎么造成的那?
谢谢
因为标准的 Semaphore 不允许负值,使用 while(value<=0) 进行 wait(), 然后才能 value--.
但是如果允许负值,则可以实现等待队列。这个问题就是要实现队列。如果使用 value--; while(value<0) wait() 就会死锁,但是需要修改为 if 才没有死锁。这是为什么那?
AQS 源码解读
一、AQS
AQS
是 AbstractQueuedSynchronizer
的简称,又称为同步阻塞队列,是 Java
中的一个抽象类。在其内部维护了一个由双向链表实现的 FIFO
线程等待队列,同时又提供和维护了一个共享资源 state
,像我们平常使用的 ReentrantLock、Semaphore、ReentrantReadWriteLock、SynchronousQueue、FutureTask
等都是基于AQS
进行实现的。
AQS
可以实现什么功能呢?在 AQS
中不考虑资源的获取和释放,主要关注资源获取不到时,如何将线程加入队列以及阻塞,当释放资源后,如何再进行线程的出列和唤醒。而对于资源的操作则交予具体实现的子类进行完成。
在此基础上 AQS
为了使线程的控制可以更灵活,又提供了两种同步模型,独占模式和共享模式:
-
独占模式:表示并发情况下只有一个线程能执行,其余则需等待,例如
Lock
锁,一次只能有一个线程获取到锁。 -
共享模式:允许多线程根据规则执行,例如
Semaphore
进行多个线程的协调。
AQS
已经帮我们实现了队列的维护,以及线程的等待和唤醒,但是具体资源的获取和释放都需要由继承类实现,对于资源的获取和释放也是区分了独占模式和共享模式,相应方法如下:
//查询是否正在独占资源,condition会使用
boolean isHeldExclusively()
//独占模式,尝试获取资源,成功则返回true,失败则返回false
boolean tryAcquire(int arg)
//独占模式,尝试释放资源,成功则返回true,失败则返回false
boolean tryRelease(int arg)
//共享模式,尝试获取资源,如果返回负数表示失败,否则表示成功。
int tryAcquireShared(int arg)
//共享模式,尝试释放资源,成功则返回true,失败则返回false。
boolean tryReleaseShared(int arg)
例如在 ReentrantLock
公平锁中,tryAcquire
的实现逻辑如下:
protected final boolean tryAcquire(int acquires)
// 当前线程
final Thread current = Thread.currentThread();
// AQS 中共享 state
int c = getState();
if (c == 0)
// 如果队列中没有其他线程,并对state进行修改,
// 如果修改成功则设置独占锁的线程为当前线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires))
setExclusiveOwnerThread(current);
return true;
else if (current == getExclusiveOwnerThread())
// 如果独占线程就是当前线程,则是重入的场景,对 state + 1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
// 如果都没成功,则获取锁失败
return false;
可以看到在 ReentrantLock
公平锁中,通过 state
的值来标识是否有锁资源可用,并且重入情况下也是对 state
的值进行修改标识 ,对于 state
的修改和判断是否有等待队列线程, AQS
中都提供了相应的方法。
在 AQS
中几个核心的方法如下,同样区分了独占模式和共享模式:
// 返回共享资源的当前值
final int getState()
// 设置共享资源的值
final void setState(int newState)
// CAS设置共享资源的值
final boolean compareAndSetState(int expect, int update)
// 独占模式获取同步资源,会调用重写的tryAcquire(int arg),
// 如果获取成功,则不做任何处理,否则将会加入同步队列并挂起线程等待
final void acquire(int arg)
// 独占模式式获取同步资源,但是可以响应中断
final void acquireInterruptibly(int arg)
// 独占模式获取同步资源,但多出了超时时间,
// 如果当前线程在 nanosTimeout 时间内没有获取到同步资源,
// 那么将会返回false,否则返回true
final boolean tryAcquireNanos(int arg, long nanosTimeout)
// 独占模式式释放同步资源,会调用重写的 tryRelease(int arg) 方法,
// 在释放同步资源之后,会将同步队列中第一个节点包含的线程唤醒
final boolean release(int arg)
// 共享模式式获取同步资源,会调用重写的 tryAcquireShared(int arg) ,
// 如果当前线程未获取到同步资源,会加入同步队列等待,
// 和独占式的区别这里 tryAcquireShared(int arg) < 0 时才认为未获取到资源
final void acquireShared(int arg)
// 共享模式式获取同步资源,可以响应中断
final void acquireSharedInterruptibly(int arg)
// 共享模式获取同步资源,但多出了超时时间
final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
// 共享式释放同步资源,会调用重写的 tryReleaseShared(int arg) 方法,
// 在释放同步资源之后,会将同步队列中第一个节点包含的线程唤醒
final boolean releaseShared(int arg)
下面一起从源码的角度,分析 AQS
是如何实现线程的协调和管理的。
二、共享资源 state
共享资源 state
就是 AQS
中的一个 int
类型的全局变量,使用了 volatile
进行修饰,保证了多线程下的数据可见性,并且 AQS
为其提供了普通和 CAS
方式的修改方法,该共享资源主要用来做资源的标记。
例如:
在 ReentrantLock
锁中用来表示是否获取到锁,默认情况 0
表示无锁状态,获取到锁后进行 +1
,如果是重入的场景下同样进行 +1
,最后释放锁后再进行 -1
。
在 Semaphore
中用来表示信号量的标记,当获取信号量时 state
进行 -1
,释放信号量再进行 +1
:
三、FIFO 阻塞队列
在 AQS
中阻塞队列采用双向链表进行实现,具体源码如下:
//等待队列节点类,双向链表
static final class Node
// 标记,指示节点正在共享模式下等待
static final Node SHARED = new Node();
// 标记,指示节点正在独占模式下等待
static final Node EXCLUSIVE = null;
// waitStatus值表示线程已取消
static final int CANCELLED = 1;
// waitStatus值表示后继线程需要唤醒
static final int SIGNAL = -1;
// waitStatus值,表示线程正在等待状态
static final int CONDITION = -2;
// waitStatus值指示下一个被获取的应该无条件的传播
static final int PROPAGATE = -3;
// 线程等待状态
volatile int waitStatus;
// 上一个节点
volatile Node prev;
// 下一个节点
volatile Node next;
// 当前线程
volatile Thread thread;
// 节点的模式,独占还是贡献
Node nextWaiter;
// 是否为共享模型
final boolean isShared()
return nextWaiter == SHARED;
// 获取上一个节点
final Node predecessor() throws NullPointerException
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
Node() // Used to establish initial head or SHARED marker
Node(Thread thread, Node mode) // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
Node(Thread thread, int waitStatus) // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
其中通过 nextWaiter
表示当前线程是独占模型还是共享模式,线程的所属状态使用 waitStatus
来进行表示,其中包括:
- 默认值为
0
,表示当前节点在sync
队列中,等待着获取资源。 CANCELLED
,值为1
,表示当前的线程被取消。SIGNAL
,值为-1
,释放资源后需唤醒后继节点。CONDITION
,值为-2
, 等待condition
唤醒。PROPAGATE
,值为-3
,工作于共享锁状态,需要向后传播,比如根据资源是否剩余,唤醒后继节点。
四、独占模式 acquire 获取资源
在 AQS
acquire()
方法中,首先调用子类的 tryAcquire
获取资源,如果资源获取成功则不做任何处理,如果失败则首先使用 addWaiter
将当前线程加入到队列中,并指定 Node
的类型为独占模式:
在 addWaiter
方法中,会将当前线程的 Node
加入到队列的尾端,如果尾节点为空或修改尾节点失败则进入到 enq
中使用自旋的方式修改:
在 enq
方法中可以看出,当为节点为空时,也就是队列中无数据时,会初始化一个空的 Head
节点。
再回到 acquire()
方法,加入队列后会进入到 acquireQueued
方法中,在该方法循环中如果当前节点 的pred
上一个节点是 head
节点的话,那该节点不就是第一个节点吗,因为从上面就可以看出,初始情况下 head
是一个空的 node
,那 head
的下一个节点不就是第一个进入到队列的节点了,这种情况下遵循队列先进先出的原则,再次尝试是否能获取到资源,如果可以成功获取资源到则将当前节点置为 head
节点,同时再次将 head
节点置为空 node
,此时线程也无需阻塞可以直接执行:
但是如果当前节点的上一个节点不是 head
节点,或者没有获取到资源,则此时需要进行挂起阻塞,下面首先会触发 shouldParkAfterFailedAcquire
方法,这里先看后面的 parkAndCheckInterrupt
方法,该方法主要做了将线程挂起阻塞的作用,采用 LockSupport.park
进行线程的阻塞:
再来看 shouldParkAfterFailedAcquire
方法就是控制当前线程是否需要挂起,这里就需要使用到 Node
中的 waitStatus
,在该方法中有三种类型的判断:
- 如果当前是
SIGNAL
状态则可以直接挂起 - 当
waitStatus
大于0
时,在Node
中waitStatus
大于0
的状态就是CANCELLED
状态,也就是标识线程被取消了,此时这种线程进行阻塞也就没有意义了,那就一直循环向上取线程未被取消的作为当前节点,继续执行。 - 当
waitStatus
小于等于0
时,将状态置为SIGNAL
类型
后面当阻塞的线程被唤醒后,会继续在 acquireQueued
的循环中,不断找寻第一个入队的线程进行尝试获取资源操作。
五、独占模式 release 释放资源
在 release
方法中,首先会调用子类的 tryRelease
方法释放资源:
然后会将当前的 head
节点传入 unparkSuccessor
方法中,在该方法中首先将该Node
节点的 waitStatus
修改到默认的 0
值,然后获取到下一个节点,因为 head
节点始终保持为空节点,下一个节点才是真正的队列中第一个线程。但如果下一个节点为空的话,或者已经被取消了,则循环从 tail
节点向上找最前面正常的节点,最后直接使用 LockSupport.unpark
唤醒该节点的线程:
六、共享模式 acquireShared 获取资源
在 acquireShared
方法中,会首先调用子类的 tryAcquireShared
方法获取资源,但与独占模式不同的是,这里当资源的数量小于 0
时,则认为获取资源失败:
当资源获取失败时,会进入到 doAcquireShared
方法,在该方法中同样先将自己加入到阻塞队列中,将 Node
的类型设为 Node.SHARED
共享模式:
下面的判断逻辑和独占模式差不多,取当前节点的上一个节点,如果是 head
节点,那当前节点便是队列的第一个线程,此时则可以尝试获取资源,如果资源大于 0
认为获取资源成功,则将当前节点置为 head
节点:
在 setHeadAndPropagate
方法中,与独占模式不同,将当前节点置为 head
节点后并没有进行置空操作,而且又会判断资源大于 0
的话,通过 doReleaseShared
唤醒更多的线程继续执行:
这里 doReleaseShared
方法的逻辑,在下面 releaseShared
解读时进行解释:
回到 doAcquireShared
方法中,下面 shouldParkAfterFailedAcquire
和 parkAndCheckInterrupt
则和独占模式调用方法相同,将符合条件的线程进行阻塞:
后面当阻塞的线程被唤醒后,会继续在 doAcquireShared
的循环中,不断找寻第一个入队的线程进行尝试获取资源操作。
七、共享模式 releaseShared 释放资源
在 releaseShared
方法中,会首先调用子类的 tryReleaseShared
方法释放资源:
释放资源后会进到 doReleaseShared
方法唤醒等待的线程,对 head
节点进行唤醒:
当 head
节点唤醒后,会和 doAcquireShared
的方法中的 setHeadAndPropagate
形成呼应,如果获取到的资源数大于 0
则继续使用 doReleaseShared
进行唤醒,从而控制多个线程执行。
八、总结
AQS
没有限制具体某个场景的应用,但通过其内部维护的 FIFO
队列和共享资源 state
便可以实现很多种不同的场景,在阅读了 AQS
源码后,应该有了更深入的理解,后面再去看 ReentrantLock、Semaphore
等的源码会发现很容易理解。
以上是关于java semaphore 实现等待队列问题为啥死锁的主要内容,如果未能解决你的问题,请参考以下文章
Java并发编程:CountDownLatchCyclicBarrier和Semaphore (总结)
java8 中concurrenthashmap数据结构和HashMap一样,且线程安全 为啥还要HashMap