jdk1.8 J.U.C并发源码阅读------AQS之独占锁的获取与释放

Posted Itzel_yuki

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jdk1.8 J.U.C并发源码阅读------AQS之独占锁的获取与释放相关的知识,希望对你有一定的参考价值。

一、继承关系

since1.5
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable 
继承自AbstractQueuedSynchronizer抽象类。

二、成员变量

private transient volatile Node head;//CLH队列首节点
private transient volatile Node tail;//CLH队列尾节点
private volatile int state;//锁的占用次数

三、内部类

CLH队列(链表结构的队列)的node的数据结构:

static final class Node 
       //标志Node的状态:独占状态
        static final Node SHARED = new Node();
        //共享状态
        static final Node EXCLUSIVE = null;

        //因为超时或者中断,node会被设置成取消状态,被取消的节点时不会参与到竞争中的,会一直保持取消状态不会转变为其他状态;
        static final int CANCELLED =  1;
       //该节点的后继节点被阻塞,当前节点释放锁或者取消的时候(cancelAcquire)需要唤醒后继者。
        static final int SIGNAL    = -1;
       //CONDITION队列中的状态,CLH队列中节点没有该状态,当将一个node从CONDITION队列中transfer到CLH队列中时,状态由CONDITION转换成0
        static final int CONDITION = -2;
        //该状态表示下一次节点如果是Shared的,则无条件获取锁。
        static final int PROPAGATE = -3;

        
		 //当一个新的node在CLH队列中被创建时初始化为0,在CONDITION队列中创建时被初始化为CONDITION状态
        volatile int waitStatus;

        //队列中的前驱节点
        volatile Node prev;

        
		 //next连接指向后继节点,当前节点释放锁时,需要唤醒它后面第一个非cancelled状态的节点。
		 //当一个节点入队时,直接添加到队尾,在acquireQueue中调用shouldParkAfterFailedAcqurie时修改前一个节点的状态,若前一个节点是cancelled则直接删除前驱节点,调整prev指向非cancelled的前驱;若前驱节点的状态是0,-3时,调整状态为signal
		 //在enq方法中,只有当成功的将tail指针指向新的尾节点时,才给之前的尾节点设置next的值,因此,当一个节点的next指针是null时,并不意味着该节点是尾节点。
		 //cancelled状态的node的next指向该节点自身而不是null。
        volatile Node next;

        //当前线程
        volatile Thread thread;

		//CONDITION队列中指向下一个node的指针,CLH队列中不使用
        Node nextWaiter;

        
        final boolean isShared() 
            return nextWaiter == SHARED;
        

       //返回前驱节点
        final Node predecessor() throws NullPointerException 
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        

        Node()     // Used to establish initial head or SHARED marker
        

        Node(Thread thread, Node mode)      // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        

        Node(Thread thread, int waitStatus)  // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        
    


四、方法说明

获取锁有三种方式: (1)acquire:以独占的模式获取一个锁,忽略中断,获取成功返回true,否则将该线程加入等待队列
	public final void acquire(int arg) 
	//tryAcquire尝试获得该arg,失败则先addWaiter(Node.EXCLUSIVE)加入等待队列,然后acquireQueued。
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
			//中断当前线程
            selfInterrupt();
    
	//tryAcquire(arg):尝试获得该arg
	//模板方法:由子类自己实现
	 protected boolean tryAcquire(int arg) 
        throw new UnsupportedOperationException();
    
	//mode:设置该节点是共享的还是独占的, Node.EXCLUSIVE for exclusive, Node.SHARED for shared
	private Node addWaiter(Node mode) 
        Node node = new Node(Thread.currentThread(), mode);
        //pred指向尾节点
        Node pred = tail;
		//尾节点不为null
        if (pred != null) 
		//新node的前驱结点指向尾节点
            node.prev = pred;
			//compareAndSetTail(pred, node):若等待队列的尾节点指向pred,则将尾节点指针指向node
            if (compareAndSetTail(pred, node)) 
			//修改AQS的尾节点成功,则将原先尾节点的next指向新节点
                pred.next = node;
			//返回当前节点
                return node;
            
        
		//pred==null或者修改AQS尾节点失败,则进入enq,用一个for循环修改AQS的尾指针,直至成功
        enq(node);
        return node;
    
	
	private Node enq(final Node node) 
		//循环,原因:可能同时存在多个线程修改tail指针。
        for (;;) 
		//t指向队列的tail
            Node t = tail;
            if (t == null)  // Must initialize
			//尾节点为null,则队列为空,需要初始化队列
                if (compareAndSetHead(new Node()))
                    tail = head;
             else 
			//将node的prev指针指向AQS的tail
                node.prev = t;
				//如果队列的tail==t,则将tail指向node
                if (compareAndSetTail(t, node)) 
				//原先的尾节点的next指向新添加的node
                    t.next = node;
				//修改成功,返回原先的尾节点,失败则进行下一次循环
                    return t;
                
            
        
    
	//compareAndSetHead和compareAndSetTail都是调用unsafe对象的compareAndSwapObject实现的。
	/**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) 
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    
	 /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) 
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    
	//acquireQueued:忽略中断
	final boolean acquireQueued(final Node node, int arg) 
        boolean failed = true;
        try 
            boolean interrupted = false;
            for (;;) 
			//获取node的前驱节点p
                final Node p = node.predecessor();
				//前驱p是首节点,则当前线程尝试获取锁
                if (p == head && tryAcquire(arg)) 
				//获取成功,则将当前节点设置成首节点
                    setHead(node);
				//删除前驱的next引用
                    p.next = null; // help GC
                    failed = false;
				//返回中断标志
                    return interrupted;
                
				//node的前驱节点p不是首节点head,则通过shouldParkAfterFailedAcquire来判断当前node的线程是否应该挂起park
                if (shouldParkAfterFailedAcquire(p, node) &&
				//应该park,则调用parkAndCheckInterrupt忽略中断进行挂起
                    parkAndCheckInterrupt())
                    interrupted = true;
			//当该线程被唤醒时,尝试再次判断前驱节点p是否为head,如果是则再次尝试获取锁,这样做的目的是为了维持FIFO的顺序
			//采用阻塞park和唤醒unpark的方式进行自旋式检测获取锁
            
         finally 
            if (failed)
                cancelAcquire(node);
        
    

	private void setHead(Node node) 
        head = node;
		//thread已经获得了锁,取消Node中指向thread的引用
        node.thread = null;
		//前驱设置为null
        node.prev = null;
    
	//shouldParkAfterFailedAcquire:当前节点获取锁失败时,是否应该进入阻塞状态
	//(1)前驱节点的waitStatus==SINGNAL(-1):则说明该节点需要继续等待直至被前驱节点唤醒,返回true.
	//(2)前驱节点的waitStatus==cancelled(1):则说明前驱节点曾经发生过中断或者超时,则前驱节点是一个无效节点,继续向前寻找一个node,该node的waitStatus<0,同时将waitStatus>0(cancelled)的节点删除,返回false(重新检测)。若新的前驱节点是head且tryAcquire获取成功,则该线程成功获取到锁。
	//(3)前驱节点的waitStatus==-3或0(PROPAGATE):共享锁向后传播,因此需要将前驱节点的waitStatus修改成signal,然后返回false,重新检测前驱节点;0,表示前驱节点是队列尾节点,该节点新添加进来,修改为signal表示原先尾节点后面还有需要唤醒的节点(该节点)。
	
	private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) 
	//ws:前驱节点的状态
        int ws = pred.waitStatus;
		//如果前驱节点是SIGNAL状态,则返回true,表示后继节点node应该被阻塞park
        if (ws == Node.SIGNAL)
            return true;
		//ws>0:cancelled,前驱节点是一个超时或者中断被取消的Node
        if (ws > 0) 
           //循环往前找第一个不是cancelled状态的节点,同时删除中间状态是cancelled的节点,调整队列。
            do 
                node.prev = pred = pred.prev;
             while (pred.waitStatus > 0);
            pred.next = node;
         else 
          //状态是-3PROPAGATE或0(CONDITION状态只能在CONDITION队列中出现,不会出现在CLH队列中),则将pred前驱节点的状态设置成SIGNAL状态,表示pred节点被唤醒时,需要park它的后继者。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        
        return false;
    
	//parkAndCheckInterrupt:阻塞当前线程,并且返回中断状态
	private final boolean parkAndCheckInterrupt() 
        LockSupport.park(this);
        return Thread.interrupted();
    
	//当acquireQueued的try中抛出异常,并且node没有成功获取锁时,需要通过cancelAcquire方法取消node节点
	//主要功能:
	//(1)重新设置node的prev指针(相当于删除了node的前面连续几个cancelled节点)
	//(2)将node的状态设置为cancelled
	//(3)如果node的prev不是首节点,并且node的next节点不是一个cancelled节点,则设置node的prev指向node的next节点;否则,node的prev节点head节点,则需要唤醒node的后继节点(unparkSuccessor)
	//(4)将node的next指针指向自己
	private void cancelAcquire(Node node) 
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        //如果node的前驱节点也是一个cancelled的节点,则修改node的prev指针,直至指向一个不是cancelled状态的node
		//相当于删除了node到prev之前的状态为calcelled的前驱节点
		//原因:若新的前驱节点是head节点,则需要在该node失效时唤醒node的后继节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        //predNext是一个cancelled的节点(可能是node)。
        Node predNext = pred.next;

       //直接将node的状态设置为cancelled
        node.waitStatus = Node.CANCELLED;

        //如果是尾节点,直接删除node
        if (node == tail && compareAndSetTail(node, pred)) 
            compareAndSetNext(pred, predNext, null);
         else 
            // 当pred不是首节点时,如果node的后继节点需要signal,则尝试将pred的状态设置成signal,并且将pred的next指针指向node的next节点
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) 
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
             else 
			//当pred是首节点时,需要唤醒node的后继节点。
                unparkSuccessor(node);
            
			//将cancelled的节点的next指针指向node自身
            node.next = node; // help GC
        
    
	//唤醒node的第一个非cancelled状态的node节点
	private void unparkSuccessor(Node node) 
        
        int ws = node.waitStatus;
		//如果该节点是一个cancelled的节点,则不用管它;否则,将该节点状态置0清空,表示该节点已经获取过锁了,要出队。
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        //node的后继节点有可能是一个cancelled的节点,而cancelled节点的next指针指向其自身。
		//只能从tail开始寻找到node之间最靠近node的那一个非cancelled的节点,并唤醒它。
		//此处不删除node到s之间的cancelled节点,只负责唤醒s节点,删除操作在shouldParkAfterFailedAquired中进行
        Node s = node.next;
        if (s == null || s.waitStatus > 0) 
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        
        if (s != null)
            LockSupport.unpark(s.thread);
    
总结:
         首先调用tryAcquire尝试获取锁,获取失败则调用addWaiter,将该节点以独占的状态入队,加入到队列的末尾,由于可能存在多个线程调用CAS操作将节点加入到CLH队列末尾,因此enq方法以for循环的方式进行,保证该节点能添加到队列末尾。
然后调用acquireQueued在队列中获取锁,过程为:检查该节点node的前驱节点是否为head节点
如果是head节点并且调用tryAcquire获取锁成功,则setHead,将node作为当前队列的队首,原先的首节点出队。
否则,调用shouldParkAfterFailedAcquire查看当前节点的线程是否应该park,具体为:如果当前结点node的前一个节点的waitStatus为signal则表示node节点应该park等待前一个节点唤醒它,返回true;如果当前节点的小于0,即为cancelled,则应该删除node前面连续的cancelled节点,将node的prev指针指向靠近node的第一个非cancelled节点,返回false,重新判断前驱状态;若为0或者为PROPAGATE,则CAS将前驱状态改成signal返回false,重新判断。
如果应该park,则调用parkAndCheckInterrupt将当前线程park,否则就进行下次循环。
如果以上过程中抛出异常并且该节点还没有成功获取锁,则指向cancelAcquire,将当前节点的状态改为cancelled,该节点不在参与获取锁,同时删除该节点前面连续的cancelled状态的节点,若删除之后发现该节点的前一个节点是head,则应该调用unparkSuccessor唤醒后继。

(2)acquireInterruptibly:以独占的方式获取一个锁,如果该线程发生中断则抛出异常,将该node的waitstate设置为0,放弃等待锁。
public final void acquireInterruptibly(int arg)
            throws InterruptedException 
		//检查当前线程是否被中断,中断则抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
		//尝试获取锁,获取失败则调用doAcquireInterruptibly
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    
	//doAcquireInterruptibly:获取锁,中断则抛出异常,修改node的waitstate=0
	private void doAcquireInterruptibly(int arg)
        throws InterruptedException 
		//将该线程以独占的方式加入锁等待队列
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try 
		//自旋式获取锁,可能经过多个park和unpark的过程
            for (;;) 
				//p指向node的前驱节点
                final Node p = node.predecessor();
				//如果前驱节点p是head,则再次尝试获得锁
                if (p == head && tryAcquire(arg)) 
				//获取锁成功,修改head指针,指向当前节点。
				//这里不用像setTail一样,因为等待队列是按照FIFO的顺序来获取锁的,因此不可能存在多个线程同时操作head,而tail可能同时存在多个node要添加到队列尾部,并发则需要循环添加至tail,直至成功。
                    setHead(node);
					//原先的头结点出队,将内容设置为null,方便回收
                    p.next = null; // help GC
                    failed = false;
                    return;
                
				//否则,前驱节点p不是head或者获取锁失败,则调用shouldParkAfterFailedAcquire查询该node是否应该阻塞park
                if (shouldParkAfterFailedAcquire(p, node) &&
				//应该阻塞,则调用parkAndCheckInterrupt进行中断式阻塞
                    parkAndCheckInterrupt())
					//如果在阻塞时该线程发生异常,则在此处抛出异常。
                    throw new InterruptedException();
            
			//如果抛出异常,则在finally中cancelAcquire(将node的waitstate修改)
         finally 
            if (failed)
                cancelAcquire(node);
        
    
	
	private final boolean parkAndCheckInterrupt() 
		//阻塞当前线程
        LockSupport.park(this);
		//返回中断状态。
        return Thread.interrupted();
    
	//cancelAcquire:取消当前线程等待锁的状态
	private void cancelAcquire(Node node) 
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
		//找到前驱节点不是cancelled的,将node的prev指向它
		//目的是为了判断新前驱如果是head,则该节点失效需要唤醒后继节点
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        //prev节点的后继节点,可能不是node,因为while循环中只是修改了prev指向,没有更改next指向
        Node predNext = pred.next;

        //当前节点的waiteStatus设置为CANCELLED,表示发生中断或者超时
        node.waitStatus = Node.CANCELLED;
		//如果node是尾节点,则直接删除并重新设置tail指向。
        if (node == tail && compareAndSetTail(node, pred)) 
            compareAndSetNext(pred, predNext, null);
         else 
            
            int ws;
			//前驱结点pred不是head,将pred的waitStatus设置成SIGNAL,并将pred的next指针指向node的下一个节点
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) 
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
             else 
			//尝试唤醒node的后继节点
                unparkSuccessor(node);
            
            node.next = node; // help GC
        
    
	private void unparkSuccessor(Node node) 
        //获取当前node的waitSatus
        int ws = node.waitStatus;
		//<0,该节点等待锁
        if (ws < 0)
			//重置该node的状态为0
            compareAndSetWaitStatus(node, ws, 0);
		//s指向node的后继节点
        Node s = node.next;
		//如果没有后继node或者后继node是canceld,因为超时或者中断被取消
        if (s == null || s.waitStatus > 0) 
            s = null;
			//从尾节点开始,寻找从s到tail之间最靠近s的waitStatus<0的nodek,将s指向它
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        
		//s不为空,则有可以操作的后继节点
        if (s != null)
		//唤醒该节点后继节点的线程。
		//该头结点的线程已经获得过锁并且执行完成了,还唤醒了后继节点,要将该节点从队列中移除(具体操作在该节点后继节点acquiredQueue的方法中实现更新表头)
            LockSupport.unpark(s.thread);
    

(3)tryAcquireNanos:尝试以独占的方式获取锁,如果发生中断和超时则停止
 public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException 
			//发生中断,停止
        if (Thread.interrupted())
            throw new InterruptedException();
			//尝试获取独占锁,失败则调用doAcquireNanos
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    
	private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException 
        if (nanosTimeout <= 0L)
            return false;
		//截止时间
        final long deadline = System.nanoTime() + nanosTimeout;
		//创建一个Node,独占模式
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try 
            for (;;) 
			//前驱节点
                final Node p = node.predecessor();
				//如果p是head,并且tryAcquire获取成功,则该node获取锁成功
                if (p == head && tryAcquire(arg)) 
				//设置新node为head节点,node之前的节点都出队了
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                
				//还剩多少时间
                nanosTimeout = deadline - System.nanoTime();
				//超时
                if (nanosTimeout <= 0L)
                    return false;
				//shouldParkAfterFailedAcquire,查看是否需要阻塞该线程,如果需要则
                if (shouldParkAfterFailedAcquire(p, node) &&
				//查看剩余时间大于spinForTimeoutThreshold,如果大于则阻塞当前线程,否则,剩余时间很短,直接查看是否能成功获取锁(为了超时时间的准确性)
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
				//如果该线程发生了中断,则抛出异常
                if (Thread.interrupted())
                    throw new InterruptedException();
            
         finally 
		//如果获取失败,则cancelAcquire,该节点要么超时,要么抛出异常了
            if (failed)
                cancelAcquire(node);
        
    

释放独占锁的过程分析: 总结: 调用tryRelease释放锁,若成功了,则查看head的状态,若waitStatus为0,则表示CLH队列中没有后继节点了,则不用唤醒,否则,需要调用unparkSuccessor唤醒后继节点,unparkSuccessor唤醒后继节点的原理是:找到node的后面第一个非cancelled状态的节点进行唤醒。
public final boolean release(int arg) 
//tryRelease,模板方法,子类实现。尝试释放锁,释放成功,则需要唤醒后继节点
        if (tryRelease(arg)) 
            Node h = head;
			//h.waitStatus为0说明CLH队列中已经没有成员了,head是最后一个节点
            if (h != null && h.waitStatus != 0)
			//唤醒后继节点
                unparkSuccessor(h);
            return true;
        
        return false;
    
	private void unparkSuccessor(Node node) 
       
        int ws = node.waitStatus;
		//waitStatus<0,重置node的状态为0,该节点已经获取过锁了。
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
		//s节点的状态是cancelled,需要寻找s后面第一个非cancelled状态的节点唤醒
		//从队尾开始向前找的原因:
		//next节点不一定指向它的后继节点,有可能指向其自身,当该节点的状态为cancelled的时候,该节点的next指针指向自身。
		//之所以cancelled状态节点的next指针指向自身,是为了方便该方法isOnSyncQueue(Node node)判断,若next不为空则说明该Node一定在CLH队列中。
        if (s == null || s.waitStatus > 0) 
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        
        if (s != null)
		//唤醒线程,后继线程在AcquireQueue中被唤醒。
		//删除node到s之间的cancelled状态的线程的工作,在shouldParkAfterFailedAcquire中进行,当前驱状态为cancelled时,
		//寻找第一个非cancelled状态的节点,调整s的prev指向,如果前驱是head,并且tryAcquire成功,则s成功获取到锁了。
            LockSupport.unpark(s.thread);
    





以上是关于jdk1.8 J.U.C并发源码阅读------AQS之独占锁的获取与释放的主要内容,如果未能解决你的问题,请参考以下文章

jdk1.8 J.U.C并发源码阅读------ReentrantLock源码解析

jdk1.8 J.U.C并发源码阅读------ReentrantLock源码解析

jdk1.8 J.U.C并发源码阅读------CountDownLatch源码解析

jdk1.8 J.U.C并发源码阅读------CountDownLatch源码解析

jdk1.8 J.U.C并发源码阅读------CyclicBarrier源码解析

jdk1.8 J.U.C并发源码阅读------CyclicBarrier源码解析