AQS

Posted angelica-duhurica

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了AQS相关的知识,希望对你有一定的参考价值。

java.util.concurrent.locks.AbstractQueuedSynchronizer

ReentrantLockSemaphoreCountDownLatch都有一个内部类Sync,而所有的Sync都是继承自AbstractQueuedSynchronizer

AQS核心是通过一个共享变量来同步状态,变量的状态由子类去维护,而AQS框架做的是:

  • 线程阻塞队列的维护
  • 线程阻塞和唤醒

共享变量的修改都是通过Unsafe类提供的CAS操作完成的。AbstractQueuedSynchronizer类的主要方法是acquirerelease,典型的模板方法, 下面这4个方法由子类去实现:

// Main exported methods
protected boolean tryAcquire(int arg)  throw new UnsupportedOperationException();
protected boolean tryRelease(int arg)  throw new UnsupportedOperationException();
protected int tryAcquireShared(int arg)  throw new UnsupportedOperationException();
protected boolean tryReleaseShared(int arg)  throw new UnsupportedOperationException();

acquire方法用来获取锁,返回true说明线程获取成功继续执行,一旦返回false则线程加入到等待队列中,等待被唤醒,release方法用来释放锁。 一般来说实现的时候这两个方法被封装为lockunlock方法。

等待的线程是按照阻塞时的顺序依次获取到锁的,这是因为AQS是基于CLH lock queue的一个变种来实现线程阻塞队列的。

CLH lock queue

CLH lock queue其实就是一个FIFO的队列,队列中的每个结点(线程)只要等待其前继释放锁就可以了。

通常就是用CLH lock queue来实现自旋锁(spin lock),简单来说就是线程通过循环来等待而不是睡眠。

AQS中线程不是一直在自旋的,而可能会反复的睡眠和唤醒,这就需要前继释放锁的时候通过next 指针找到其后继将其唤醒,也就是AQS的等待队列中后继是被前继唤醒的。AQS结合了自旋和睡眠/唤醒两种方法的优点。其中线程的睡眠和唤醒就是用到LockSupport

LockSupport

阻塞和唤醒是对于线程来说的,LockSupport的park/unpark更符合这个语义,以“线程”作为方法的参数, 语义更清晰。Object对象的wait和notify方法的实现使得“线程”的阻塞/唤醒对线程本身来说是被动的。

LockSupport并不需要获取对象的监视器。LockSupport机制是每次unpark给线程有且仅有1个许可,而park则相反,如果当前线程有许可,那么park方法会消耗许可并返回,否则会阻塞线程直到线程重新获得许可,在线程启动之前调用park/unpark方法没有任何效果。

用来创建锁和其他同步类的基本线程阻塞原语。

    // 解除阻塞线程,不会遇到Thread.resume所可能引发的死锁问题。
    public static void unpark(Thread thread) 
        if (thread != null)
            UNSAFE.unpark(thread);
    

    // 阻塞线程,不会遇到Thread.suspend所可能引发的死锁问题。
    // 和Object的wait一样也能响应中断,但是跟Thread.sleep()不同的是它不会抛出InterruptedException。
    public static void park(Object blocker) 
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        // 保证在park(Object blocker)整个函数执行完后,该线程的parkBlocker字段又恢复为null
        setBlocker(t, null);
    

    private static final sun.misc.Unsafe UNSAFE;

AbstractQueuedSynchronizer

抽象类,其为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。

底层的数据结构是使用双向链表,是队列的一种实现,故也可看成是队列,其中Sync queue,即同步队列,是双向链表,包括head结点和tail结点,head结点主要用作后续的调度。

    static final class Node 
        // 标记当前结点是共享模式
        static final Node SHARED = new Node();
        // 标记当前结点是独占模式
        static final Node EXCLUSIVE = null;

        // 表示当前的线程被取消
        static final int CANCELLED =  1;
        // 表示当前节点的后继节点包含的线程需要运行,需要进行unpark操作
        static final int SIGNAL    = -1;
        // 表示当前节点在condition queue中,在等待condition
        static final int CONDITION = -2;
        // 代表后续结点会传播唤醒的操作,共享模式下起作用
        static final int PROPAGATE = -3;
        // 结点的等待状态
        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;
        // 拥有当前结点的线程
        volatile Thread thread;

        Node nextWaiter;

        final boolean isShared() 
            return nextWaiter == SHARED;
        

        final Node predecessor() throws NullPointerException 
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        

        Node()     // Used to establish initial head or SHARED marker
        

        Node(Thread thread, Node mode)      // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        

        Node(Thread thread, int waitStatus)  // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        
    

    // Sync queue,即同步队列,是双向链表
    private transient volatile Node head;
    private transient volatile Node tail;
    // 自旋时间,doAcquireNanos方法的for循环用到了这个时间
    static final long spinForTimeoutThreshold = 1000L;

    // 实现了Condition接口
    public class ConditionObject implements Condition, java.io.Serializable 
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
    
入队
    private Node addWaiter(Node mode) 
        Node node = new Node(Thread.currentThread(), mode);
        // 尝试快速入队操作,因为大多数时候尾节点不为 null
        Node pred = tail;
        // 这个if分支其实是一种优化:CAS操作失败的话才进入enq中的循环。
        if (pred != null) 
            node.prev = pred;
            if (compareAndSetTail(pred, node)) 
                pred.next = node;
                return node;
            
        
        // 如果尾节点为空(也就是队列为空) 或者尝试CAS入队失败(由于并发原因),进入enq方法
        enq(node);
        return node;
    

    private Node enq(final Node node) 
        for (;;) 
            Node t = tail;
            if (t == null)  // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
             else 
                // 可以看到这一部分和上面是重复的
                node.prev = t;
                if (compareAndSetTail(t, node)) 
                    t.next = node;
                    return t;
                
            
        
    
出队
    private void setHead(Node node) 
        head = node;
        node.thread = null;
        node.prev = null;
    
独占模式获取
    // 以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。
    // tryAcquire由子类实现本身不会阻塞线程,如果返回true,则线程继续,
    // 如果返回false那么就加入阻塞队列阻塞线程,并等待前继结点释放锁
    public final void acquire(int arg) 
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // acquireQueued返回true,说明当前线程被中断唤醒后获取到锁,
            // 重置其interrupt status为true。
            selfInterrupt();
    

    final boolean acquireQueued(final Node node, int arg) 
        boolean failed = true;
        try 
            boolean interrupted = false;
            // 等待前继结点释放锁
            // 自旋re-check
            // 支持超时的获取版本
            for (;;) 
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) 
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                
                // parkAndCheckInterrupt就是用LockSupport来阻塞当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            
         finally 
            if (failed)
                cancelAcquire(node);
        
    
独占模式释放
public final boolean release(int arg) 
    if (tryRelease(arg)) 
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 唤醒后续的结点
            unparkSuccessor(h);
        return true;
    
    return false;
  
共享模式获取
    // 如果没有许可了则入队等待    
    public final void acquireShared(int arg) 
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    

    private void doAcquireShared(int arg) 
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try 
            boolean interrupted = false;
            for (;;) 
                final Node p = node.predecessor();
                if (p == head) 
                    int r = tryAcquireShared(arg);
                    if (r >= 0) 
                        // 获取成功则前继出队,跟独占不同的是,
                        // 会往后面结点传播唤醒的操作,保证剩下等待的线程能够尽快获取到许可。
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            
         finally 
            if (failed)
                cancelAcquire(node);
        
    
共享模式释放
    public final boolean releaseShared(int arg) 
        if (tryReleaseShared(arg)) 
            doReleaseShared();
            return true;
        
        return false;
    

Condition

public interface Condition 
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();

ReentrantLock

一个可重入的互斥锁 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大。

class ReentrantLock implements Lock

    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer 
        // 由子类实现分为公平和非公平
        abstract void lock();

        // 非公平方式获取
        final boolean nonfairTryAcquire(int acquires) 
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) 
                if (compareAndSetState(0, acquires)) 
                    setExclusiveOwnerThread(current);
                    return true;
                
            
            else if (current == getExclusiveOwnerThread()) 
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            
            return false;
        

        protected final boolean tryRelease(int releases) 
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) 
                free = true;
                setExclusiveOwnerThread(null);
            
            setState(c);
            return free;
        
        
        // 是否被当前线程占有
        protected final boolean isHeldExclusively() 
            return getExclusiveOwnerThread() == Thread.currentThread();
        
        
        // 获取占有资源的线程
        final Thread getOwner() 
            return getState() == 0 ? null : getExclusiveOwnerThread();
        
        ...
    

    static final class NonfairSync extends Sync 

        final void lock() 
            // 比较并设置状态成功,状态0表示锁没有被占用
            if (compareAndSetState(0, 1))
                // 把当前线程设置独占了锁
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 锁已经被占用,或者set失败
                // 以独占模式获取对象,忽略中断
                acquire(1);
        

        protected final boolean tryAcquire(int acquires) 
            return nonfairTryAcquire(acquires);
        
    

    static final class FairSync extends Sync 
        final void lock() 
            // 这里就是调用父类AQS的acquire方法
            // 然后在AQS的方法里调用到下面的tryAcquire方法和AQS的addWaiter方法、acquireQueued方法
            acquire(1);
        

        protected final boolean tryAcquire(int acquires) 
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) 
                // 不存在已经等待更久的线程,比较并且设置状态成功
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) 
                    setExclusiveOwnerThread(current);
                    return true;
                
            
            else if (current == getExclusiveOwnerThread()) 
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            
            return false;
        
    

ReentrantLock的绝大部分操作都是基于AQS类的。

CyclicBarrier

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();

    // 对外提供的await函数在底层都是调用该了doawait函数
    private int dowait(boolean  timed, long nanos)throws InterruptedException, BrokenBarrierException, TimeoutException 
        final ReentrantLock lock = this.lock;
        lock.lock();
        try 
            final Generation g = generation;
            // 判断屏障是否被破坏
            if (g.broken)
                throw new BrokenBarrierException();
            // 判断线程是否被中断
            if (Thread.interrupted()) 
                // 损坏当前屏障,并且唤醒所有的线程
                breakBarrier();
                throw new InterruptedException();
            
            // 判断等待进入屏障的线程数量
            int index = --count;
            if (index == 0)   // tripped,所有线程都已经进入运行的动作标识
                boolean ranAction = false;
                try 
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 进入下一代,在所有线程进入屏障后会被调用,
                    // 即生成下一个版本,所有线程又可以重新进入到屏障中,唤醒所有线程
                    nextGeneration();
                    return 0;
                 finally 
                    if (!ranAction)
                        breakBarrier();
                
            

            // loop until tripped, broken, interrupted, or timed out
            for (;;) 
                try 
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                 catch (InterruptedException ie) 
                    if (g == generation && ! g.broken) 
                        breakBarrier();
                        throw ie;
                     else 
                        Thread.currentThread().interrupt();
                    
                

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) 
                    breakBarrier();
                    throw new TimeoutException();
                
            
         finally 
            lock.unlock();
        
    

CountDownLatch

一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

典型的用法是将一个程序分为n个互相独立的可解决任务,并创建值为n的CountDownLatch。当每一个任务完成时,都会在这个锁存器上调用countDown,等待问题被解决的任务调用这个锁存器的await,将他们自己拦住,直至锁存器计数结束。

    // 内部类Sync
    private final Sync sync;

    public void await() throws InterruptedException 
        // 调用了Sync的tryAcquireShared和AQS的doAcquireSharedInterruptibly函数
        // tryAcquireShared:试图在共享模式下获取对象状态
        sync.acquireSharedInterruptibly(1);
    

    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException 
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    

    // 递减锁存器的计数,如果计数到达零,则在共享模式下释放所有等待的线程
    public void countDown() 
        sync.releaseShared(1);
    

    public long getCount() 
        return sync.getCount();
    

CountDownLatch是采用共享模式,而ReentrantLock是采用独占模式。

Semaphore

一个计数信号量,从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个acquire(),然后再获取该许可。每个release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,Semaphore只对可用许可的号码进行计数,并采取相应的行动,不使用实际的许可对象。通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。

与ReentrantLock的内部类的结构相同,Sync、NonfairSync、FairSync三个内部类。基于Semaphore对象的操作绝大多数都转移到了对sync的操作。

不会使用到AQS的条件队列。

ReentrantReadWriteLock

读写锁接口ReadWriteLock的实现类,它包括Lock子类ReadLock和WriteLock。ReadLock是共享锁,WriteLock是独占锁。

class ReentrantReadWriteLock implements ReadWriteLock
public interface ReadWriteLock 
    Lock readLock();
    Lock writeLock();

五个内部类:Sync、NonfairSync、FairSync、ReadLock、WriteLock。

写锁数量由state的低十六位表示。读锁数量由state的高十六位表示。

可以实现多个线程同时读,此时,写线程会被阻塞。并且,写线程获取写入锁后可以获取读取锁,然后释放写入锁,这样写入锁变成了读取锁。

参考:

http://zhanjindong.com/tags/#AQS

https://www.cnblogs.com/leesf456/p/5453091.html

以上是关于AQS的主要内容,如果未能解决你的问题,请参考以下文章

AQS(AbstractQueuedSynchronizer)源码深度解析—AQS的设计与总体结构

4.从AbstractQueuedSynchronizer(AQS)说起——AQS结语

AQS 详解

AQS学习

Java AQS学习

AQS 源码分析