ReentrantLock源码分析

Posted 小灰和小白

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ReentrantLock源码分析相关的知识,希望对你有一定的参考价值。

文章目录

一、ReentrantLock的介绍

1.简介

Java中提供的锁:synchronized, lock锁(ReentrantLock、ReentrantReadWriteLock)
ReentrantLock就是一个互斥锁,可以让多线程执行期间,只有一个线程在执行指定的一段代码。

2.使用方式

/*前置知识:CAS、volatile、AQS*/
ReentrantLock lock = new ReentrantLock();
//加锁
lock.lock();
try 
    // 执行业务...
 finally 
    //释放锁
    lock.unlock();

二、加锁的相关方法源码分析

ReentrantLock的所有方法如下图左侧所示:

1.lock方法

(1)在lock方法内部调用了sync.lock()抽象方法,发现该方法有两个实现。
Sync是一个内部静态抽象类,继承了AQS类

public class ReentrantLock implements Lock, java.io.Serializable 
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer 
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs @link Lock#lock. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();


(2)抽象方法lock()有两种实现:公平的FairSync,和不公平的NonfairSync。
a. FairSync公平锁
每个线程在执行lock方法时,会先查看是否有线程排队,如果有,直接去排队。如果没有才去尝试竞争锁资源。
b. NonfairSync非公平锁
每个线程都会在执行lock方法时,先尝试获取锁资源,获取不到再排队。

(3)如果需要使用公平锁:在new ReentrantLock时,传入参数true.
如果需要使用非公平锁:直接调用无参构造方法(默认非公平锁)。
PS:更推荐非公平锁,非公平锁的效率比公平锁高。
(4)从源码的角度也发现了公平锁直接调用acquire方法尝试获取锁,

而非公平锁会先基于CAS的方式尝试获取锁资源,如果获取不到,才会执行acquire方法尝试获取锁。

2.分析AQS

AQS就是AbstractQueuedSynchronizer类,AQS内部维护着一个队列,还有三个核心属性:state、head、tail。
state代表当前锁重入的个数,用getState和setState方法读取和设置。

3. lock方法源码

非公平锁的lock方法:

//非公平锁的lock方法
final void lock() 
    //以CAS的方式,尝试将state从0改为1
    if (compareAndSetState(0, 1))
        // 进入if证明修改state成功,也就代表获取锁资源成功
        // 将当前线程设置到AQS中的exculusiveOwnerThread(AOS中,AOS为AQS的父类),代表当前线程拿着锁资源(和后面的可重入锁有关)
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);


protected final boolean compareAndSetState(int expect, int update) 
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);


protected final void setExclusiveOwnerThread(Thread thread) 
    exclusiveOwnerThread = thread;

公平锁的lock方法:

//公平锁的lock方法
final void lock() 
    acquire(1);

4.ReentrantLock的acquire方法

acquire方法中调用了tryAcquire、addWaiter、acquireQueued三个方法
非公平锁和公平锁调用的acquire方法是一样的,只是tryAcquire方法分为公平锁和非公平锁的实现

// 公平锁还是非公平锁都会调用当前的acquire方法
public final void acquire(int arg) 
    // tryAcquire方法,分为两种实现。第一种是公平锁,第二种是非公平锁。
    // 公平锁:如果state为0,再看是否有线程排队,如果有就去排队。如果是锁重入的操作,直接获取锁。
    // 非公平锁:如果state为0,直接尝试CAS修改。如果是锁重入的操作,直接获取锁。
    if (!tryAcquire(arg) && // 如果tryAcquire返回fasle说明获取锁失败,要去尝试排队
        // addWaiter方法,在线程没有通过tryAcquire拿到锁资源时,需要将当前线程封装为Node对象,去AQS内部排队
        // acquireQueued方法:查看当前线程是否是排在队伍前面的,如果是就尝试获取锁资源。如果长时间没拿到锁,也需要将当前线程挂起
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();

注:遇到的锁几乎都是可重入锁。只有一些特殊的实现,如线程池就单独实现了非可重入锁。

总结:acquire主要是三个部分:①tryAcquire方法获取锁,根据公平锁和非公平锁不同处理。②如果没拿到锁资源,调用addWaiter方法把当前线程封装成AQS的Node对象去排队。③排队后调用acquireQueued方法判断当前Node是不是头节点,能不能拿到锁资源,如果一直没拿到要把线程挂起。

5.ReentrantLock的tryAcquire方法

tryAcquire方法是AQS提供的,内部并没有任何的实现,需要继承AQS的类自己去实现逻辑代码
查看到tryAcquire在ReentrantLock中提供了两种实现:公平锁、非公平锁
非公平锁的实现逻辑:

protected final boolean tryAcquire(int acquires) 
    return nonfairTryAcquire(acquires);


// 非公平锁实现
final boolean nonfairTryAcquire(int acquires) 
    //获取了当前线程
    final Thread current = Thread.currentThread();
    // 获取AQS的state值
    int c = getState();
    // 如果state为0,这就代表当前没有线程占用锁资源
    if (c == 0) 
        // 直接基于CAS的方式,尝试修改state,从0~1,如果成功就代表拿到锁资源
        if (compareAndSetState(0, acquires)) 
            // 将exclusiveOwnerThread属性设置为当前线程(exclusiveOwnerThread为AQS父类AOS的属性)
            setExclusiveOwnerThread(current);
            return true;
        
    
    // 说明state肯定不为0,不为0就代表当前lock被线程占用
    // 判断占用锁资源的线程是不是当前线程
    else if (current == getExclusiveOwnerThread()) 
        // 锁重入操作!
        // 对state + 1
        int nextc = c + acquires;
        // 判断锁重入是否已经达到最大值(整数溢出变为负数)
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 将AQS的state设置好
        setState(nextc);
        // 返回true
        return true;
    
    //返回false表示在tryAcquire方法中想尝试拿到锁的资源但是没有拿到,要去排队了
    return false;

公平锁的实现逻辑:

protected final boolean tryAcquire(int acquires) 
    //获取了当前线程
    final Thread current = Thread.currentThread();
    // 获取AQS的state值
    int c = getState();
    // 没有线程占用锁资源
    if (c == 0) 
        //首先查看有没有线程排队
        if (!hasQueuedPredecessors() &&// 与非公平锁的实现只差在这一行代码上,先判断有无线程排队
            // 如果没有线程排队,CAS尝试获取锁资源(没有线程排队时,下面的代码逻辑公平锁和非公平锁是一样的)
            compareAndSetState(0, acquires)) 
            setExclusiveOwnerThread(current);
            return true;
        
    
    else if (current == getExclusiveOwnerThread()) 
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    
    return false;

6.ReentrantLock的addWaiter方法

在线程执行tryAcquire方法没有获取到锁资源之后会返回false,再配合上if中的!操作,会执行&&后面的方法,而在acquireQueued(addWaiter(Node.EXCLUSIVE), arg)的参数中执行了addWaiter方法,要将当前获取锁失败的线程封装为Node,排队到AQS的队列中。
addWaiter方法参数是一个标识,代表当前锁是一个互斥锁

static final Node EXCLUSIVE = null;
// 获取锁失败,封装Node,排队到AQS的队列中
private Node addWaiter(Node mode) // mode = null表示互斥锁
    // 将线程封装为Node对象
    Node node = new Node(Thread.currentThread(), mode);
    // 获取到tail节点,pred
    Node pred = tail;
    // 如果tail节点不为null(说明有人在排队)
    if (pred != null) 
        // 将当前节点的prev指向tail
        node.prev = pred;
        //为了避免并发问题,基于CAS的方式将tail指向当前线程
        if (compareAndSetTail(pred, node)) 
            // 将之前的tail的next,指向当前节点
            pred.next = node;
            //返回当前节点
            return node;
        
    
    // 如果在队列为空,或者CAS操作失败后,会执行enq方法,将node排到队列的末尾
    enq(node);
    return node;


// enq方法,传入的node就是当前节点
private Node enq(final Node node) 
    // 死循环
    for (;;) 
        // 获取tail节点
        Node t = tail;
        if (t == null)  // Must initialize
            // 如果队列为空,先初始化head节点作为头
            if (compareAndSetHead(new Node()))
                tail = head;
         else 
            // 到这里,队列肯定不为空,采用之前的逻辑,将当前节点插入到队列的末尾作为tail,循环到插入成功为止
            node.prev = t;
            if (compareAndSetTail(t, node)) 
                t.next = node;
                return t;
            
        
    

整体逻辑为,先初始化Node节点,将当前线程传入,并且标识为互斥锁。
尝试将当前Node插入到AQS队列的末尾

  • 队列为空:执行enq,先初始化空Node作为头,然后再将当前Node插入
  • 队列不为空:直接将当前Node插入,作为tail

总结:tryAcquire先尝试获取锁,拿不到要排队,在addWaiter方法中完成Node的封装并将它放到末尾。

7.ReentrantLock的acquireQueue方法

首先查看当前node是否排在队列的第一个位置(不算head),直接再次执行tryAcquire方法竞争锁资源。
如果不是在第一个位置,就尝试将当前线程挂起,最终排在有效节点后,才会将当前线程挂起。

// 队伍前面,竞争锁资源。队伍非前面,挂起线程。
final boolean acquireQueued(final Node node, int arg) //参数node就是上面addWaiter中封装好返回的node,arg就是1
    // 竞争锁资源失败标识,默认true
    boolean failed = true;
    try 
        // 线程中断标识
        boolean interrupted = false;
        // 死循环 核心代码
        for (;;) 
            // predecessor获取当前节点的上一个节点
            final Node p = node.predecessor();
            // 如果当前节点的上一个节点是head,就执行tryAcquire竞争锁资源
            if (p == head && tryAcquire(arg)) 
                // 竞争锁资源成功,进入当前业务代码
                // 因为当前线程已经拿到锁资源,将当前线程的Node设置为head,并且将Node中的prev和thread置为null
                /*node的线程获取了锁之后,这个node就没有用了,可以置空作为下一个头节点*/
                setHead(node);
                // 将之前的头节点的next置为null,让GC将之前的head回收掉
                p.next = null; 
                // 将获取锁失败的标识置为false
                failed = false;
                // 返回线程中断标识,默认情况为false
                return interrupted;
            
            // 如果当前node不是第一个节点,那就尝试将它挂起,并且中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        
     finally 
        if (failed)
            cancelAcquire(node);
    

尝试将当前线程挂起,涉及到了判断以及LockSupport的方法挂起线程
shouldParkAfterFailedAcquire(p, node)是一个判断,只有它返回ture才会调用中断方法,参数为上一个节点和当前节点。

// 判断当前线程是否可以挂起
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) 
    // 拿到了上一个节点的状态
    int ws = pred.waitStatus;
    // 如果ws为-1,直接返回true,代表当前节点可以挂起线程
    if (ws == Node.SIGNAL)
        return true;
    // 如果ws > 0, 说明肯定是CANCELLED状态,绕过这个节点,找上一个节点的上一个
    if (ws > 0) 
        // 循环,直到找到上一个节点为小于等于0的节点
        do 
            node.prev = pred = pred.prev;
         while (pred.waitStatus > 0);
        pred.next = node;
     else 
        // 可能为0,-2,-3,直接以CAS的方式将节点状态改为-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    
    return false;


//线程状态
static final int CANCELLED =  1; // 线程被取消
static final int SIGNAL    = -1; // 正常状态
static final int CONDITION = -2; // 在执行锁时用到了condition对象调用await或signal操作,可以把状态变成SIGNAL
static final int PROPAGATE = -3; // 贡献锁,可以把状态变成SIGNAL
// 找到上一个节点状态是正常的后,就可以调用当前方法将线程挂起
private final boolean parkAndCheckInterrupt() 
    // 直接使用Unsafe类的park方法挂起线程
    LockSupport.park(this);
    return Thread.interrupted();


总结:acquireQueue方法一共两部分。第一部分是上一个节点是head时,直接tryAcquire。第二部分如果上一个节点不是head,要把线程挂起,当上一个节点状态为-1时可以挂起,如果不是-1,就找到前面-1的节点。
整个加锁过程:先用tryAcquire拿锁,拿不到添加一个node排到队列末尾,看一下队列的上一个是不是头节点,是就尝试竞争,不是就尝试挂起(调用park方法挂起)。当持有锁的线程释放锁时调用unpark方法唤醒线程。

三、释放锁的相关方法源码分析

1.ReentrantLock的unlock方法

ReentrantLock的lock、acquire、tryAcquire、addWaiter、acquireQueued方法用来获取锁资源,即把state成功的从0改为1. 同样,释放锁即把state从1改为0.
其中acquireQueued方法会将获取锁失败的线程用pack方法挂起,那么释放锁时会用unpack来唤醒挂起线程。
unlock是释放锁操作的入口

public void unlock() 
    //每次只释放1
    sync.release(1);

unlock释放锁操作不分公平和非公平,都是执行sync的release方法
释放锁的核心,就是将state从大于0的数值更改为0即为释放锁成功(如果涉及到锁重入,state的值可能是2)
并且unlock方法应该会涉及到将AQS队列中堵塞的线程进行唤醒,阻塞用的是park方法,唤醒必然是unpark方法。

2.ReentrantLock的release方法

在释放锁时,只有state被减为0之后,才会去唤醒AQS队列中被挂起的线程
在唤醒挂起线程时,如果head的next状态不正确,会从后往前找到离head最近的节点进行唤醒。
为什么从后往前找?addWaiter方法中是先将prev指针赋值,最后才会将上一个节点的next指针赋值。为了避免丢失节点或者跳过节点,必须从后往前找。

// 释放锁操作
public final boolean release(int arg)  //参数arg为1
    // 先查看tryRelease方法
    if (tryRelease(arg)) 
        // 释放锁成功,进行后续处理
        Node h = head;
        // 如果head不为null,并且当前head的状态不为0(只有状态为-1时,后面才有挂起的node线程。如果状态是0,后面没有挂起,不需要unpack唤醒线程的操作)
        if (h != null && h.waitStatus != 0)
            // 说明AQS的队列中,有Node在排队,并且线程已经挂起了
            // 需要唤醒被挂起的Node
            unparkSuccessor(h);
        return true;
    
    // 返回false,代表释放一次没有完全释放掉
    return false;

protected boolean tryRelease(int arg) 
    throw new UnsupportedOperationException();


//ReentrantLock实现的release方法
//优先查看的tryRelease
protected final boolean tryRelease(int releases) 
    // 直接获取state,并且 - releases.将state - 1
    int c = getState() - releases;
    // 如果释放锁的线程,不是占用锁的线程,直接抛出异常。(健壮性判断)
    if (Thread.currentThread() != getExclusiveOwnerThread())
    /*注:try代码块中不要写lock.lock()方法,因为finally中有lock.unlock。避免抛出IllegalMonitorStateException下面的异常
    lock.lock()方法要放到try外面,在没有获取锁成功的情况下,不能执行try里面的业务代码*/
        throw new IllegalMonitorStateException();
    // 声明了一个标识
    boolean free = false;
    // 判断state - 1后是否为0
    if (c == 0) 
        // 如果为0,锁资源释放掉了
        free = true;
        // 将占用互斥锁的线程标识置为null
        setExclusiveOwnerThread(null);
    
    // 锁之前重入了,一次没释放掉,将c赋值给state,等待下次再次执行时,再次判断
    setState(c);
    return free;

// 唤醒AQS中被挂起的线程
private void unparkSuccessor(Node node) 
    // 获取head的状态
    int ws = node.waitStatus;
    if (ws < 0)
        // 将当前head状态设置为0(代表当前的head节点已经失效)
        compareAndSetWaitStatus(node, ws, 0);
    // 拿到next节点
    Node s = node.next;
    // 如果下一个节点为null,或者状态为CANCEL,需要找到离head节点最近的有效Node
    if (s == null || s.waitStatus > 0) 
        s = null;
        // 从后往前找这个节点(为什么从后往前找,需要查看addWaiter的内容)
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    
    //找到最近的node后,直接唤醒
    if (s != null)
        // 唤醒线程
        LockSupport.unpark(s.thread);

ReentrantLock源码分析--jdk1.8

JDK1.8

ArrayList源码分析--jdk1.8
LinkedList源码分析--jdk1.8
HashMap源码分析--jdk1.8
AQS源码分析--jdk1.8
ReentrantLock源码分析--jdk1.8

ReentrantLock概述

??1. ReentrantLock是独占锁。
??2. ReentrantLock分为公平模式和非公平模式。
??3. ReentrantLock锁可重入(重新插入)

ReentrantLock源码分析

/**
 * @since 1.5
 * @author Doug Lea
 * 独占锁 --默认使用非公平锁模式
 * 可重入
 */
public class ReentrantLock implements Lock, java.io.Serializable 

    private static final long serialVersionUID = 7373984872572414699L;

    private final Sync sync;

    /**
     * Sync内部类,继承AQS,实现独占锁模式,作为基础内部类
     */
    abstract static class Sync extends AbstractQueuedSynchronizer 
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * 加锁
         */
        abstract void lock();

        /**
         * 判断 reentranLock 状态 是否被锁住(state ?= 0)
         * <p>如果没被锁住尝试 原子性上锁 失败返回false</>
         * <p>如果被锁住 判断是否是当前线程持有锁(重入锁的实现) 如果是 state + 1
         * (信号量  记录该线程持有锁的次数。 该线程每次释放所 信号量 -1。 信号量为零 代表 锁被真正释放)</>
         * <p>else 返回false</p>
         */
        final boolean nonfairTryAcquire(int acquires) 
            final Thread current = Thread.currentThread(); //获取到当前的线程
            int c = getState(); //获取锁的状态
            if (c == 0)  //目前没有人在占有锁 如果锁已被经释放 再次尝试获取锁
                if (compareAndSetState(0, acquires))  //直接尝试把当前只设置成1,如果成功,把owner设置自己,并且退出
                    setExclusiveOwnerThread(current);
                    return true;
                
            
            else if (current == getExclusiveOwnerThread())  // 如果当前线程为锁的拥有者
                int nextc = c + acquires; //这里就是重入锁的概念,如果还是自己,则进行加1操作,因为释放和获取一定要是对等的
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc); // 累加 state 的值  此段代码 实现了重入锁
                return true;
            
            return false; //当前锁被其他线程占用,退出。
        

        /**
         * 释放锁,默认releases传1
         */
        protected final boolean tryRelease(int releases) 
            int c = getState() - releases; //获取当前的锁的状态并且减1,因为要释放锁
            if (Thread.currentThread() != getExclusiveOwnerThread()) //如果当前自己不是锁的持有者,只有自己才能释放锁
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0)  //释放成功
                free = true;
                setExclusiveOwnerThread(null);
            
            setState(c); //重新设置成状态
            return free;
        

        /**
         * 如果当前线程独占着锁,返回true
         */
        protected final boolean isHeldExclusively() 
            // While we must in general read state before owner,
            // we don‘t need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        

        /**
         * 条件队列
         */
        final ConditionObject newCondition() 
            return new ConditionObject();
        

        /**
         * 返回锁的拥有者的线程
         * 当前状态为0返回null,说明在等待中
         * 当前状态不为0返回当前线程
         */
        final Thread getOwner() 
            return getState() == 0 ? null : getExclusiveOwnerThread();
        

        /**
         * 当前线程占着锁返回 state,否则返回0
         */
        final int getHoldCount() 
            return isHeldExclusively() ? getState() : 0;
        

        /**
         * state状态不为0标识上锁,为0表示在等待,不上锁
         */
        final boolean isLocked() 
            return getState() != 0;
        

        /**
         * 反序列化
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException 
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        
    
    /**
     * 构造方法,默认选择非公平锁
     */
    public ReentrantLock() 
        sync = new NonfairSync();
    

    /**
     * 构造方法,true公平锁,false非公平锁
     */
    public ReentrantLock(boolean fair) 
        sync = fair ? new FairSync() : new NonfairSync();
    

ReentrantLock继承和实现分析

技术图片

?? ReentrantLock implements Lock
?? Sync extends AbstractQueuedSynchronizer
?? 1.ReentrantLock实现Lock接口,Lock接口定义了加锁、条件队列、解锁、加锁(中断异常)
?? 2.Sync继承AQS抽象类,实现了独占锁,作为基础内部类

ReentrantLock源码分析

1. FairSync公平锁--内部类

/**
 * 公平锁
 */
static final class FairSync extends Sync 
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() 
        acquire(1);
    
    /**
     * Fair version of tryAcquire.  Don‘t grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) 
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) 
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) 
                setExclusiveOwnerThread(current);
                return true;
            
        
        else if (current == getExclusiveOwnerThread()) 
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        
        return false;
    

2. NonfairSync非公平锁--内部类

/**
 * 非公平锁的同步对象
 */
static final class NonfairSync extends Sync 
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     * 非公平锁,每次先去获取对象,所以不排队,不公平
     */
    final void lock() 
        //  通过原子操作 改变上锁状态
        if (compareAndSetState(0, 1)) // 变更成功,说明获取锁成功
            setExclusiveOwnerThread(Thread.currentThread()); // 设置持有者为当前线程
        else //变更失败
            acquire(1); //尝试以独占模式获取锁,如果失败加入node节点到队列中
    
    protected final boolean tryAcquire(int acquires) 
        return nonfairTryAcquire(acquires);
    

/**
 * 是否有等待线程
 */
public final boolean hasQueuedThreads() 
    return sync.hasQueuedThreads();

/**
 * 是否有等待线程
 */
public final boolean hasQueuedThreads() 
    return head != tail;

ReentrantLock总结

1)ReentrantLock是可重入的公平/非公平模式的独占锁。
2)ReentrantLock公平锁往往没有非公平锁的效率高,但是,并不是任何场景都是以TPS作为唯一指标,公平锁
能够减少“饥饿”发生的概率,等待越久的请求越能够得到优先满足。

以上是关于ReentrantLock源码分析的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程:ReentrantLock-FairSync源码分析(hasQueuedPredecessors)

Java并发系列ReentrantLock源码分析

Java并发编程(十三):ReentrantLock-tryLock(long timeout, TimeUnit unit)源码分析

Java并发编程:ReentrantLock-NonfairSync源码逐行深度分析(中)

JDK源码分析-ReentrantLock

ReentrantLock实现原理及源码分析