Semaphore 使用及源码分析

Posted 小猪快跑22

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Semaphore 使用及源码分析相关的知识,希望对你有一定的参考价值。

前言:

之前的博文也做了一些关于AQS相关的线程同步的分析,这篇分析下 Semaphore 的使用和原理。

一、Semaphore 的用法

Semaphore就类似于许可证的概念,获取到许可就可以执行,不然就得等待。
举个例子:一个超小型停车场只有3个车位,那么前3名来停车是可以获取到许可的,即可以停车;后面来的车辆必须要等前面的3辆车之一的出去,即释放许可。

Semaphore semaphore = new Semaphore(2);

semaphore.acquire() 默认的表示申请一个许可证,还有带参数的,表示申请多个许可证 public void acquire(int permits)

release() : 表示释放一个许可证;同样的 public void release(int permits) 表示释放多个许可证。

写个例子,例子如下:

 Semaphore semaphore = new Semaphore(2);
 new Thread("A") {
     @Override
     public void run() {
         try {
             semaphore.acquire();
             System.out.println("thread "+Thread.currentThread().getName() + " >>> start , time = "+System.currentTimeMillis());
             Thread.sleep(2_000);
             semaphore.release(1);
             System.out.println("thread "+Thread.currentThread().getName() + " >>> end !!! , time = "+System.currentTimeMillis())
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     }
 }.start();
 new Thread("B") {
     @Override
     public void run() {
         try {
             semaphore.acquire();
             System.out.println("thread "+Thread.currentThread().getName() + " >>> start , time = "+System.currentTimeMillis());
             Thread.sleep(2_000);
             semaphore.release();
             System.out.println("thread "+Thread.currentThread().getName() + " >>> end !!!, time = "+System.currentTimeMillis());
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     }
 }.start();
 new Thread("C") {
     @Override
     public void run() {
         try {
             semaphore.acquire();
             System.out.println("thread "+Thread.currentThread().getName() + " >>> start , time = "+System.currentTimeMillis());
             Thread.sleep(2_000);
             semaphore.release();
             System.out.println("thread "+Thread.currentThread().getName() + " >>> end !!!, time = "+System.currentTimeMillis());
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
     }
 }.start();

例子中 Semaphore 的许可证为2,表示最多同时允许2个线程获取许可。然后线程 A 、B、C分别去获取许可,可以发现,线程C 只有等线程A线程B释放了许可之后才可以获取许可继续执行。

结果如下:

thread A >>> start , time = 1631950075265
thread B >>> start , time = 1631950075266
thread A >>> end !!! , time = 1631950077270
thread C >>> start , time = 1631950077270
thread B >>> end !!!, time = 1631950077270
thread C >>> end !!!, time = 1631950079275

二、aquire() 方法解析

Semaphore 的内部类 Sync 继承了AQS: abstract static class Sync extends AbstractQueuedSynchronizer

非公平锁的实现类 NonfairSync继承了Sync,如下:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    NonfairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

前面说过,Semaphore分为公平锁和非公平锁,这里主要说非公平锁。

aquire 方法如下:

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly 调用的是AQS的方法:

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
         // 线程是否被中断,是则抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        // 尝试获取锁,小于0表示获取失败,
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared 调用的是 Semaphore 内部类 NonfairSync 的方法,如下:

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

nonfairTryAcquireSharedNonfairSync 父类 Sync 的方法:

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // 获取当前剩余许可
        int available = getState();
        // 当前剩余许可减去你申请的许可
        int remaining = available - acquires;
        // 小于0 或者 cas 操作设置 state值失败则返回
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

如果 tryAcquireShared 返回值小于0,就执行 doAcquireSharedInterruptibly,如下:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 把当前结点对应的线程插入到等待队列的尾部,且标记改结点是共享的
    final Node node = addWaiter(Node.SHARED); // 注释【1】
    boolean failed = true;
    try {
        for (;;) { // 自旋
            // 获取 node 的前驱结点
            final Node p = node.predecessor(); // 注释【2】
            // 如果前驱结点是头结点 head
            if (p == head) { // 注释【3】
                // 尝试去获取许可证
                int r = tryAcquireShared(arg); // 注释【4】
                // r>=0表示获取到证书,那么设置当前结点为头结点,并且尝试唤醒 node的后继结点
                if (r >= 0) {
                    setHeadAndPropagate(node, r); // 注释【5】
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 主要是判断是否需要挂起线程
            if (shouldParkAfterFailedAcquire(p, node) // 注释【6】
            && parkAndCheckInterrupt()) // 挂起当前线程 // 注释【7】
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

上面的注释大家可能看的比较懵逼,下面举个例子来加以说明:

假设 Semaphore semaphore = new Semaphore(2),有4个线程A、B、C、D去获取许可,假设4个线程的都是执行耗时的任务。

由于开始有2个许可,那么线程A和B是可以拿到许可继续执行任务的,此时剩余许可等于0了,那么 线程C 再去获取许可会失败,会走到上面的方法 doAcquireSharedInterruptibly中;

注释【1】:addWaiter(Node.SHARED) 的方法如下:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail; // 注释【1】
    if (pred != null) { // 注释【2】
        node.prev = pred; // 注释【3】
        if (compareAndSetTail(pred, node)) { // 注释【4】
            pred.next = node;
            return node;
        }
    }
    enq(node); // 注释【5】
    return node;
}
  1. new 一个结点:
Node(Thread thread, Node mode) {
    this.nextWaiter = mode;
    this.thread = thread;
}

设置 nextWaiterNode.SHARED 结点,作用就是用来判断该结点是否是共享结点

final boolean isShared() {
    return nextWaiter == SHARED;
}

由于线程C是第一个申请许可失败的,所以 addWaiter 方法的注释【1】中,pred = tail 是等于 null 的,走到 注释【5】的 enq(node),如下:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // t = null 表示等待队列为空需要初始化
        if (t == null) { 
            // new 一个空结点作为头结点 head
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // 设置 node 为尾结点
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

执行完上述的操作如下图:

addWaiter方法总结:操作其实很简单就是双链表的尾插法,主要有2点:

  1. 如果等待队列为空,即队列未初始化,那么需要新建一个空的结点,然后让head指向该空结点
  2. 然后采用尾插法将当前线程对应的结点插入队列的尾部,注意,由于这里面是共享队列,所以 nodenextWaiter = Node.SHARED

分析到这里,其实才分析了 doAcquireSharedInterruptibly 方法注释【1】,现在来看注释【2】:
final Node p = node.predecessor()p 指向 node的前驱结点,即 head 结点;由于 p 指向 head ,所以注释【3】成立;然后走到注释【4】再次尝试去获取许可证;
我们假设 线程A线程B执行的是耗时任务,所以再次获取许可证还是会失败,会走到注释【6】的 shouldParkAfterFailedAcquire方法,如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 表示已经设置为SIGNAL,需要在释放的时候唤醒后继结点
        return true;
    if (ws > 0) {
        /*
         * 表示该结点pred已经被取消,那么跳过该结点,然后从该结点向前重试,
         * 跳过已取消的结点,直到遇到未被取消的结点
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 设置当前线程结点的前驱结点的waitStatus为SIGNAL,
        // 为SIGNAL则表示该结点的后继结点需要被唤醒
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

线程C 走到这里,由于线程C结点的前驱结点head,且 headwaitStatus = 0,所以走到
compareAndSetWaitStatus(pred, ws, Node.SIGNAL)即设置 headwaitStatus = SIGNAL,然后返回false,接着继续 doAcquireSharedInterruptibly方法的for循环;最终还是走到 shouldParkAfterFailedAcquire中,此时的 pred 对应的任然是 head,即 waitStatusSIGNAL,直接返回true,返回ture 则会执行 doAcquireSharedInterruptibly 方法中的 注释【7】,执行parkAndCheckInterrupt 挂起当前线程,如下图:

后续的线程D 申请许可的时候,流程还是一样的,如下图:

上述就是获取许可证的流程,下面分析下释放许可的流程。

三、release 方法

semaphore.release() 调用的方法如下:

public void release() {
    sync.releaseShared(1);
}

然后调用AQS类中的方法,如下:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

我们假设此时线程A的任务执行完了,要释放许可了,首先执行tryReleaseShared,,调用的是Semaphore 内部类Sync中的方法如下:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState(); // 获取当前许可证数量
        // 当前许可证数量加上要释放的许可证数量,默认的加1
        int next = current + releases; 
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
            // 通过cas更新 state 数量为next
        if (compareAndSetState(current, next))
            return true;
    }
}

那么,线程A执行完 tryReleaseShared 方法后,state = 1。然后执行doReleaseShared,如下:

private void doReleaseShared() {
   
    for (;;) {
        Node h = head;
        // h 不等于 null且 不等于 tail
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // ws = SIGNAL,表示后继结点需要唤醒
            if (ws == Node.SIGNAL) {
                // 设置 头结点 head 的waitStatus = 0,
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h); // 唤醒head结点的后继结点对应的线程
            }
            // ws == 0,那么就设置waitStatus= PROPAGATE
            // 即表示要唤醒head后继结点的后继结点
            // 这种情况可能发生的情况就是线程A释放许可刚通过cas操作
            // compareAndSetWaitStatus(h, Node.SIGNAL, 0)设置waitStatus=0
            // 此时,线程B也恰好释放许可,发现waitStatus=0,那么设置waitStatus=PROPAGATE
            // waitStatus=PROPAGATE就表示要唤醒head后继结点的后继结点
            // 想想也是这个道理,A和B都释放许可了,肯定要唤醒后继结点的后继结点啊,
            // 不然已经有2张许可却只有C拿到许可,D还在傻乎乎的等C释放许可来唤醒D
            else if (ws == 0 && 
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

unparkSuccessor 就是唤醒head后继结点对应的线程,如果该结点被取消了,那么从tail结点开始往前找,找到未被取消结点对应的线程来唤醒。

线程唤醒了,那么被唤醒的线程C要接着执行 doAcquireSharedInterruptibly 中的for循环,部分代码如下:

for (;;) {
    final Node p = node.predecessor();
    if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
            setHeadAndPropagate(node, r);
            p.next = null; // help GC
            failed = false;
            return;
        }
    }
    if (shouldParkAfterFailedAcquire(p, nod
        parkAndCheckInterrupt())
        throw new InterruptedException();
}

线程C继续for循环,由于线程A已经释放了许可,所以此时 r = 0,所以走到setHeadAndPropagate,如下:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node); // 设置当前线程对应的结点为 head
    // propagate > 0表示可以尝试唤醒 node 结点的后继结点
    //  (h = head) == null || h.waitStatus < 0) 可能会引起没必要的唤醒操作
    // 比如线程A任务结束后释放许可,但是线程B任务还没结束,此时线程C获取到许可走到这里
    // 执行完上面的 setHead 后,然后 h = head 即h执行线程C的结点,
    // 而线程C对应的结点的 waitStatus = SIGNAL,所以也会执行doReleaseShared唤醒线程D
    // 线程D唤醒后接着去执行 doAcquireSharedInterruptibly 中的for循环,
    // 执行 tryAcquireShared 去拿许可证的时候发现是小于0,接着还是会挂起线程D
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) { 
        Node s = node.next;
        if (s == null || s.isShared()) // 是否是共享的,就是用到了 NODE.SHARED来表示的
            doReleaseShared(); // 唤醒 node结点的后继结点,这个上面分析过
    }
}

上面setHeadAndPropagate方法参数 propagate > 0的情况就是,线程A线程B差不多同时释放了许可,那么需要唤醒下一个结点的线程。

总结:

  1. 获取许可的时,如果发现许可用完了,则会new Node且设置 nextWaiter = Node.SHARED,然后插入等待队列的尾部,如果等待队列未初始化,则 new 一个空节点作为head。
  2. 释放许可的时候,会唤醒head结点的后继结点,也可能会唤醒后继结点的后继结点,如上所分析的那样。

以上是关于Semaphore 使用及源码分析的主要内容,如果未能解决你的问题,请参考以下文章

Semaphore 源码解读

Semaphore源码分析

Semaphore源码分析

源码分析:Semaphore之信号量

Java并发:Semaphore信号量源码分析

Semaphore 源码分析