java semaphore 实现等待队列问题为啥死锁

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java semaphore 实现等待队列问题为啥死锁相关的知识,希望对你有一定的参考价值。

两个生产者,每个都生产三十次。
两个消费者,每个都消费三十次。
篮子初始化四个苹果,总容量十个。
full 为 semaphone(4)
empty 为 semaphore(6)
mutex 为全局锁 semaphore(1)
消费者:
full.Wait();
mutex.Wait();
消费....
mutex.Signal();
empty.Signal();

生产者:
empty.Wait();
mutex.Wait();
生产....
mutex.Signal();
full.Signal

Semaphore 同步的 Wait()
this.value--;
if(value<0) wait; // 省略 try catch
.

常规的 Semaphore 使用 while(value < 0) 控制 wait(). 但是上述的 Semaphore 使用 while() 就死锁,使用 if 没有任何问题。
我的问题:
问题1:
两个消费者都 wait() 等待生产者时,生产者生产完 mutex.Signal() 中有 notify(), full.Signal() 中又有 notify(), 这里唤醒了大家几次?
问题2:
两个消费者都 wait() 等待生产者时,生产者生产完一个通知大家,如果等待的消费者1抢到机会开始消费,之后释放mutex锁时 notify() 通知大家,如果等待的消费者2抢到机会消费,但是却是没有可以消费的... 这里是怎么造成的那?

谢谢
因为标准的 Semaphore 不允许负值,使用 while(value<=0) 进行 wait(), 然后才能 value--.

但是如果允许负值,则可以实现等待队列。这个问题就是要实现队列。如果使用 value--; while(value<0) wait() 就会死锁,但是需要修改为 if 才没有死锁。这是为什么那?

比如有两个线程执行,线程t1, 线程t2 t1 需要获取方法A的锁标志,同时方法A调用了方法B,t1获取了A的锁标志,并获取了B的锁标志,才能完成执行 同时t2也在执行,t2获取方法B的锁标志,方法B调用了方法A,t2也需要获取两个方法A,B的锁标志才能执行完成 当t1 获取了A方法的锁标志,同时t2获取了B方法的锁标志 那么t1会等待t2释放方法B的锁标志,t2也在等待t1释放方法A的锁标志,这样就形成了死锁,都在等待.... 参考技术A class Stack<T> private Vector<T> v; public Stack() v = new Vector<T>(); public T pop() if (v.size()==0) return null; return v.get(v.size()-1); public void push(T t) v.add(t); public boolean isEmpty() return v.size()==0; class Queue<T> private Vector<T> v; public Queue() v = new Vector<T>(); //入队列 public void enqueue(T t) v.add(t); //出队列 public T dequeue() if (v.size()==0) return null; return v.get(0); public boolean isEmpty() return v.size() == 0;

AQS 源码解读

一、AQS

AQSAbstractQueuedSynchronizer 的简称,又称为同步阻塞队列,是 Java 中的一个抽象类。在其内部维护了一个由双向链表实现的 FIFO 线程等待队列,同时又提供和维护了一个共享资源 state ,像我们平常使用的 ReentrantLock、Semaphore、ReentrantReadWriteLock、SynchronousQueue、FutureTask等都是基于AQS 进行实现的。

AQS 可以实现什么功能呢?在 AQS 中不考虑资源的获取和释放,主要关注资源获取不到时,如何将线程加入队列以及阻塞,当释放资源后,如何再进行线程的出列和唤醒。而对于资源的操作则交予具体实现的子类进行完成。

在此基础上 AQS 为了使线程的控制可以更灵活,又提供了两种同步模型,独占模式共享模式

  • 独占模式:表示并发情况下只有一个线程能执行,其余则需等待,例如 Lock 锁,一次只能有一个线程获取到锁。

  • 共享模式:允许多线程根据规则执行,例如 Semaphore 进行多个线程的协调。

AQS 已经帮我们实现了队列的维护,以及线程的等待和唤醒,但是具体资源的获取和释放都需要由继承类实现,对于资源的获取和释放也是区分了独占模式和共享模式,相应方法如下:

//查询是否正在独占资源,condition会使用
boolean isHeldExclusively()	
//独占模式,尝试获取资源,成功则返回true,失败则返回false
boolean tryAcquire(int arg)
//独占模式,尝试释放资源,成功则返回true,失败则返回false
boolean tryRelease(int arg)
//共享模式,尝试获取资源,如果返回负数表示失败,否则表示成功。
int tryAcquireShared(int arg)
//共享模式,尝试释放资源,成功则返回true,失败则返回false。
boolean tryReleaseShared(int arg)

例如在 ReentrantLock 公平锁中,tryAcquire 的实现逻辑如下:

protected final boolean tryAcquire(int acquires) 
        // 当前线程
        final Thread current = Thread.currentThread();
        // AQS 中共享 state
        int c = getState();
        if (c == 0) 
            // 如果队列中没有其他线程,并对state进行修改,
            // 如果修改成功则设置独占锁的线程为当前线程
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) 
                setExclusiveOwnerThread(current);
                return true;
            
        
        else if (current == getExclusiveOwnerThread()) 
            // 如果独占线程就是当前线程,则是重入的场景,对 state + 1
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        
        // 如果都没成功,则获取锁失败
        return false;
    

可以看到在 ReentrantLock 公平锁中,通过 state 的值来标识是否有锁资源可用,并且重入情况下也是对 state 的值进行修改标识 ,对于 state 的修改和判断是否有等待队列线程, AQS 中都提供了相应的方法。

AQS 中几个核心的方法如下,同样区分了独占模式和共享模式:

// 返回共享资源的当前值
final int getState()
// 设置共享资源的值
final void setState(int newState)
// CAS设置共享资源的值
final boolean compareAndSetState(int expect, int update)

// 独占模式获取同步资源,会调用重写的tryAcquire(int arg),
// 如果获取成功,则不做任何处理,否则将会加入同步队列并挂起线程等待
final void acquire(int arg)
// 独占模式式获取同步资源,但是可以响应中断
final void acquireInterruptibly(int arg)
// 独占模式获取同步资源,但多出了超时时间,
// 如果当前线程在 nanosTimeout 时间内没有获取到同步资源,
// 那么将会返回false,否则返回true
final boolean tryAcquireNanos(int arg, long nanosTimeout)
// 独占模式式释放同步资源,会调用重写的 tryRelease(int arg) 方法,
// 在释放同步资源之后,会将同步队列中第一个节点包含的线程唤醒
final boolean release(int arg)

// 共享模式式获取同步资源,会调用重写的 tryAcquireShared(int arg) ,
// 如果当前线程未获取到同步资源,会加入同步队列等待,
// 和独占式的区别这里 tryAcquireShared(int arg) < 0 时才认为未获取到资源
final void acquireShared(int arg)
// 共享模式式获取同步资源,可以响应中断
final void acquireSharedInterruptibly(int arg)
// 共享模式获取同步资源,但多出了超时时间
final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
// 共享式释放同步资源,会调用重写的 tryReleaseShared(int arg) 方法,
// 在释放同步资源之后,会将同步队列中第一个节点包含的线程唤醒
final boolean releaseShared(int arg)

下面一起从源码的角度,分析 AQS 是如何实现线程的协调和管理的。

二、共享资源 state

共享资源 state 就是 AQS 中的一个 int 类型的全局变量,使用了 volatile 进行修饰,保证了多线程下的数据可见性,并且 AQS 为其提供了普通和 CAS 方式的修改方法,该共享资源主要用来做资源的标记。

例如:

ReentrantLock 锁中用来表示是否获取到锁,默认情况 0 表示无锁状态,获取到锁后进行 +1 ,如果是重入的场景下同样进行 +1 ,最后释放锁后再进行 -1

Semaphore 中用来表示信号量的标记,当获取信号量时 state 进行 -1 ,释放信号量再进行 +1


三、FIFO 阻塞队列

AQS 中阻塞队列采用双向链表进行实现,具体源码如下:

	//等待队列节点类,双向链表
    static final class Node 
        // 标记,指示节点正在共享模式下等待
        static final Node SHARED = new Node();
        // 标记,指示节点正在独占模式下等待
        static final Node EXCLUSIVE = null;

        //  waitStatus值表示线程已取消
        static final int CANCELLED = 1;
        //  waitStatus值表示后继线程需要唤醒
        static final int SIGNAL = -1;
        //  waitStatus值,表示线程正在等待状态
        static final int CONDITION = -2;
        // waitStatus值指示下一个被获取的应该无条件的传播
        static final int PROPAGATE = -3;

        // 线程等待状态
        volatile int waitStatus;

        // 上一个节点
        volatile Node prev;

        // 下一个节点
        volatile Node next;

        // 当前线程
        volatile Thread thread;

        // 节点的模式,独占还是贡献
        Node nextWaiter;

        // 是否为共享模型
        final boolean isShared() 
            return nextWaiter == SHARED;
        

        // 获取上一个节点
        final Node predecessor() throws NullPointerException 
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        

        Node()     // Used to establish initial head or SHARED marker
        

        Node(Thread thread, Node mode)      // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        

        Node(Thread thread, int waitStatus)  // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        
    

其中通过 nextWaiter 表示当前线程是独占模型还是共享模式,线程的所属状态使用 waitStatus 来进行表示,其中包括:

  • 默认值为 0,表示当前节点在 sync 队列中,等待着获取资源。
  • CANCELLED,值为1,表示当前的线程被取消。
  • SIGNAL,值为-1,释放资源后需唤醒后继节点。
  • CONDITION,值为-2, 等待condition唤醒。
  • PROPAGATE,值为-3,工作于共享锁状态,需要向后传播,比如根据资源是否剩余,唤醒后继节点。

四、独占模式 acquire 获取资源

AQS acquire() 方法中,首先调用子类的 tryAcquire 获取资源,如果资源获取成功则不做任何处理,如果失败则首先使用 addWaiter 将当前线程加入到队列中,并指定 Node 的类型为独占模式:

addWaiter 方法中,会将当前线程的 Node 加入到队列的尾端,如果尾节点为空或修改尾节点失败则进入到 enq 中使用自旋的方式修改:

enq 方法中可以看出,当为节点为空时,也就是队列中无数据时,会初始化一个空的 Head 节点。

再回到 acquire() 方法,加入队列后会进入到 acquireQueued 方法中,在该方法循环中如果当前节点 的pred 上一个节点是 head 节点的话,那该节点不就是第一个节点吗,因为从上面就可以看出,初始情况下 head 是一个空的 node ,那 head 的下一个节点不就是第一个进入到队列的节点了,这种情况下遵循队列先进先出的原则,再次尝试是否能获取到资源,如果可以成功获取资源到则将当前节点置为 head 节点,同时再次将 head 节点置为空 node,此时线程也无需阻塞可以直接执行:


但是如果当前节点的上一个节点不是 head 节点,或者没有获取到资源,则此时需要进行挂起阻塞,下面首先会触发 shouldParkAfterFailedAcquire 方法,这里先看后面的 parkAndCheckInterrupt 方法,该方法主要做了将线程挂起阻塞的作用,采用 LockSupport.park 进行线程的阻塞:

再来看 shouldParkAfterFailedAcquire 方法就是控制当前线程是否需要挂起,这里就需要使用到 Node 中的 waitStatus,在该方法中有三种类型的判断:

  • 如果当前是 SIGNAL 状态则可以直接挂起
  • waitStatus大于 0 时,在 NodewaitStatus 大于 0 的状态就是 CANCELLED 状态,也就是标识线程被取消了,此时这种线程进行阻塞也就没有意义了,那就一直循环向上取线程未被取消的作为当前节点,继续执行。
  • waitStatus小于等于 0 时,将状态置为 SIGNAL 类型

后面当阻塞的线程被唤醒后,会继续在 acquireQueued 的循环中,不断找寻第一个入队的线程进行尝试获取资源操作。

五、独占模式 release 释放资源

release 方法中,首先会调用子类的 tryRelease 方法释放资源:


然后会将当前的 head 节点传入 unparkSuccessor 方法中,在该方法中首先将该Node节点的 waitStatus 修改到默认的 0 值,然后获取到下一个节点,因为 head 节点始终保持为空节点,下一个节点才是真正的队列中第一个线程。但如果下一个节点为空的话,或者已经被取消了,则循环从 tail 节点向上找最前面正常的节点,最后直接使用 LockSupport.unpark 唤醒该节点的线程:

六、共享模式 acquireShared 获取资源

acquireShared 方法中,会首先调用子类的 tryAcquireShared 方法获取资源,但与独占模式不同的是,这里当资源的数量小于 0 时,则认为获取资源失败:


当资源获取失败时,会进入到 doAcquireShared 方法,在该方法中同样先将自己加入到阻塞队列中,将 Node 的类型设为 Node.SHARED 共享模式:

下面的判断逻辑和独占模式差不多,取当前节点的上一个节点,如果是 head 节点,那当前节点便是队列的第一个线程,此时则可以尝试获取资源,如果资源大于 0 认为获取资源成功,则将当前节点置为 head 节点:

setHeadAndPropagate 方法中,与独占模式不同,将当前节点置为 head 节点后并没有进行置空操作,而且又会判断资源大于 0 的话,通过 doReleaseShared 唤醒更多的线程继续执行:

这里 doReleaseShared 方法的逻辑,在下面 releaseShared解读时进行解释:

回到 doAcquireShared 方法中,下面 shouldParkAfterFailedAcquireparkAndCheckInterrupt 则和独占模式调用方法相同,将符合条件的线程进行阻塞:


后面当阻塞的线程被唤醒后,会继续在 doAcquireShared 的循环中,不断找寻第一个入队的线程进行尝试获取资源操作。

七、共享模式 releaseShared 释放资源

releaseShared 方法中,会首先调用子类的 tryReleaseShared 方法释放资源:

释放资源后会进到 doReleaseShared 方法唤醒等待的线程,对 head 节点进行唤醒:

head 节点唤醒后,会和 doAcquireShared 的方法中的 setHeadAndPropagate 形成呼应,如果获取到的资源数大于 0 则继续使用 doReleaseShared 进行唤醒,从而控制多个线程执行。

八、总结

AQS 没有限制具体某个场景的应用,但通过其内部维护的 FIFO 队列和共享资源 state便可以实现很多种不同的场景,在阅读了 AQS 源码后,应该有了更深入的理解,后面再去看 ReentrantLock、Semaphore 等的源码会发现很容易理解。

以上是关于java semaphore 实现等待队列问题为啥死锁的主要内容,如果未能解决你的问题,请参考以下文章

Java AQS 概述

Java并发编程:CountDownLatchCyclicBarrier和Semaphore (总结)

java8 中concurrenthashmap数据结构和HashMap一样,且线程安全 为啥还要HashMap

J.U.C锁之 Semaphore

Linux(内核剖析):31---内核同步之(信号量(semaphore)读写信号量(rw_semaphore))

CountDownLatchCyclicBarrier和Semaphore