AbstractQueuedSynchronizer源码

Posted emoji-emoji

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AbstractQueuedSynchronizer源码相关的知识,希望对你有一定的参考价值。

 /**
     * 独占模式下进行请求,如果当前线程被中断,放弃方法执行(抛异常),
     * 1.检查当前线程的中断状态,然后至少执行一次tryAcquire,
     * 2.如果成功,方法返回;
     * 3.如果失败,当前线程会在同步等待队列中排队,直到方法返回成功或者线程被中断。
     */
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

 //acquireQueued类似,但是这里响应中断(抛异常)
    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                //通过抛异常,来响应中断
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 

指定时间内尝试请求控制权,

/**
     * 独占模式下进行请求,如果当前线程被中断,响应中断(抛异常),返回
     * 指定时间内尝试请求控制权,
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
    }

 

共享模式下请求控制权,

 /**
     * 1.调用tryAcquireShared尝试进行控制权请求,如果成功,返回。
     * 2.如果请求失败,那么会用当前线程建立一个共享模式的节点,然后将节点
     *  放到同步等待队列的尾部,进入循环。
     * 3.循环中会判断当前同步等待队列中是否有其他线程,如果没有,再次调用tryAcquireShared
     * 4.如果请求成功,将当前节点设置为同步等待队列的头节点,同时检查是否需要
     *  继续唤醒下一个共享模式的节点,如果需要就继续执行唤醒动作。这里还会想上传递中断
     *  状态,然后退出循环。
     * 5.如果在同步等待队列中,在当前线程前面有其他线程,或者第3步失败,
     *  那么首先需要检查当前节点是否已经设置等待唤醒标记,即将非取消状态前驱节点的等待
     *  状态设置为SIGNAL。
     * 6.如果没有设置等待唤醒标记,进行设置,然后继续循环,进入第三步
     * 7.如果已经设置等待唤醒标记,那么阻塞当前线程
     * 8.当前线程被唤醒后,设置传递中断标记,然后继续循环,继续第3步。
     * 9.最后在循环退出后,要判断请求是否失败,如果失败,当前线程取消请求、
     */

    /**
     * 共享模式下请求控制权,忽略中断,
     */
    public final void acquireShared(int arg) {
        //如果以共享模式尝试请求失败
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }



/**
     * 在共享模式下尝试请求(控制权),需要先查看一下对象的状态是否允许在共享
     * 模式下请求,如果允许在进行请求。
     *
     * 这个方法总是被请求线程执行,如果方法执行失败,会将当前线程放到同步等待队列
     * 中(如果当前线程还不在同步等待队列中),直到被其他线程的释放操作唤醒。
     * 
     * 留给子类实现
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }


private void doAcquireShared(int arg) {
        //将当前线程以共享模式加入同步等待队列。
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            //循环
            for (;;) {
                //获取当前节点的前驱p
                final Node p = node.predecessor();
                //如果p是头节点,
                if (p == head) {
                    //调用tryAcquireShared
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //到这里,说明tryAcquireShared执行成功,
                        //即在共享模式下获取控制权成功,
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        //检测中断状态,
                        if (interrupted)
                            selfInterrupt();
                        //
                        failed = false;
                        return;
                    }
                }
                //如果当前节点的前驱节点不是头节点,判断当前节点请求失败后
                //是否需要被阻塞,如果需要,阻塞并保存当前线程的中断状态
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


 /**
     * 将node设置为同步等待队列的头节点,并且检测一下node的后继节点是否
     * 在共享模式下等待,如果是,并且propagate>0或者之前头节点的等待状态是
     * PROPAGATE,唤醒后继节点。
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /**
         * 尝试去唤醒队列中的下一个节点,如果满足下面的条件:
         * 传递(propagate>0),
         * 或者h.waitStatus为PROPAGATE,且下一个节点处于共享模式或者为null
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }


/**
     * 共享模式下释放控制权,唤醒后继节点并确保传递。
     * 注:在独占模式下,释放仅仅意味着,唤醒头结点的后继节点。
     */
    private void doReleaseShared() {
        /**
         * 保证释放动作传递(向同步等待队列尾部),即使没有其他正在进行的请求或释放动作。
         * 如果头节点的后继节点需要唤醒,那么执行唤醒动作;如果不需要唤醒,将头节点的等待状态
         * 设置为PROPAGATE保证唤醒传递。另外,为了防止过程中有新节点进入(队列),
         * 这里必须循环,所以,和其他unparkSuccessor方法使用方式不一样的是,如果
         * 头结点等待状态设置失败,重新检测。
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                //如果同步等待队列不为空,获取头节点的等待状态。
                int ws = h.waitStatus;
                //如果等待状态是SIGNAL。说明后继节点需要唤醒
                if (ws == Node.SIGNAL) {
                    //尝试修改等待状态
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        //修改失败,重新循环
                        continue;  // loop to recheck cases
                    //修改成功,唤醒头节点的后继节点
                    unparkSuccessor(h);
                }
                //如果等待状态是0,尝试将头节点设置为PROPAGATE
                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    //失败,继续循环
                    continue;                // loop on failed CAS
            }
            //头节点没有发生变化才退出循环
            if (h == head)                   // loop if head changed
                break;
        }
    }

 

共享模式下尝试请求,响应中断

 //共享模式下尝试请求,响应中断,抛异常
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

 

共享模式下尝试请求,响应中断,且支持自定义时间,
//共享模式下尝试请求,响应中断,且支持自定义时间
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
                doAcquireSharedNanos(arg, nanosTimeout);
    }

 

 

独占模式下的释放方法,

/**
     * 独占模式下的释放方法。
     * 如果tryRelease返回true,会唤醒一个或多个线程。
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            //如果tryRelease成功
            Node h = head;
            //判断同步等待队列里面是否右需要唤醒的线程,
            if (h != null && h.waitStatus != 0)
                //如果有,就唤醒
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

 

共享模式下的释放方法:

 /**
     * 共享模式下的释放方法。
     * 如果tryReleaseShared返回true,会唤醒一个或者多个线程
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }


 /**
     * 共享模式下释放控制权,唤醒后继节点并确保传递。
     * 注:在独占模式下,释放仅仅意味着,唤醒头结点的后继节点。
     */
    private void doReleaseShared() {
        /**
         * 保证释放动作传递(向同步等待队列尾部),即使没有其他正在进行的请求或释放动作。
         * 如果头节点的后继节点需要唤醒,那么执行唤醒动作;如果不需要唤醒,将头节点的等待状态
         * 设置为PROPAGATE保证唤醒传递。另外,为了防止过程中有新节点进入(队列),
         * 这里必须循环,所以,和其他unparkSuccessor方法使用方式不一样的是,如果
         * 头结点等待状态设置失败,重新检测。
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                //如果同步等待队列不为空,获取头节点的等待状态。
                int ws = h.waitStatus;
                //如果等待状态是SIGNAL。说明后继节点需要唤醒
                if (ws == Node.SIGNAL) {
                    //尝试修改等待状态
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        //修改失败,重新循环
                        continue;  // loop to recheck cases
                    //修改成功,唤醒头节点的后继节点
                    unparkSuccessor(h);
                }
                //如果等待状态是0,尝试将头节点设置为PROPAGATE
                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    //失败,继续循环
                    continue;                // loop on failed CAS
            }
            //头节点没有发生变化才退出循环
            if (h == head)                   // loop if head changed
                break;
        }
    }

 

//返回同步等待队列中第一个线程,没有返回null。
    public final Thread getFirstQueuedThread() {
        // handle only fast path, else relay
        return (head == tail) ? null : fullGetFirstQueuedThread();
    }

 private Thread fullGetFirstQueuedThread() {
        /**
         * 通常情况下,头节点的next指向的就是队列里第一个节点。
         * 尝试获取第一个节点的线程,保证读取的一致性:如果线程为null,
         * 或者第一个节点的前驱节点已经不是头节点,那么说明其他线程正在
         * 调用setHead方法。这里尝试获取两次,如果失败,再进行下面的遍历
         */
        Node h, s;
        Thread st;
        if (((h = head) != null && (s = h.next) != null &&
                s.prev == head && (st = s.thread) != null) ||
                ((h = head) != null && (s = h.next) != null &&
                        s.prev == head && (st = s.thread) != null))
            return st;
        /**
         * 头节点的next可能还没有设置,或者已经在setHead后被重置。
         * 所以我们必须验证尾节点是否是真的是第一个节点。如果不是,
         * 从尾节点反向遍历去查找头节点,确保程序退出。
         */
        Node t = tail;
        Thread firstThread = null;
        while (t != null && t != head) {
            Thread tt = t.thread;
            if (tt != null)
                firstThread = tt;
            t = t.prev;
        }
        return firstThread;
    }

 

/**
     * 如果同步等待队列中第一个线程是独占模式,返回true。
     * 如果这个方法返回true,并且当前线程正在尝试在共享模式下请求,
     * 那么可以保证当前线程不是同步等待队列里的第一个线程。
     */
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
                (s = h.next)  != null &&
                !s.isShared()         &&
                s.thread != null;
    }

 

 

看看内部类ConditionObject:(条件等待队列)

await方法:

/**
         * 1.如果当前线程有中断状态,抛异常,响应中断。
         * 2.添加当前线程当条件等待队列。
         * 3.释放当前线程对AQS的控制权,并保存释放前AQS的状态。
         * 4.进入条件循环,条件为,判断当前线程是否在AQS同步队列中,
         *  如果不在,那么阻塞当前线程;如果在同步队列中,跳到第7步
         * 5.当前线程被(其他线程)唤醒后,要检查等待过程中是否被中断或者
         *  取消,如果没有继续循环,开始第4步
         * 6.如果是,保存中断状态和模式,然后退出条件循环。
         * 7.请求AQS控制权,然后做一些收尾工作,如果被取消,清理一些条件等待队列。
         *  然后按照中断模式处理一下中断。
         */
        //可中断的条件等待方法。
        public final void await() throws InterruptedException {
            //检查线程中断状态,响应中断
            if (Thread.interrupted())
                throw new InterruptedException();
            //如果不是中断状态,将当前线程添加到条件等待队列。
            Node node = addConditionWaiter();
            //释放当前线程对AQS的控制权,并返回当前AQS中的state的值
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //判断当前线程是否在AQS的同步等待队列中
            while (!isOnSyncQueue(node)) {
                //如果不在,阻塞当前线程。
                LockSupport.park(this);
                //其他线程调用相同条件上的signal/sinalAll方法时,会将这个节点
                //从条件队列转义到AQS的同步等待队列中。
                //被唤醒后需要检查是否在等待过程中被中断
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    //如果有中断,退出循环
                    break;
            }
            //重新请求AQS的控制权
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)//如果发生过中断,在这里处理
                reportInterruptAfterWait(interruptMode);
        }


 /**
         * 向同步等待队列中添加一个新的节点
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //根据当前线程创建一个节点
            Node node = new Node(Thread.currentThread(), Node.CONDITION);

            if (t == null)
                //到这里,说明node是队列中的第一个节点,那么将firstWaiter指向这个节点
                firstWaiter = node;
            else
                //如果队列中已经存在其他节点,那么将t的nextWaiter指向该node节点
                t.nextWaiter = node;
            //将lastWaiter指向node节点
            lastWaiter = node;
            return node;//返回node
        }


/**
         * 移除条件等待队列中的取消状态节点。
         * 这个方法一定是在持有锁(拥有AQS控制权)的情况下被调用的(所以不存在竞争)。
         * 当等待条件时被(节点的线程)取消,或者当lastWaiter被取消后,条件等待队列中进入了
         * 一个新节点时会调用这个方法。这个方法需要避免由于没有signal而引起的
         * 垃圾滞留。所以尽管方法内会做一个完全遍历,也只有超时获取或取消时(没有
         * signal的情况下)才被调用。方法中会遍历所有节点,切断所有指向垃圾节点的引用,
         * 而不是一次取消切断一个引用。
         */
        private void unlinkCancelledWaiters() {
            //获取条件等待队列中的头节点t
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                //如果队列中有等待节点。获取头结点的nextWaiter节点next
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    //如果t被取消,将t的nextWaiter清空
                    t.nextWaiter = null;
                    if (trail == null)
                        //将next设置为头节点(移除之前的取消节点)
                        firstWaiter = next;
                    //否则说明队列前端有未取消的节点,这里链接(移除中间的
                        // 取消节点)
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;//设置尾节点
                }
                else//如果t没被取消,将trail指向t
                    trail = t;
                t = next;
            }
        }

/**
     * 调用release方法,并传入当前的state
     * 成功,返回之前的state
     * 失败,抛异常,并取消当前节点
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

/**
     * 如果一个node最初放在一个条件队列里,而现在正在AQS的同步等待队列里,
     * 返回true
     */
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        //如果有后继节点,说明肯定在AQS同步等待队列里面
        if (node.next != null) // If has successor, it must be on queue
            return true;
  
        /**
         * node.prev不为空并不能说明节点在AQS的同步等待队列里面,
         * 因为后续的CAS操作可能会失败,这里从尾节点反向遍历。
         */
        return findNodeFromTail(node);
    }

 

signal方法:

 /**
         * 将条件等待队列里面等待时间最长(链表最前面)的线程(如果存在的话),
         * 移动到AQS同步等待队列里面。
         */
        public final void signal() {
            //判断AQS的控制权是否被当前线程以独占的方式持有。
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)//判断条件队列里面是否有线程等待,
                //如果有,执行唤醒操作
                doSignal(first);
        }


  //唤醒指定节点
        private void doSignal(Node first) {
            do {
                //移除first
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
                //调用transferForSignal,如果调用失败,且条件等待队列不为空,
                //继续循环操作
            } while (!transferForSignal(first) &&
                    (first = firstWaiter) != null);
        }


 //将一个节点从条件等待队列转移到同步等待队列,如果成功,返回true。
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        //如果设置等待状态失败,说明节点已经被取消了,返回false。
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        //将node加入到AQS同步等待队列中,并返回node的前驱
        Node p = enq(node);
        int ws = p.waitStatus;
        //如果前驱节点被取消,或者尝试设置前驱节点的状态SIGNAL(表示node需要唤醒)
        //失败,那么唤醒node节点上的线程。
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

 

signalAll方法:

  public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

//唤醒所有节点
        private void doSignalAll(Node first) {
            //将条件等待队列的头尾节点置空
            lastWaiter = firstWaiter = null;
            //循环遍历清空所有节点。
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

 

以上是关于AbstractQueuedSynchronizer源码的主要内容,如果未能解决你的问题,请参考以下文章

ReentrantLock原理源码详解

一行一行源码分析清楚 AbstractQueuedSynchronizer

Java并发-- AQS 原理