Java多线程系列--“JUC锁”11之 Semaphore信号量的原理和示例

Posted 学无止境

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java多线程系列--“JUC锁”11之 Semaphore信号量的原理和示例相关的知识,希望对你有一定的参考价值。

一、Semaphore简介

Semaphore是一个计数信号量,它的本质是一个"共享锁",是基于AQS实现的,通过state变量来实现共享。通过调用acquire方法,对state值减去一,当调用release的时候,对state值加一。当state变量小于0的时候,在AQS队列中阻塞等待。

信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可。

 

更多的semaphore介绍见《Java 信号量 Semaphore 入门介绍》,本文从源码层面介绍一下semaphore原理。

二、Semaphore数据结构

Semaphore的UML类图如下:

从图中可以看出:
(01) 和"ReentrantLock"一样,Semaphore也包含了sync对象,sync是Sync类型;而且,Sync是一个继承于AQS的抽象类。
(02) Sync包括两个子类:"公平信号量"FairSync 和 "非公平信号量"NonfairSync。sync是"FairSync的实例",或者"NonfairSync的实例";默认情况下,sync是NonfairSync(即,默认是非公平信号量)。

三、源码分析

Semaphore源码分析(基于JDK1.8)

在《Java 信号量 Semaphore 入门介绍》的示例里,创建了一个拥有5个许可证的信号量,代码片段如下:

// 初始化信号量,个数为 5
private static Semaphore s = new Semaphore(5);

3.1、非公平信号量

我们看一下构造器:

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

从构造器里面可以看出来semaphore默认实现的是非公平锁,我们在看一下NonfairSync类,它是Semaphore的内部类:java.util.concurrent.Semaphore$NonfairSync.java 

3.1.1、非公平信号量 类源码

    /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

我们可以看到NonfairSync类继承了Sync,而Sync继承了AQS,从这里其实可以看出来semaphore是基于AQS实现的。

3.1.2、 非公平信号量获取

我们初始化的5个信号量,传递在NonfairSync构造器中,NonfairSync构造器调用了super方法,super方法会调用NonfairSync的父类,也就是Sync的构造器。我们看一下Sync的构造器。

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }
        //其它
     }

从这里可以看出来调用了AQS的setState方法,读过前面的文章应该明白AQS的核心就是内部维护着一个volatile修饰的同步状态值state。所以说当我们new Semaphore(5)时候,实际上是在AQS的框架中初始化了一个同步状态为5的值。

我们在看一下semaphore.acquire()方法,从这里可以看出来调用了AQS中的共享可中断模式,每次改变的状态值是1

public void acquire() throws InterruptedException {//--------------1
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)   
        throws InterruptedException { //---------------------------2
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

如果线程中断则抛出异常,然后调用了tryAcquireShared方法,此方法在AQS中是一个方法。

我们知道AQS内部用的是模版方法的设计模式,也就是顶层已经设计好了架构,tryAcquireShared方法就是其中的一个。NonfairSync继承了Sync,而Sync继承了AQS,我们实例化了Semaphore,而Semaphore在构造器中实例化了NonfairSync,所以AQS调用tryAcquireShared方法的时候,实际上调用的是NonfairSync重写的方法,此处也是java多态的一个表现。从这里可以看出来Doug Lea是宗师级的别的人物,让AQS和上层业务解耦,AQS只关注底层数据的处理,上层交个用户来实现。

我们看一下tryAcquireShared方法做了哪些事情:

protected int tryAcquireShared(int acquires) {//------------------3
    return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {//---------------4
    for (;;) {
        // 获取“可以获得的信号量的许可数”放入available变量里
        int available = getState();
        // 设置“获得acquires个信号量许可之后,剩余的信号量许可数”
        int remaining = available - acquires;
        //如果“剩余的信号量许可数<0”时,或 表达式为true了,后面的表达式不执行了,直接返回remaining
        // 如果“剩余的信号量许可数>=0”,或 表达式的前半部分位false,后面的表达式继续执行,通过CAS设置“可以获得的信号量许可数”为remaining(新值),返回变化后的新值
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

这就是semphore处理锁的核心逻辑,通过源代码我们发现此处semphore处理获取锁的业务逻辑是:

  1. 获取同步状态值
  2. 每个线程进来就减去请求的值,此处请求的值是1.然后用可用同步状态值减去请求的值得到同步状态剩余的值。
  3. 如果请求的值大于可用的值或者CAS操作把可用值改为剩余可用的值那么就返回剩下可用的值。
  4. 在方法2中,当tryAcquireShared返回值小于0的时候,调用doAcquireSharedInterruptibly方法。此方法主要的目的就是处理那些没有获取到锁的线程在队列中的一个处理

下面是AQS的doAcquireSharedInterruptibly()的实现:

private void doAcquireSharedInterruptibly(long arg)
    throws InterruptedException {
    // 创建”当前线程“的Node节点,且Node中记录的锁是”共享锁“类型;并将该节点添加到CLH队列末尾。
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 获取上一个节点。
            // 如果上一节点是CLH队列的表头,则”尝试获取共享锁“。
            final Node p = node.predecessor();
            if (p == head) {
                long r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 当前线程一直等待,直到获取到共享锁。
            // 如果线程在等待过程中被中断过,则再次中断该线程(还原之前的中断状态)。
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

说明

doAcquireSharedInterruptibly(int arg)会使当前线程一直等待,直到当前线程获取到共享锁(或被中断)才返回。
(01) addWaiter(Node.SHARED)的作用是,创建”当前线程“的Node节点,且Node中记录的锁的类型是”共享锁“(Node.SHARED);并将该节点添加到CLH队列末尾。关于Node和CLH在"Java多线程系列--“JUC锁”03之 公平锁(一)"已经详细介绍过,这里就不再重复说明了。
(02) node.predecessor()的作用是,获取上一个节点。如果上一节点是CLH队列的表头,则”尝试获取共享锁“。
(03) shouldParkAfterFailedAcquire()的作用和它的名称一样,如果在尝试获取锁失败之后,线程应该等待,则返回true;否则,返回false。
(04) 当shouldParkAfterFailedAcquire()返回ture时,则调用parkAndCheckInterrupt(),当前线程会进入等待状态,直到获取到共享锁才继续运行。
doAcquireSharedInterruptibly()中的shouldParkAfterFailedAcquire(), parkAndCheckInterrupt等函数在"Java多线程系列--“JUC锁”03之 公平锁(一)"中介绍过,这里也就不再详细说明了。

看for循环的代码,通过for循环不断的进行自旋操作,去判断当前节点的前继节点是不是头节点,如果前继节点是头节点那么那么就去挣抢锁,如果争抢锁成功那么就把当前节点设置为头节点,同时唤醒队列中的所有节点,一块在去争夺锁。如果线程中断或者阻塞那么就抛出异常。最后如果方法中抛出了异常那么就把当前节点先设置为取消状态然后在清除该节点。

记住独占模式和共享模式的区别就是,独占模式是当前节点获取到锁后,不会释放队列中的所有的节点一块争夺锁,而是按照队列中排好的顺序一个个的释放。而共享模式会在设置为头节点后,把队列中的所有节点释放出来。读过前面的文章其实应该知道doAcquireSharedInterruptibly和doAcquireShared代码几乎一样,只是doAcquireSharedInterruptibly方法在线程阻塞或者中断直接抛出异常,而doAcquireShared方法在线程阻塞和中断是返回的状态值,供上层来处理。

总之:当tryAcquireShared返回值小于0的时候,调用doAcquireSharedInterruptibly方法的源代码。此处tryAcquireShared返回小于0代表的是许可证已经用完,剩余的线程要放在AQS队列中了。

3.1.3、 非公平信号量释放

semphore的释放处理代码:

Semaphore semaphore=new Semaphore(10);
semaphore.release();
public void release() {
    sync.releaseShared(1);
}

此处sync调用了AQS中的方法releaseShared,在这个方法中如果释放成功那么就调用doReleaseShared方法,此方法在前面AQS共享模式一文中已经讲解过,此处不在详细讲解。它主要作用就是释放队列中的节点。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

tryReleaseShared方法也是AQS模版方法中的一个,它会调用Semaphore重写的方法,我们看一下tryReleaseShared释放方法在Semaphore中是怎么实现的:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 获取“可以获得的信号量的许可数”
        int current = getState();
        // 获取“释放releases个信号量许可之后,剩余的信号量许可数”
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // 设置“可以获得的信号量的许可数”为next。
        if (compareAndSetState(current, next))
            return true;
    }
}

 

通过代码我可以发现释放的核心步骤是:

    1. 获取当前同步状态值
    2. 当前同步状态值+释放值得到最新的一个值
    3. 如果得到最新的状态值小于当前获取到的状态值那么就抛出异常
    4. 如果CAS操作把当前得到的值更新为最新的值,那么就返回true。

如果tryReleaseShared()尝试释放共享锁失败,则会调用doReleaseShared()去释放共享锁。doReleaseShared()的源码如下:

private void doReleaseShared() {
    for (;;) {
        // 获取CLH队列的头节点
        Node h = head;
        // 如果头节点不为null,并且头节点不等于tail节点。
        if (h != null && h != tail) {
            // 获取头节点对应的线程的状态
            int ws = h.waitStatus;
            // 如果头节点对应的线程是SIGNAL状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。
            if (ws == Node.SIGNAL) {
                // 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 唤醒“头节点的下一个节点所对应的线程”。
                unparkSuccessor(h);
            }
            // 如果头节点对应的线程是空状态,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果头节点发生变化,则继续循环。否则,退出循环。
        if (h == head)                   // loop if head changed
            break;
    }
}

 

说明:doReleaseShared()会释放“共享锁”。它会从前往后的遍历CLH队列,依次“唤醒”然后“执行”队列中每个节点对应的线程;最终的目的是让这些线程释放它们所持有的信号量。

 

3.1.4、执行图

 下面画一个semphore的执行图更好的让大家理解

 

 3.2、公平信号量

创建一个拥有5个许可证的信号量,代码片段如下:

    // 初始化信号量,个数为 5
    private static Semaphore s = new Semaphore(5, true);

 

构造器:

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

 

 3.2.1、公平信号量 源码

    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

我们可以看到FairSync类继承了Sync,而Sync继承了AQS,从这里其实可以看出来semaphore是基于AQS实现的。

3.2.2、 公平信号量获取

Semaphore中的公平信号量是FairSync。它的获取API如下:

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

     public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

信号量中的acquire()获取函数,实际上是调用的AQS中的acquireSharedInterruptibly()。

acquireSharedInterruptibly()的源码如下:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

Semaphore中”公平锁“对应的tryAcquireShared()实现如下:

protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 判断“当前线程”是不是CLH队列中的第一个线程线程,
        // 若是的话,则返回-1。
        if (hasQueuedPredecessors())//----------------和上面的非公平锁不同
            return -1;
        // 设置“可以获得的信号量的许可数”
        int available = getState();
        // 设置“获得acquires个信号量许可之后,剩余的信号量许可数”
        int remaining = available - acquires;
        // 如果“剩余的信号量许可数<0”时,或 表达式为true了,后面的表达式不执行了,直接返回remaining
//如果“剩余的信号量许可数>=0”,或 表达式的前半部分位false,后面的表达式继续执行,通过CAS设置“可以获得的信号量许可数”为remaining(新值),返回变化后的新值
if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
AQS的hasQueuedPredecessors():
    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

 

说明:tryAcquireShared()的作用是尝试获取acquires个信号量许可数。
对于Semaphore而言,state表示的是“当前可获得的信号量许可数”。

3.2.3、 公平信号量的释放

Semaphore中公平信号量(FairSync)的释放API如下:

public void release() {
    sync.releaseShared(1);
}

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

 

信号量的releases()释放函数,实际上是调用的AQS中的releaseShared()。

其它和非公平信号量的释放相同。

 

 

 

四、总结

1、"公平信号量"和"非公平信号量"的区别

"公平信号量"和"非公平信号量"的释放信号量的机制是一样的!不同的是它们获取信号量的机制:线程在尝试获取信号量许可时,对于公平信号量而言,如果当前线程不在CLH队列的头部,则排队等候;而对于非公平信号量而言,无论当前线程是不是在CLH队列的头部,它都会直接获取信号量。该差异具体的体现在,它们的tryAcquireShared()函数的实现不同。

2、一般而言,非公平时候的吞吐量要高于公平锁”,这是为什么呢?

非公平锁性能高于公平锁性能的原因:在恢复一个被挂起的线程与该线程真正运行之间存在着严重的延迟。假设线程A持有一个锁,并且线程B请求这个锁。由于锁被A持有,因此B将被挂起。当A释放锁时,B将被唤醒,因此B会再次尝试获取这个锁。与此同时,如果线程C也请求这个锁,那么C很可能会在B被完全唤醒之前获得、使用以及释放这个锁。这样就是一种双赢的局面:B获得锁的时刻并没有推迟,C更早的获得了锁,并且吞吐量也提高了。当持有锁的时间相对较长或者请求锁的平均时间间隔较长,应该使用公平锁。在这些情况下,插队带来的吞吐量提升(当锁处于可用状态时,线程却还处于被唤醒的过程中)可能不会出现。

3、Semaphore与jdk中的Lock的区别

1. 使用Lock.unlock()之前,该线程必须事先持有这个锁(通过Lock.lock()获取),如下:

public class LockTest {
    public static void main(String[] args) {
        Lock lock=new ReentrantLock();
        lock.unlock();
    }
}

则会抛出异常,因为该线程事先并没有获取lock对象的锁:

Exception in thread "main" java.lang.IllegalMonitorStateException
    at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:155)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1260)
    at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:460)
    at LockTest.main(LockTest.java:12)

对于Semaphore来讲,如下:

public class SemaphoreTest {    
    public static void main(String[] args) {
        Semaphore semaphore=new Semaphore(1);//总共有1个许可
        System.out.println("可用的许可数目为:"+semaphore.availablePermits());
        semaphore.release();
        System.out.println("可用的许可数目为:"+semaphore.availablePermits());
    }
}

结果如下:

可用的许可数目为:1
可用的许可数目为:2

i. 并没有抛出异常,也就是线程在调用release()之前并不要求先调用acquire() 
ii. 我们看到可用的许可数目增加了一个,但我们的初衷是保证只有一个许可来达到互斥排他锁的目的,所以这里要注意一下

参考:https://blog.csdn.net/qiang_zi_/article/details/104526769?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-0&spm=1001.2101.3001.4242

https://www.cnblogs.com/duanxz/p/6063711.html

以上是关于Java多线程系列--“JUC锁”11之 Semaphore信号量的原理和示例的主要内容,如果未能解决你的问题,请参考以下文章

Java多线程系列--“JUC锁”07之 LockSupport

Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock

Java多线程系列--“JUC锁”05之 非公平锁

Java多线程系列--“JUC锁”02之 互斥锁ReentrantLock

Java多线程系列--“JUC锁”03之 公平锁

Java多线程系列--“JUC锁”04之 公平锁