JUCReentrantLock源码分析

Posted LL.LEBRON

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUCReentrantLock源码分析相关的知识,希望对你有一定的参考价值。

ReentrantLock源码

1.简介

ReentrantLock实现了Lock接口,Lock接口里面定义了java中锁应该实现的几个方法:

//获取锁
void lock( );
//获取锁(可中断)
void lockInterruptibly() throws InterruptedException;
//尝试获取锁,如果没获取到锁,就返回false
boolean tryLock( );
//尝试获取锁,如果没获取到锁,就等待一段时间,这段时间内还没获取到锁就返回false
boolean tryLock(long time,TimeUnit unit) throws InterruptedException;
//释放锁
void unlock( );
//条件锁
condition newCondition( );

Lock接口中主要定义了 获取锁可中断地获取锁尝试非阻塞的获取锁超时的获取锁释放锁获取等待通知组件几个方法。

2.主要内部类

ReentrantLock中主要定义了三个内部类:SyncNonfairSyncFairSync

abstract static class Sync extends AbstractQueuedSynchronizer 
static final class Nonfairsync extends sync 
static final class Fairsync extends Sync 
  1. 抽象类Sync实现了AQS的部分方法
  2. NonfairSync实现了Sync,主要用于非公平锁的获取
  3. FairSync实现了Sync,主要用于公平锁的获取

3.主要属性

private final Sync sync;

主要属性就一个sync,它在构造方法中初始化,决定使用公平锁还是非公平锁的方式获取锁。

4.主要构造方法

默认构造方法(默认实现非公平锁):

public ReentrantLock() 
    sync = new NonfairSync();

自己可选择使用公平锁还是非公平锁:

public ReentrantLock(boolean fair) 
    sync = fair ? new FairSync() : new NonfairSync();

  1. 默认构造方法使用的是非公平锁
  2. 带boolean参数的构造方法可以自己决定使用公平锁还是非公平锁

5.lock()方法

5.1 公平锁实现

这里我们加锁ReentrantLock的实例这样获取:

ReentrantLock  reentrantLock = new ReentrantLock(true);

加锁过程:

//ReentrantLock.lock();
public void lock() 
    //调用sync属性的lock()方法
    //这里的sync是公平锁,所以是FairSync的实例
    sync.lock();

//ReentrantLock.FairSync.lock()
final void lock() 
    //调用AQS的acquire()方法获取锁
    //注意,这里传的值为1
    acquire(1);

//AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) 
    //先尝试加锁
    //如果失败了,就排队
    if (!tryAcquire(arg) &&
            //注意addWaiter()这里传入的节点模式为“独占模式”
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();

//ReentrantLock.FairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) 
    //获取当前线程
    final Thread current = Thread.currentThread();
    //获取当前状态变量的值
    int c = getState();
    //如果为0,说明现在还没有人占有锁
    if (c == 0) 
        //如果没有其他线程在排队,那么当前线程尝试更新state的值为1
        //如果成功了,说明当前线程获取了锁
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) 
            //当前线程获取了锁,把自己设置到exclusiveOwnerThread中
            //exclusiveOwnerThread是AQS的父类AbstractOwnableSynchronizer中提供的变量
            setExclusiveOwnerThread(current);
            //返回true,说明成功获得了锁
            return true;
        
    
    //如果当前线程本身就占有锁,现在又尝试获取锁
    //那么,直接让他获取锁,并返回true
    else if (current == getExclusiveOwnerThread()) 
        //状态变量state的值加一
        int nextc = c + acquires;
        //如果发送一出,则报错
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        //这里为什么不需要CAS更新state?
        //因为当前线程占有锁,其他线程只会CAS把state从0更新到1,是不会成功的
        //所以不存在竞争,自然不需要使用CAS来更新
        setState(nextc);
        //当线程获取锁成功
        return true;
    
    //当线程获取锁失败
    return false;

//AbstractQueuedSynchronizer.addWaiter()
//如果获取锁失败了,就会调用这个方法
private Node addWaiter(Node mode) 
    //新建一个节点
    Node node = new Node(Thread.currentThread(), mode);
    //这里先尝试把新节点加到尾节点后面
    //如果成功了就返回新节点
    //如果没成功再调用enq()方法不断尝试
    Node pred = tail;
    //如果尾节点不为空
    if (pred != null) 
        //设置新节点的前置节点为现在的尾节点
        node.prev = pred;
        // CAS更新尾节点为新节点
        if (compareAndSetTail(pred, node)) 
            //如果成功,把旧尾节点的下一个节点指向新节点
            pred.next = node;
            //返回新节点
            return node;
        
    
    //如果上面尝试入队新节点没成功,调用enq()处理
    enq(node);
    return node;

//AbstractQueuedSynchronizer.enq()
private Node enq(final Node node) 
    //自旋,不断尝试
    for (;;) 
        Node t = tail;
        //如果尾节点为空,说明还未初始化
        if (t == null)  // Must initialize
            //初始化头节点和尾节点
            if (compareAndSetHead(new Node()))
                tail = head;
         else 
            //如果尾节点不为空
            //设置新节点的前一个节点为现在的尾节点
            node.prev = t;
            //CAS更新尾节点为新节点
            if (compareAndSetTail(t, node)) 
                //成功了,则设置旧尾节点的下一个节点为新节点
                t.next = node;
                //返回旧节点
                return t;
            
        
    

//AbstractQueuedSynchronizer.acquireQueued()
//调用上面的addWaiter()方法使得新节点已经成功入队了,这个方法是尝试让当前节点来获取锁的
final boolean acquireQueued(final Node node, int arg) 
    //失败标记
    boolean failed = true;
    try 
        //中断标记
        boolean interrupted = false;
        //自旋
        for (;;) 
            //当前节点的前一个节点
            final Node p = node.predecessor();
            //如果当前节点的前一个节点为head节点,则说明轮到自己获取锁了
            //调用ReentrantLock.FairSync.tryAcquire()方法再次尝试获取锁
            if (p == head && tryAcquire(arg)) 
                //尝试获取锁成功
                //这里同时只会有一个线程在执行,所以不需要用CAS更新
                //把当前节点设置为新的头节点
                setHead(node);
                //并把上一个节点从链表中删除
                p.next = null; // help GC
                //标记为未失败
                failed = false;
                //还是需要获得锁后, 才能返回打断状态
                return interrupted;
            
            //是否需要阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                //真正阻塞的方法
                parkAndCheckInterrupt())
                //如果中断了
                interrupted = true;
        
     finally 
        //如果失败了
        if (failed)
            //取消获取锁
            cancelAcquire(node);
    

//AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire()
//第一次调用会把前一个节点的等待状态设置为SIGNAL,并返回false
//第二次调用才会返回true
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) 
    //上一个节点的等待状态
    //注意Node的waitStatus字段我们在上面创建Node的时候并没有指定
    // 也就是说使用的是默认值0
    //这里把各种等待状态再贴出来
    // static final int CANCELLED = 1;
    // static final int SIGNAL= -1;
    // static final int CONDITION = -2;
    // static final int PROPAGATE = -3;

    int ws = pred.waitStatus;
    // 如果等待状态为SIGNAL(等待唤醒),直接返回true
    if (ws == Node.SIGNAL)
        return true;
    // 如果前一个节点的状态大于0,也就是已取消状态
    if (ws > 0) 
        // 把前面所有取消状态的节点都从链表中删除
        do 
            node.prev = pred = pred.prev;
         while (pred.waitStatus > 0);
        pred.next = node;
     else 
        // 如果前一个节点的状态小于等于0,则把其状态设置为等待唤醒
        // 这里可以简单地理解为把初始状态0设置为SIGNAL
        // CONDITION是条件锁的时候使用的
        // PROPAGATE是共享锁使用的
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    
    return false;

// AbstractQueuedSynchronizer.parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() 
    //阻塞当前线程
    //底层调用的是Unsafe的park()方法
    LockSupport.park(this);
    //返回是否已中断
    return Thread.interrupted();

// AbstractQueuedSynchronizer.parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() 
    //阻塞当前线程
    //底层调用的是Unsafe的park()方法
    LockSupport.park(this);
    //返回是否已中断
    return Thread.interrupted();

lock()方法的主要流程:

  1. ReentrantLock#lock()
  2. ReentrantLock.FairSync#lock() // 公平模式获取锁
  3. AbstractQueuedSynchronizer#acquire() // AQS的获取锁方法
  4. ReentrantLock.FairSync#tryAcquire() // 尝试获取锁
  5. AbstractQueuedSynchronizer#addWaiter() // 添加到队列
  6. AbstractQueuedSynchronizer#enq() // 入队
  7. >AbstractQueuedSynchronizer#acquireQueued() // 里面有个for()循环,唤醒后再次尝试获取锁
  8. AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire() // 检查是否要阻塞
  9. AbstractQueuedSynchronizer#parkAndCheckInterrupt() // 真正阻塞的地方

公平锁获取锁的主要过程大致如下:

  1. 尝试获取锁,如果获取到了就直接返回了
  2. 尝试获取锁失败,再调用addWaiter()构建新节点并把新节点入队
  3. 然后调用acquireQueued()再次尝试获取锁,如果成功了,直接返回
  4. 如果再次失败,再调用shouldParkAfterFailedAcquire()将节点的等待状态置为等待唤醒(SIGNAL)
  5. 调用parkAndCheckInterrupt()阻塞当前线程
  6. 如果被唤醒了,会继续在acquireQueued()for()循环再次尝试获取锁,如果成功了就返回
  7. 如果不成功,再次阻塞,重复(3)(4)(5)直到成功获取到锁

独占式同步状态获取流程,也就是acquire(int arg)方法调用流程:

5.2 非公平锁实现

//ReentrantLock.lock();
public void lock() 
    sync.lock();

// ReentrantLock.NonfairSync.lock()
final void lock() 
    //直接尝试CAS更新状态变量
    if (compareAndSetState(0, 1))
        //如果更新成功,说明获取到了锁,把当前线程设为独占线程
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);

//AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) 
    //先尝试加锁
    //如果失败了,就排队
    if (!tryAcquire(arg) &&
            //注意addWaiter()这里传入的节点模式为“独占模式”
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();

//ReentrantLock.NonfairSync.tryAcquire
protected final boolean tryAcquire(int acquires) 
    //调用父类的方法
    return nonfairTryAcquire(acquires);

// ReentrantLock.Sync.nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) 
    //获取当前线程
    final Thread current = Thread.currentThread();
    //获取状态值
    int c = getState();
    if (c == 0) 
        //如果状态变量为0,再次尝试CAS更新状态变量的值
        //相对于公平锁模式少了!hasQueuedPredecessors()条件,不会检查AQS队列
        if (compareAndSetState(0, acquires)) 
            setExclusiveOwnerThread(current);
            return true;
        
    
    // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
    else if (current == getExclusiveOwnerThread()) 
        //state++
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    
    return false;

相对于公平锁,非公平锁加锁的过程主要有两点不同:

  1. 一开始就尝试CAS更新状态变量state的值,如果成功了就获取到锁了
  2. 在tryAcquire()的时候没有检查是否前面有排队的线程,直接上去获取锁才不管别人有没有排队呢

总的来说,相对于公平锁,非公平锁在一开始就多了两次直接尝试获取锁的过程。

6.lockInterruptibly()方法

可中断地获取锁,和 lock()方法的不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程

//ReentrantLock.lockInterruptibly()
public void lockInterruptibly() throws InterruptedException 
    sync.acquireInterruptibly(1);

//AbstractQueuedSynchronizer.acquireInterruptibly(int arg)
public final void acquireInterruptibly(int arg)
        throws InterruptedException 
    if (Thread.interrupted())
        throw new InterruptedException();
    //如果没有获得到锁,进入doAcquireInterruptibly方法
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);

private void doAcquireInterruptibly(int arg)
        throws InterruptedException 
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try 
        for (; ; ) 
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) 
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                //在park过程中如果被interrupt会进入这里
                //这时候抛出一次,而不会再次进入for循环
                throw new InterruptedException();
        
     finally 
        if (failed)
            cancelAcquire(node);
    

7.tryLock()方法

尝试非阻塞的获取锁,调用该方法后立刻返回,如果能够获取则返回true,否则返回false

// ReentrantLock.tryLock()
public boolean tryLock() 
    //直接调用Sync的nonfairTryAcquire()方法
    return sync.nonfairTryAcquire(1);

// ReentrantLock.Sync.nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) 
    //获取当前线程
    final Thread current = Thread.currentThread();
    //获取状态值
    int c = getState();
    if (c == 0) 
        //如果状态变量为0,再次尝试CAS更新状态变量的值
        //相对于公平锁模式少了!hasQueuedPredecessors()条件
        if (compareAndSetState(0, acquires)) 
            setExclusiveOwnerThread(current);
            return true;
        
    
    else if (current == getExclusiveOwnerThread()) 
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    
    return false;

tryLock()方法比较简单,直接以非公平的模式去尝试获取一次锁,获取到了或者锁本来就是当前线程占有着就返回true,否则返回false。

8.tryLock(long time, TimeUnit unit)方法

超时的获取锁,当前线程在以下3种情况下会返回:

①当前线程在超时时间内获得了锁

②当前线程在超时时间内被中断

③超时时间结束,返回false

//ReentrantLock.tryLock()
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException 
    // 调用AQS中的方法
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));

// AbstractQueuedSynchronizer.tryAcquireNanos()
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException 
    //如果线程中断,抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //先尝试获取一次锁
    return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);

//AbstractQueuedSynchronizer.doAcquireNanos()
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException 
    //如果时间到期,直接返回false
    if (nanosTimeout <= 0L)
        return false;
    //到期时间
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try 
        for (; ; ) 
            final Node p = node.predecessor();
      

以上是关于JUCReentrantLock源码分析的主要内容,如果未能解决你的问题,请参考以下文章

v73.02 鸿蒙内核源码分析(参考手册) | 阅读内核源码必备工具 | 百篇博客分析OpenHarmony源码

Mesos源码分析

Mybatis源码分析

Spring源码分析专题——目录

ARouter源码分析

Handler源码分析