ReentrantLock源码分析

Posted 无虑的小猪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ReentrantLock源码分析相关的知识,希望对你有一定的参考价值。

一、ReentrantLock介绍

  ReentrantLock是JDK1.5引入的,实现Lock接口的互斥锁。保证多线程的环境下,共享资源的原子性。与Synchronized的非公平锁不同,ReentrantLock的实现公平锁、非公平锁。ReentrantLock是重入锁,重入是指,同一个线程可以重复执行加锁的代码段。

二、ReentrantLock使用

  ReentrantLock功能与Synchronized一样,使用起来比Synchronized更加灵活。与Synchronized自动加锁、释放锁不同,ReentrantLock需要手动加锁、释放锁。使用示例如下:

 import java.util.concurrent.locks.ReentrantLock;
 
 public class TestReentrantLock 
     public static void main(String[] args) 
         ReentrantLock lock = new ReentrantLock();
         lock.lock();
         try
             // todo
         finally 
             lock.unlock();
         
     
 

三、ReentrantLock原理

  Synchronized基于对象实现;ReentrantLock基于 AQS + CAS 实现。

3.1、lock()流程图

 

  ReentrantLock基于抽象队列同步器AQS + CAS 实现的加锁、释放锁。ReentrantLock实现了公平锁、非公平锁,公平锁与非公平锁唯一的区别在于,非公平锁不会判断等待队列中是否节点等待获取锁,而是直接尝试获取锁,获取不到,再将当前线程节点添加进等待队列的尾节点,判断当前线程节点是否挂起。

3.2、unlock()流程图

  ReentrantLock释放锁的流程较为简单,优先判断持有锁资源的线程是否为当前线程,若不为当前线程抛出异常;若为当前线程,AQS的state的属性值减1,再判断减1后的值是否为0,若为0表示当前线程彻底释放锁资源,唤醒等待队列中的挂起线程节点,开始抢占锁资源。

四、ReentrantLock源码分析

1、构造函数

 private final Sync sync;
 
 // 默认使用非公平锁
 public ReentrantLock() 
     sync = new NonfairSync();
 
 
 // fair=true,公平锁;否则,非公平锁
 public ReentrantLock(boolean fair) 
     sync = fair ? new FairSync() : new NonfairSync();
 

  Sync是ReentrantLock的抽象静态内部类,继承自AQS(AbstractQueuedSynchronizer) - 抽象队列同步器,AQS中定义了锁的基本行为,AQS中用volatile修饰的state表示当前锁重入的次数。

01  Sync类图:TODO

  NonfairSync、FairSync是ReentrantLock的静态内部类,继承ReentrantLock$Sync,NonfairSync实现非公平锁,FairSync实现公平锁。

2、加锁

1、lock()

 private final Sync sync;
 
 // 加锁
 public void lock() 
     sync.lock();
 

1.1、公平锁

  调用AQS的acquire方法。ReentrantLock$FairSync#lock() 核心代码:

// 加锁
final void lock() 
    acquire(1);

1.2、非公平锁

  通过CAS尝试获取锁(将AQS的state由0修改为1),若成功,代表当前线程获取锁资源成功;若失败调用AQS的acquire方法。ReentrantLock$NonfairSync#lock() 核心代码:

 // 加锁
 final void lock() 
     // 获取锁资源,CAS 修改 AQS 的 state 属性值,,获取成功,设置当前线程
     if (compareAndSetState(0, 1))
         setExclusiveOwnerThread(Thread.currentThread());
     // 获取失败,执行AQS的acquire
     else
         acquire(1);
 

2、acquire()

  acquire()方法是Sync父类AQS中的方法,AbstractQueuedSynchronizer#acquire() 核心代码:

 // 获取锁资源
 public final void acquire(int arg) 
     // 尝试获取锁资源
     if (!tryAcquire(arg) &&
         // 当前线程为获取到锁资源,加入等待队列,同时挂起线程,等待唤醒
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         selfInterrupt();
  

  1、调用tryAcquire方法:

  2、调用addWaiter方法

  3、调用acquireQueued方法

2.1、tryAcquire()

  tryAcquire()方法在FairSync、NonFairSync中均有实现,尝试获取锁资源,核心代码如下:

 // 公平锁 FairSync#tryAcquire() 方法
 protected final boolean tryAcquire(int acquires) 
     // 获取当前线程
     final Thread current = Thread.currentThread();
     // 获取AQS的 state
     int c = getState();
     // state == 0 当前没有线程占用锁资源
     if (c == 0) 
         // 判断是否有线程在排队,若有线程在排队,返回true
         if (!hasQueuedPredecessors() &&
             // 尝试抢锁
             compareAndSetState(0, acquires)) 
             // 无线程排队,将线程属性设置为当前线程
             setExclusiveOwnerThread(current);
             return true;
         
     
     // state != 0  有线程占用锁资源
     // 占用锁资源的线程是否为当前线程
     else if (current == getExclusiveOwnerThread()) 
         // state + 1
         int nextc = c + acquires;
         // 锁重入超出最大限制 (int的最大值),抛异常
         if (nextc < 0)
             throw new Error("Maximum lock count exceeded");
         // 将 state + 1 设置给 state
         setState(nextc);
         // 当前线程拿到锁资源,返回true
         return true;
     
     return false;
 
 
 // 非公平锁  NonFairSync#tryAcquire() 方法
 protected final boolean tryAcquire(int acquires) 
     return nonfairTryAcquire(acquires);
 
 
 // 非公平锁  Sync#nonfairTryAcquire() 方法
 final boolean nonfairTryAcquire(int acquires) 
      // 获取当前线程
     final Thread current = Thread.currentThread();
     // 获取AQS的 state
     int c = getState();
     // 无线程占用锁资源
     if (c == 0) 
         // CAS 修改 state 的值,修改成功,设置线程属性为当前线程,返回占用锁资源标识
         if (compareAndSetState(0, acquires)) 
             setExclusiveOwnerThread(current);
             return true;
         
     
     // 有线程占用锁资源
     // 占用锁资源的线程是当前线程(重入)
     else if (current == getExclusiveOwnerThread()) 
         // AQS 的 state + acquires
         int nextc = c + acquires;
         // 超出锁重入的上限(int的最大值),抛异常
         if (nextc < 0)
             throw new Error("Maximum lock count exceeded");
         // 将 state + acquires 设置到 state 属性
         setState(nextc);
         return true;
     
     return false;
 

  获取当前线程、AQS的state。AQS的state属性值为0,表示无线程占用锁资源,判断等待队列中是否有线程在排队,若有线程在排队,返回尝试抢锁失败标识,将线程添加进等待队列中。

  若state属性值不为0,判断持有锁资源的线程是否为当前线程,若为当前线程,AQS的state属性值 + 1,返回尝试抢锁成功标识。
  公平锁与非公平锁的整体实现流程类似,唯一不同的是,AQS的state属性值为0,无线程占用锁资源时,非公平锁不会判断是否有线程在等待队列中排队,而是直接通过CAS抢锁。

2.2、addWaiter()

  为当前线程创建入队节点AbstractQueuedSynchronizer$Node,入参mode表示锁类型,在AQS的静态内部类Node中有SHARE、EXCLUSIVE两个属性,SHARE代表共享锁、EXCLUSIVE代表排它锁。  

   AbstractQueuedSynchronizer#addWaiter() 核心代码:

 // 等待队列的尾节点,懒加载,只能通过enq方法添加节点
 private transient volatile Node tail;
 
 private Node addWaiter(Node mode) 
     // 当前线程、获取的锁类型封装为Node对象
     Node node = new Node(Thread.currentThread(), mode);
     // 获取等待队列的尾节点
     Node pred = tail;
     // 尾节点不为null
     if (pred != null) 
         // 将当前节点设置为等待队列的尾节点
         node.prev = pred;
         if (compareAndSetTail(pred, node)) 
             pred.next = node;
             return node;
         
     
     // 等待队列为空,初始化等待队列节点信息
     enq(node);
     // 返回当前线程节点
     return node;
  

  等待队列不为空,将当前线程封装的Node节点添加进队列尾部;若等待队列为空,先初始化等待队列,然后在将Node节点添加进队列尾部。

2.2.1、enq()

  等待队列尾节点为空时,执行enq()方法初始化等待队列,并将Node节点添加进等待队列中。

 private Node enq(final Node node) 
     for (;;) 
         // 获取等待队列的尾节点
         Node t = tail;
         // 等待队列为空,初始化等待队列
         if (t == null) 
             // 初始化等待队列头尾节点
             if (compareAndSetHead(new Node()))
                 tail = head;
          else 
             // 当前线程的Node添加到等待队列中
             node.prev = t;
             if (compareAndSetTail(t, node)) 
                 t.next = node;
                 return t;
             
         
     
 

2.3、acquireQueued()

  当前线程是否挂起,AbstractQueuedSynchronizer#acquireQueued() 核心代码:

 final boolean acquireQueued(final Node node, int arg) 
     // 获取锁资源标识
     boolean failed = true;
     try 
         boolean interrupted = false;
         // 自旋
         for (;;) 
             // 获取当前节点的前驱节点
             final Node p = node.predecessor();
             // 当前节点的前驱节点为头节点,并获取锁资源成功
             if (p == head && tryAcquire(arg)) 
                 // 将当前节点设置到head - 头节点
                 setHead(node);
                 // 原头节点的下一节点指向设置为null,GC回收
                 p.next = null;
                 // 设置获取锁资源成功
                 failed = false;
                 // 不管线程GC
                 return interrupted;
             
             // 如果当前节点不是head的下一节点,获取锁资源失败,尝试将线程挂起
             if (shouldParkAfterFailedAcquire(p, node) &&
                 // 线程挂起, UNSAFE.park()
                 parkAndCheckInterrupt())
                 interrupted = true;
         
      finally 
         if (failed)
             cancelAcquire(node);
     
 

  查看当前排队的Node是否是head的next, 如果是,尝试获取锁资源, 如果不是或者获取锁资源失败那么就尝试将当前Node的线程挂起(unsafe.park())。

  shouldParkAfterFailedAcquire检查并更新未成功获取锁资源的状态,返回true表示线程被挂起。

  AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire() 核心代码:

 static final class Node 
     // 线程被取消
     static final int CANCELLED =  1;
     // 等待队列中存在待被唤醒的挂起线程
     static final int SIGNAL    = -1;
     // 当前线程在Condition队列中,未在AQS对列中
     static final int CONDITION = -2;
     // 解决JDK1.5的BUG。共享锁在释放资源后,若头节点为0,无法确定真的没有后继节点
     // 如果头节点为0,需要将头节点的状态改为 -3 ,当最新拿到锁资源的线程查看
     // 是否有后继节点并且为当前锁为共享锁,需唤醒排队的线程。
     static final int PROPAGATE = -3;
 
 
 // 获取锁资源失败,挂起线程
 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) 
     // 获取当前节点的上一个节点的状态
     int ws = pred.waitStatus;
     // 上一节点被挂起
     if (ws == Node.SIGNAL)
         // 返回true,挂起当前线程
         return true;
     if (ws > 0) 
         // 上一节点被取消,获取最近的线程挂起节点,
         // 并将当前节点的上一节点指向最近的线程挂起节点
         do 
             node.prev = pred = pred.prev;
          while (pred.waitStatus > 0);
         // 最近线程挂起节点的下一节点指向当前节点
         pred.next = node;
      else 
         // 上一节点状态小于等于0,存在线程处于等待状态,但未被挂起的场景
         // 通过CAS将处于等待的线程挂起,避免在挂起前节点获取到锁资源
         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
     
     //  返回true,不挂起当前线程
     return false;
 

  在挂起线程前,确认当前节点的上一个节点的状态。若为1,代表是取消的节点,不能挂起;若为-1,代表后续节点中有挂起的线程;若为-2 (线程在等待队列 - Condition队列中)、-3 (避免线程无法唤醒的一个状态),需要将状态改为-1之后,才能挂起当前线程。

3、释放锁

1、unlock()

  释放锁,ReentrantLock#unlock() 核心代码:

// 释放锁
public void unlock() 
    sync.release(1);

  unlock方法实际调用的是AQS的release方法,AbstractQueuedSynchronizer#release() 核心代码:

 // 等待队列的头节点,懒加载,通过setHead方法初始化
 private transient volatile Node head;
 
 // 释放锁
 public final boolean release(int arg) 
     // 当前线程释放锁资源的计数值
     if (tryRelease(arg)) 
         // 当前线程玩去释放锁资源,获取等待队列头节点
         Node h = head;
         if (h != null && h.waitStatus != 0)
             // 唤醒等待队列中待唤醒的节点
             unparkSuccessor(h);
         // 完全释放锁资源
         return true;
     
     // 当前线程未完全释放锁资源
     return false;
  

1.1、tryRelease()

  释放锁,Reenttrant$Sync#tryRelease()的核心代码

 // 释放锁
 protected final boolean tryRelease(int releases) 
     // 修改 AQS 的 state 
     int c = getState() - releases;
     // 当前线程不是持有锁的线程,抛出异常
     if (Thread.currentThread() != getExclusiveOwnerThread())
         throw new IllegalMonitorStateException();
     // 是否成功的将锁资源完全释放标识 (state == 0)
     boolean free = false;
     // 锁资源完全释放
     if (c == 0) 
         // 修改标识
         free = true;
         // 将占用锁资源的属性设置为null
         setExclusiveOwnerThread(null);
     
     // state赋值
     setState(c);
     // 返回true表示当前线程完全释放锁资源;
     // 返回false标识当前线程是由锁资源,持有计数值减少
     return free;
 
 
 

源码分析ReentrantLock实现原理

ReentrantLock基本用法

ReentrantLock重入锁,区别于Synchronized关键,ReentrantLock是在Java Api层面上的锁实现。和Synchronized关键相比,ReentrantLock提供了更高级的功能,例如指定是否为公平锁、限时等待、响应中断等。

常用方法

ReentrantLock lock = new ReentrantLock(); // 构造函数,默认非公平锁;如要指定公平,传入truelock.lock(); // 获取锁,如果没有获取成功,则进入等待lock.unlock(); // 释放锁,需要事先获取到锁lock.lockInterruptibly(); // 可中断的获取锁,需要接收一个中断通知lock.tryLock(); // 尝试获取锁,获取成功返回true,失败立即返回false,不进行等待lock.tryLock(1000, TimeUnit.MILLISECONDS); // 带时间的尝试,获取失败,等待指定时间lock.isLocked(); // 判断该锁失败是锁定状态,state!=0lock.isHeldByCurrentThread(); // 判断锁是否被当前线程持有 exclusiveOwnerThread == Thread.currentThreadlock.getHoldCount(); // 锁进入的次数,获取的是state的值,每进入一次,state+1lock.isFair(); // 是否为公平

类结构

ReentrantLock的源码实现

下面将从非公平锁、公平锁、释放锁等实现源码来分析。

非公平锁获取锁的实现

ReentrantLock的默认实现是非公平锁。

public ReentrantLock() { sync = new NonfairSync();}

所以非公平锁的lock方法实现,在NonfairSync类中。

static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L;
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }}

NonfairSynclock方法实现中,首先会通过一个CAS的操作设置state的值,如果此时刚好有线程释放锁且当前线程仍然持有cpu时间片,那么就会进行插队(非公平性)拿到这边锁,并将锁的独占线程设置为当前线程。如果获取锁失败,则通过acquire方法再次进行尝试。

acquire实现

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}

acquire方法有好几个逻辑,包括tryAcquire尝试获取锁、addWaiter创建等待节点、acquireQueued在队列内获取锁等操作。

final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 1 if (compareAndSetState(0, acquires)) { // 2 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 3 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;}

1、首先判断锁的状态是否为未锁定状态,

2、如果是,则通过CAS操作,设置锁的状态值为acquires。并设置锁的独占线程为当前线程。

3、如果锁已经被占,则判锁的独占线程是否为当前线程。如果是,则锁的状态值累加acquires。因为ReentrantLock是可重入锁,所以会判断锁的状态值是否溢出,防止某个线程陷入死循环的加锁。

tryAcquire方法会返回获取锁成功或失败,如果成功,那么线程获取到锁。lock方法结束。否则就需要将当前线程入队,进行排队等待获取锁。

acquireQueued实现

acquireQueued方法是在队列内部获取锁的实现,在执行它的逻辑之前, 先要执行addWaiter方法,创建等待节点。

private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node;}

NodeAbstractQueuedSynchronizer的内部类。AQS是抽象队列同步器,它在内部维护了一个称之为CLH的双向队列,队列的每个元素是Node,Node中包含了当前线程、等待状态、前驱后继节点等信息。

static final class Node { static final Node SHARED = new Node(); // 表示节点是共享模式 static final Node EXCLUSIVE = null; // 表示节点是独占模式 static final int CANCELLED = 1; // 表示该节点的线程处于取消状态 static final int SIGNAL = -1; // 表示该节点的下一个节点需要被挂起 static final int CONDITION = -2; // 表示该节点等待一个condition唤醒 static final int PROPAGATE = -3; // 表示下一个acquireShared无条件传播 volatile int waitStatus; // 等待状态,如果是Sync节点,默认值为0,如果是condition队列的节点,默认职位CONDITION(-2)。使用CAS进行原子的更新 volatile Node prev; // 指向前驱节点,当前节点依赖它检查状态 volatile Node next; // 指向后继节点,当前节点释放是需要把它唤醒 volatile Thread thread; // 当前Thread Node nextWaiter; // 执行下一个等待condition条件的节点,}

Node的状态说明:

  • SINGLE

    后继节点处于阻塞状态(被挂起),当前节点在释放锁或者取消获取锁后,必须唤醒后继节点。为了避免竞争,acquire方法必须首先表明需要一个single信息,然后原子的去重试,如果重试失败,继续阻塞。

  • CANCELLED

    当前节点由于超时或者中断被取消。所有的节点不会丢弃这个状态,而且,一个处于取消状态的节点永远不会再次进入阻塞。

  • CONDITION

    当前节点处于一个condition队列,不会用于sync队列。

  • PROPAGATE

    releaseShared需要传播到其它节点,这是在doReleaseShared方法中设置的,以确保能传播。

关于prev和next

  • prev

    prev指向前驱节点,当前节点依赖它做状态检查。在入队的时候被赋值,出队的时候被置为null。如果prev节点被取消,需要为当前节点往前找一个未取消的节点,而且一定是可以找到的,因为头节点不可能处于Cancelled状态。一个节点只有成功获取到了锁,才能成为头节点。一个被取消的节点不可能成功的拿到锁,一个线程只能取消自身,而不能取消其它节点。

  • next

    next指向后继节点,当前节点的线程释放锁的时候,需要唤醒它。入队时的时候被赋值,当要避开被取消的节点时需要调整next指向,出队时被置为null。

上面介绍了AQS中Node类的重要属性,我们继续回到addWaiter方法:

private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 1 // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { // 2 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); // 3 return node;}

addWaiter方法接收一个表示Node模式的操作,这里讲的是ReentrantLock,所以mode是独占模式。

1、首先会创建一个Node节点,thread指向当前线程,mode为独占模式。

2、将tail为节点赋值给pred,如果pred不为null,那么就表示当前等待队列已经被初始化,并且已经设置好了header节点。因为是处于多线程环境,所以会通过CAS的操作去设置尾节点(tail)的值。

3、如果pred为null,也就是尾节点tail为null,那么就表示当前等待队列还未被初始化,那么调用enq方法进行队列的创建。

private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) // 1 tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { // 2 t.next = node; return t; } } }}

这里使用了自旋的操作,在for循环里不断的尝试,把node节点入队。这里使用自旋,是因为是多线程环境,可能存在多个线程都在对队列进行初始化。

1、使用乐观锁进行设置队列的头。注意,队列的头节点没有存储Thread信息,头节点是一个虚拟节点,不参与锁竞争,初始状态值waitStatus=0;

2、如果在第一步设置头节点失败,那么就意味着其它线程在当前线程之前初始化好了队列,那么当前线程就进入队列尾部。

当前线程入队之后,需要在队列内部继续进行获取锁的操作,这个逻辑在acquireQueued方法中。

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; // 表示获取锁成功与否 try { boolean interrupted = false; // 表示线程是否被中断 for (;;) { final Node p = node.predecessor(); // 当前线程所在节点的前驱节点 if (p == head && tryAcquire(arg)) { // 1 setHead(node);  p.next = null; // help GC failed = false;  return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 2 interrupted = true; } } finally { if (failed) // 3 cancelAcquire(node); }}

1、判断p是否为头节点,前面讲到头节点是一个虚拟节点,不参与锁竞争。然后调用tryAcquire方法尝试获取锁。如果获取锁成功,设置头节点为当前线程的节点,在设置之前,会将node中的Thread信息置为nul。

2、如果上一步获取锁还是失败(因为是非公平锁,有可能比新来的线程抢占了),那么调用shouldParkAfterFailedAcquire方法看是否需要将当前线程挂起。如果不需要挂起,那么继续进入自旋尝试获取锁。如果需要被挂起,那么调用parkAndCheckInterrupt方法将线程挂起,并检测线程是否中断。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 1 return true; if (ws > 0) { // 2 do {  node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 3 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}

1、只有当当前节点的前驱节点的waitStatus=Node.SINGAL时,返回true,表示当前线程需要等待一个唤醒条件,那么当前节点的线程可以挂起。

2、如果waitStatus>0,那么表示当前节点的前驱节点状态为CANCELLED,那么需要为当前节点需要一个非CANCELLED的节点作为前驱节点。

3、表示waitStatus要么是0,要么是PROPAGATE. 表示当前线程需要一个信号,但是先不挂起,在挂起之前再重试一次。

private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted();}

在执行了LockSupport.park(this)这行代码之后,线程就会进入阻塞。要么被唤醒,要么被中断。如果被唤醒,返回false,否则返回true;Thread.interrupted()会返回中断标识并清除中断标识。

我们再回到acquireQueued方法。

if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { interrupted = true;}

如果线程被唤醒,那么interrupted还是false,否则会被置为true,表示当前线程被中断,但是当前线程并不退出锁的竞争,而是继续尝试获取锁(不响应中断),在成功获取锁之后,在返回中断标识。

最后在回到acquire方法

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}

tryAcquire获取锁失败,并且acquireQueued方法返回true(中断标识为true),则执行selfInterrupt方法,中断线程。

注意,Thread.interrupted方法只是返回中断标识,且会清除中断标识,真正中断线程是调用Thread类的interrupt方法。

cancelAcquire实现

acquireQueued方法在finally里面还有判断failed的逻辑,如果failed为true,那么就会执行cancelAcquire方法,那么来看下该方法做了那么工作。

private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return;
node.thread = null; // 1
Node pred = node.prev; while (pred.waitStatus > 0) // 2 node.prev = pred = pred.prev;
Node predNext = pred.next; // 3
node.waitStatus = Node.CANCELLED; // 4
if (node == tail && compareAndSetTail(node, pred)) { // 5 compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { // 6 Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 7 unparkSuccessor(node); }
node.next = node; // help GC // 8 }}

1、首先将当前节点的thread置为null。

2、然后从当前节点往前找,找到状态不为cancelled的节点,将当前节点的prev指向该节点。

3、取出剔除cancelled状态之后的前驱节点的后继节点,其实predNext指向的是当前节点。

4、将当前节点的waitStatus值置为CANCELLED。

前4步执行完之后,当前节点所处的位置就有三种可能,在头节点的下一个节点、在尾节点、既不是头节点的下一个节点也不是尾节点。5,6,7的是这三种情况的说明。

5、是尾节点,并且通过CAS操作尾节点成功(因为是多线程环境,可能存在新的线程入队了,那么当前节点就不会是尾节点)。然后将pred的next节点置为null。

源码分析ReentrantLock实现原理

6、既不是头节点的后继节点,也不是尾节点。那么需要确保将当前节点的前驱节点状态为SIGNAL,且前驱节点的Thread信息不能为null。取出当前节点的后继节点,赋值给next,如果next不为null且next的状态值不为CANCELLED,那么就设置当前节点的前驱节点的后继节点为next。最后设置将当前节点的后继节点指向自己。

源码分析ReentrantLock实现原理

7、是头节点的后继节点。因为当前节点是头节点的后继节点,当前节点要取消获取锁操作,那么就需要唤醒它的后继节点。调用unparkSuccessor方法。最后设置将当前节点的后继节点指向自己。

private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 如果ws<0将当前node节点的状态值设为0 Node s = node.next; // s指向当前节点的后继节点 if (s == null || s.waitStatus > 0) { // 如果节点为空,或者后继节点也是CANCELLED状态 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) // 从尾节点开始往前找,找到第一个状态不为CANCELLED的节点,赋值给s s = t; } if (s != null) // 如果找到的节点不为空,则唤醒该节点的Thread LockSupport.unpark(s.thread); }

源码分析ReentrantLock实现原理

从上面的对节点的处理可以看出,每种情况都不会对操作当前节点的前驱节点。这是因为在前面分析的

shouldParkAfterFailedAcquire方法,判断节点是否需要挂起的逻辑中,如果当前处理的节点的前驱节点状态为CANCELLED,将CANCELLED状态的节点处理掉。这样的话,就降低了cancelAcquire方法的实现复杂度。

非公平锁流程概括

公平锁的实现

ReentrantLock要指定为公平锁,需要在构造函数传入执行公平性参数。

ReentrantLock fairLock = new ReentrantLock(true); // 默认为非公平锁,true表示为公平锁

公平锁的lock方法内部实现是调用了acquire方法。

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}

这个和非公平锁一样,但是不同点是tryAcquire方法的实现,在AQS中,该方法抛出一个UnsupportedOperationException,所以要看其子类的具体实现,这里是公平锁,所以看FairSynctryAcquire的实现。

protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 1 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 2 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 3 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;}

1、判断锁的状态值是否是未锁定。

2、判断当前队列里是否存在排队的线程,如果当前线程前没有排队线程,则尝试通过CAS获取锁,如果获取成功,设置锁的独占线程为当前线程。

3、如果锁已经处于锁定状态,则判断占有锁的线程是否为当前线程,如果是当前线程,则对锁的state值累加acquires。表示重入过程。

和非公平锁最重要的区别在于,公平锁在获取锁之前,会判断队列中是否存在排队线程。

hasQueuedPredecessors实现

public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());}

在分析非公平锁入队过程中,我们知道其实在初始化头节点的时候,头节点是一个虚节点,不存储线程信息,只是用于通知下一个线程。所以只有当 h != t 成立,才能说明队列中可能存在等待的线程,具体是否存在就需要看 ((s = h.next) == null || s.thread != Thread.currentThread()) 的判断:

  • 当(s=h.next)==null 成立,说明有线程正在对队列进行初始化,因为队列是双向的,在入队的时候,新node的前驱指针先指向尾节点,在通过cas更新尾节点为新node,最后原尾节点的后继指针指向节点。所以才会出现h.next为空。

  • 如果h.next不为空,那么就需要看h.next节点的线程是不是当前线程。

 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

入队分析:

公平锁的剩余实现就和非公平锁的实现是一样的了,最重要的就是在获取锁之前,公平锁会判断当前队列是否存在排队的线程。

释放锁的实现

在使用ReentrantLock时,需要显示的释放锁,而且一般都是在finally代码块中释放。如果没有正确的释放锁,那么就会产生死锁。

ReentrantLock lock = new ReentrantLock();try { lock.lock(); // do somthing...} finally { lock.unlock();}

下面就从源码的角度分析释放锁的过程。ReentrantLock的lock方法会调用AQS的release方法实现。

public final boolean release(int arg) { if (tryRelease(arg)) { // 1 Node h = head; if (h != null && h.waitStatus != 0) // 2 unparkSuccessor(h); return true; } return false;}

1、首先会调用tryRelease方法更新锁的状态值。tryRelease具体有AQS的子类Sync实现。

protected final boolean tryRelease(int releases) { int c = getState() - releases; // 将锁的状态值减掉对应释放锁传的值 if (Thread.currentThread() != getExclusiveOwnerThread()) // 判断释放锁的线程是不是占有锁的线程,如果不是就抛异常,防止错误的释放锁,所以只有当占用锁了,才能进行释放锁 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 只有当锁的状态值为0时,才表示锁被释放了。ReentrantLock是可重入锁,所以在使用时重入了多少次,就需要释放多少次。 free = true; setExclusiveOwnerThread(null); // 将锁的占有线程置为null } setState(c); // 更新锁的值 return free; }

2、当在队列中获取到锁之后,那么头节点会更新为获取到锁的线程节点,并将线程信息置为null。现在当前线程要释放锁,所以要唤醒它的下一个节点。

unparkSuccessor方法在上面已经分析过了,简单来说就是找到队列中一个状态waitStatus值不为CANCELLED的节点,然后将它唤醒。

可中断的获取锁实现

在获取锁的过程中,如果线程被中断了,线程会感知到,然后退出锁的竞争,并抛出异常。

ReentrantLock lock = new ReentrantLock();lock.lockInterruptibly();

lockInterruptibly方法会调用AQS的acquireInterruptibly实现:

public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg);}

tryAcquire方法在上面已经分析过了,这里不在说明。主要看doAcquireInterruptibly方法:

private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())  throw new InterruptedException(); // 响应中断 } } finally { if (failed) cancelAcquire(node); }}

和非响应中断不同的区别在于,如果线程被中断了,就会抛出一个InterruptedException。当然在抛出异常之前,会执行finally代码块的逻辑,此时failed的值就是true了,那么就会执行cancelAcquire方法,该方法在上面也分析过了。

tryLock的实现

tryLock表示仅仅是try一下,不管能不能获取到锁都会立即返回。带时间参数的表示带时间的尝试,获取到锁立即返回,没有则等待一段时间再返回。

ReentrantLock lock = new ReentrantLock();lock.tryLock();lock.tryLock(1000, TimeUnit.MILLISECONDS);

tryLock方法调用 Sync的nonfairTryAcquire实现。

public boolean tryLock() { return sync.nonfairTryAcquire(1);}

NonfairTryAcquire是非公平锁的获取实现,和非公平锁不同的时候,如果获取失败就返回,不入队。

带时间的tryLock方法调用AQS的tryAcquireNanos方法实现

public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);}

当线程被中断,抛出InterruptedException。tryAcquire的实现,要看锁是公平的还是非公平锁,具体就是FairSync和NonFairSync的实现。如果tryAcquire获取失败就看doAcquireNanos能否成功。

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) // 如果等待时间已经到了,直接返回false return false; final long deadline = System.nanoTime() + nanosTimeout; // 记录等待截止时间 final Node node = addWaiter(Node.EXCLUSIVE); // 将当前线程入队等待 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime();  if (nanosTimeout <= 0L) // 等待的时间已经到了,返回获取锁失败,failed=true return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) // 如果需要挂起并且剩余等待时间大于最大自旋时间。就将线程挂起 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) // 如果线程被中断了,那就抛出异常 throw new InterruptedException(); } } finally { if (failed) // 线程超时获取锁失败,或者被中断,那么就要退出队列 cancelAcquire(node); }}

END。

以上是关于ReentrantLock源码分析的主要内容,如果未能解决你的问题,请参考以下文章

ReentrantLock源码分析

[源码分析]ReentrantLock & AbstractQueuedSynchronizer

ReentrantLock源码分析-JDK1.8

源码分析ReentrantLock实现原理

ReentrantLock源码分析

ReentrantLock源码分析