Semaphore Source Code Analysis

Posted lomoye


Use case

For example, 30 people need to use a public restroom that has only 3 stalls.

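import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

public class SemaphoreTest1 {

    // 3 permits, one per stall
    private final Semaphore semaphore = new Semaphore(3);

    // Use the restroom
    public void wc() {
        try {
            // Acquire outside the try/finally below, so that a permit
            // is released only if it was actually acquired
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        try {
            System.out.println("Stall taken");
            Thread.sleep(5000L);
            System.out.println("Done");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            semaphore.release();
        }
    }

    public static void main(String[] args) {
        SemaphoreTest1 test = new SemaphoreTest1();
        List<Thread> threadList = new ArrayList<>();
        // 30 people, each on their own thread
        for (int i = 0; i < 30; i++) {
            threadList.add(new Thread(test::wc));
        }
        for (Thread thread : threadList) {
            thread.start();
        }
    }
}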

semaphore.acquire

    /**
     * Acquires a permit from this semaphore, blocking until one is
     * available, or the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>Acquires a permit, if one is available and returns immediately,
     * reducing the number of available permits by one.
     *
     * <p>If no permit is available then the current thread becomes
     * disabled for thread scheduling purposes and lies dormant until
     * one of two things happens:
     * <ul>
     * <li>Some other thread invokes the {@link #release} method for this
     * semaphore and the current thread is next to be assigned a permit; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for a permit,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
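
The interruption behaviour described in the Javadoc above can be observed directly. The following is a minimal sketch, not part of the JDK source; the class and method names (AcquireInterruptDemo, acquireThrowsOnInterrupt) are made up for illustration. A thread tries to acquire from a semaphore with zero permits and is then interrupted.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the JDK
class AcquireInterruptDemo {

    // Returns true if acquire() threw InterruptedException and the
    // thread's interrupt status was cleared when the exception was thrown.
    static boolean acquireThrowsOnInterrupt() {
        Semaphore semaphore = new Semaphore(0); // no permits, so acquire() cannot succeed
        AtomicBoolean sawException = new AtomicBoolean(false);
        AtomicBoolean statusCleared = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                sawException.set(true);
                statusCleared.set(!Thread.currentThread().isInterrupted());
            }
        });
        waiter.start();
        // Covers both cases from the Javadoc: status set on entry, or interrupted while waiting
        waiter.interrupt();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sawException.get() && statusCleared.get();
    }

    public static void main(String[] args) {
        System.out.println("acquire threw and cleared the status: " + acquireThrowsOnInterrupt());
    }
}
```

Callers that cannot handle the exception usually restore the flag with Thread.currentThread().interrupt() so that code further up the stack can still see the interruption.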

sync#acquireSharedInterruptibly

    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
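        // Check for interruption on entry
        if (Thread.interrupted())
            throw new InterruptedException();
        // ① tryAcquireShared: try to acquire in shared mode
        if (tryAcquireShared(arg) < 0)
            // ② doAcquireSharedInterruptibly: the attempt failed, so enqueue the thread and retry, parking as needed
            doAcquireSharedInterruptibly(arg);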
    }

① NonfairSync#tryAcquireShared: here we look at Semaphore's nonfair implementation

    /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        // The constructor initializes the number of permits
        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            // Delegate to the method defined in the parent class Sync
            return nonfairTryAcquireShared(acquires);
        }
    }
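
Which Sync subclass is used depends on the constructor: new Semaphore(permits) creates a nonfair semaphore, while new Semaphore(permits, true) creates a fair one. A small sketch of how to choose and check the mode through the public API (the class name FairnessDemo and its helper methods are made up for illustration):

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the JDK
class FairnessDemo {

    static boolean defaultIsFair() {
        // One-argument constructor: nonfair mode
        return new Semaphore(3).isFair();
    }

    static boolean explicitFairIsFair() {
        // Two-argument constructor with fair = true: FIFO ordering of waiting threads
        return new Semaphore(3, true).isFair();
    }

    public static void main(String[] args) {
        System.out.println("default fair?  " + defaultIsFair());      // prints "default fair?  false"
        System.out.println("explicit fair? " + explicitFairIsFair()); // prints "explicit fair? true"
    }
}
```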

Sync#nonfairTryAcquireShared

    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // Return remaining if it is negative (not enough permits, no CAS attempted)
            // or if the CAS of state from available to remaining succeeds; otherwise retry
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

If tryAcquireShared returns a negative value, there are not enough permits left, so doAcquireSharedInterruptibly must be called to enqueue the thread and wait.
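
The same read, compute, compare-and-set loop can be tried outside of AQS. The sketch below is a simplified stand-in built on AtomicInteger, not the JDK implementation; PermitCounter and its methods are hypothetical names. It only shows how the return value encodes success (>= 0) or failure (< 0).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the state handling in Semaphore.Sync
class PermitCounter {
    private final AtomicInteger state;

    PermitCounter(int permits) {
        state = new AtomicInteger(permits);
    }

    // Same shape as nonfairTryAcquireShared: a negative result means
    // failure and leaves the state untouched.
    int tryAcquireShared(int acquires) {
        for (;;) {
            int available = state.get();
            int remaining = available - acquires;
            if (remaining < 0 || state.compareAndSet(available, remaining)) {
                return remaining;
            }
            // CAS lost a race with another thread: reread and retry
        }
    }

    int available() {
        return state.get();
    }

    public static void main(String[] args) {
        PermitCounter counter = new PermitCounter(3);
        System.out.println(counter.tryAcquireShared(1)); // prints 2
        System.out.println(counter.tryAcquireShared(2)); // prints 0
        System.out.println(counter.tryAcquireShared(1)); // prints -1
        System.out.println(counter.available());         // prints 0
    }
}
```

A return value of 0 is still a success: the caller got its permits, but none are left for anyone else.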

②AQS#doAcquireSharedInterruptibly

    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // Add a shared-mode node to the sync queue
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                // Predecessor node
                final Node p = node.predecessor();
                // If the predecessor is the head, try to acquire
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    // r >= 0 means the acquire succeeded
                    if (r >= 0) {
                        // ① Make the current node the new head and propagate the wake-up
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // Check whether the thread should park; if so, park, and on wake-up
                // check whether the thread was interrupted in the meantime
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
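
The effect of addWaiter can be seen from outside through Semaphore's public monitoring methods. The following sketch (QueueInspectionDemo and queuedWaiters are hypothetical names) blocks a few threads on an empty semaphore and reads getQueueLength(), which the JDK documents as an estimate meant for monitoring rather than for synchronization control.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the JDK
class QueueInspectionDemo {

    // Starts `waiters` threads that block in acquire(), waits until all of
    // them are enqueued, and returns the queue length observed at that point.
    static int queuedWaiters(int waiters) {
        Semaphore semaphore = new Semaphore(0); // no permits: every acquire() must queue
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        try {
            // Poll until every waiter has been added to the sync queue
            while (semaphore.getQueueLength() < waiters) {
                Thread.sleep(10);
            }
            int observed = semaphore.getQueueLength();
            semaphore.release(waiters); // hand out permits so the waiters can finish
            for (Thread t : threads) {
                t.join();
            }
            return observed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("queued waiters: " + queuedWaiters(2));
    }
}
```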

①AQS#setHeadAndPropagate

    /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
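
From the outside, the result of propagation is that a single release of several permits lets several waiters proceed. The sketch below only demonstrates that observable outcome and does not prove which internal path performed each wake-up; PropagationDemo, bothWaitersWake and the 5-second timeout are arbitrary choices for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK
class PropagationDemo {

    // Returns true if both waiting threads obtained a permit after one release(2) call.
    static boolean bothWaitersWake() {
        Semaphore semaphore = new Semaphore(0);
        CountDownLatch done = new CountDownLatch(2);
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    done.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
        try {
            semaphore.release(2); // one call, two permits
            // Arbitrary upper bound so the demo cannot hang forever
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("both waiters woke: " + bothWaitersWake());
    }
}
```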

semaphore.release

    /**
     * Releases a permit, returning it to the semaphore.
     *
     * <p>Releases a permit, increasing the number of available permits by
     * one.  If any threads are trying to acquire a permit, then one is
     * selected and given the permit that was just released.  That thread
     * is (re)enabled for thread scheduling purposes.
     *
     * <p>There is no requirement that a thread that releases a permit must
     * have acquired that permit by calling {@link #acquire}.
     * Correct usage of a semaphore is established by programming convention
     * in the application.
     */
    public void release() {
        sync.releaseShared(1);
    }
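
As the Javadoc above notes, release does not check that the caller ever acquired a permit. A short sketch of the consequence (ReleaseWithoutAcquireDemo and its method are hypothetical names): an extra release raises the permit count above the value passed to the constructor.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the JDK
class ReleaseWithoutAcquireDemo {

    static int permitsAfterExtraRelease() {
        Semaphore semaphore = new Semaphore(1);
        semaphore.release(); // no matching acquire()
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(permitsAfterExtraRelease()); // prints 2
    }
}
```

This is why the release call belongs in a finally block that is entered only after a successful acquire.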

sync#releaseShared

    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        // ① CAS loop that adds arg to state
        if (tryReleaseShared(arg)) {
            // ② Wake up the threads waiting in the queue
            doReleaseShared();
            return true;
        }
        return false;
    }

① AQS#tryReleaseShared; the concrete implementation is Semaphore.Sync#tryReleaseShared

        protected final boolean tryReleaseShared(int releases) {
            // CAS loop that updates state
            for (;;) {
                // Read the current state
                int current = getState();
                // Add releases to the current state
                int next = current + releases;
                // Detect overflow of the permit count
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                // Set state to the new value; retry if another thread changed it first
                if (compareAndSetState(current, next))
                    return true;
            }
        }
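
The overflow check can be triggered through the public API by releasing on a semaphore that already holds Integer.MAX_VALUE permits. A minimal sketch (OverflowDemo and releaseAtMax are hypothetical names); catching Error here is for demonstration only.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class, not part of the JDK
class OverflowDemo {

    // Returns the message of the Error thrown by release(), or "no error".
    static String releaseAtMax() {
        Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
        try {
            semaphore.release(); // current + 1 wraps around to a negative int
            return "no error";
        } catch (Error e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(releaseAtMax()); // prints "Maximum permit count exceeded"
    }
}
```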

②AQS#doReleaseShared

    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            // h: the head node
            Node h = head;
            // The head is non-null and is not also the tail, so at least one node is queued behind it
            if (h != null && h != tail) {
                // Wait status of the head node
                int ws = h.waitStatus;

                if (ws == Node.SIGNAL) {
                    // Reset the status from Node.SIGNAL to 0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // Wake up the head's successor
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         // Why the PROPAGATE state was introduced is fairly involved; see https://www.cnblogs.com/micrari/p/6937995.html
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

Recommended reading

1.https://segmentfault.com/a/1190000016447307
2.https://www.cnblogs.com/micrari/p/6937995.html
