ReentrantLock源码分析

Posted 醉酒的小男人

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ReentrantLock源码分析相关的知识,希望对你有一定的参考价值。

AQS是java concurrent包的基础,像Lock、CountDownLatch、Semaphore等都是基于它实现的。AQS是一个模板方法模式tryAcquire()、tryRelease()、tryAcquireShared()、tryReleaseShared()等都可以被子类重写的。AQS有共享锁和独占锁方式。

设计锁的思考

公平锁和非公平锁

公平锁自然是遵循FIFO(先进先出)原则的,先到的线程会优先获取资源,后到的会进行排队等待,而非公平锁是不遵循这个原则的。

公平锁

1.在公平锁里,判断当前锁占用状态==0后,会继续判断hasQueuedPredecessors,即当前队列是否有排队的情况,如果没有才会尝试获取锁。

2.这样可以保证遵循FIFO的原则,每一个先来的线程都可以最先获取到锁,但是增加了上下文切换与等待线程的状态变换时间。所以效率相较于非公平锁较慢

非公平锁

1.可以看到非公平锁里,判断当前锁占用状态==0直接会进行compareAndSetState尝试获取锁。若此时有线程排队,可能争夺不过资源。所以这是非公平的

2.在非公平锁里,因为可以直接compareAndSetState来获取锁,不需要加入队列,然后等待队列头线程唤醒再获取锁这一步骤,所以效率较快

成员变量

//head:等待队列头部,延迟初始化,直到调用enq才真正初始化
private transient volatile Node head;
//tail:等待队列尾部,延迟初始化,直到调用enq才真正初始化;
private transient volatile Node tail;
//state:AQS状态位,通过try*方法维护;
private volatile int state;
//spinForTimeoutThreshold:自旋锁超时阀值;
static final long spinForTimeoutThreshold = 1000L;
//实际上head是个空节点,其thread和prev属性都为null 

Node内部类

static final class Node {
    static final Node SHARED = new Node();//标识等待节点处于共享模式
    static final Node EXCLUSIVE = null;//标识等待节点处于独占模式

    static final int CANCELLED =  1;//由于超时或中断,节点已被取消
    static final int SIGNAL    = -1;//表示下一个节点是通过park堵塞的,需要通过unpark唤醒
    static final int CONDITION = -2;//表示线程在等待条件变量(先获取锁,加入到条件等待队列,然后释放锁,等待条件变量满足条件;只有重新获取锁之后才能返回)
    static final int PROPAGATE = -3;//表示后续结点会传播唤醒的操作,共享模式下起作用

    //等待状态:对于condition节点,初始化为CONDITION;其它情况,默认为0,通过CAS操作原子更新
    volatile int waitStatus;
    //前节点
    volatile Node prev;
    //后节点
    volatile Node next;
    //线程对象
    volatile Thread thread;
    //对于Condtion表示下一个等待条件变量的节点;其它情况下用于区分共享模式和独占模式;
    Node nextWaiter;
}

 源码分析

源码分析阶段主要以非公平锁为主,这是网上找的图加深理解

 

 package com;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockTest {
   final static Lock lock = new ReentrantLock();
    public static void main(String[] args) {
        lock.lock();
        System.out.println("hello world");
        lock.unlock();
    }
}

lock

public void lock() {
        sync.lock();
}

 final void lock() {
     //如果state的值是0的话修改为1,并且把当前线程设置为独占状态,这里使用的CAS来保证并发情况下只有一个线 程可以获得锁,其它线程执行acquire
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //没有抢占到锁的线程
                acquire(1);
        } 

protected final void setExclusiveOwnerThread(Thread thread) {
       //设置为独占线程,用这个值可以判断请求线程是否是当前线程
        exclusiveOwnerThread = thread;
    } 

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    } 

tryAcquire

tryAcquire()方法其实只做了两件事,再次抢占锁和判断是否重入,抢占到锁或者重入成功返回true,抢占锁这里也是对程序的优化,尽量不让线程进入队列堵塞。

protected final boolean tryAcquire(int acquires) {
            //入参acquires为1
            return nonfairTryAcquire(acquires);
        }

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) { 无锁状态CAS抢占锁
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //判断是否可以重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        } 

addWaiter

addWaiter()方法主要是把当前线程封装成独占的Node节点,添加到同步队列的尾部。空节点会进行enq()方法,先创建亚节点并指向头结点和尾节点,然后执行三步:1.当前节点prev等于尾节点 2.cas修改尾节点的引用为当前节点 3.尾节点的next等于当前节点。

private Node addWaiter(Node mode) {
        //根据传入的模式(独占or共享)创建Node对象;
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            //指定当前节点的前一个节点为尾节点
            node.prev = pred;
            //修改尾节点的引用为当前节点
            if (compareAndSetTail(pred, node)) {
                //修改尾节点的next引用为当前节点
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //创建哑节点指向头结点和尾节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

if语句

 else语句

acquireQueued

acquireQueued()方法首先是获得当前节点的前一个节点是不是head节点并且抢占锁成功,那么把当前节点设置为head节点。否则就把当前节点的前一个节点的state状态改为-1挂起当前线程。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //当前节点的前一节点
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    //放弃前一个节点,可以被GC
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //获取锁失败后判断是否挂起当前线程
                //这里如果状态不是-1会一直自旋的
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //如果前节点为-1则返回true
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
       //如果前一节点已取消,则往前找,直到找到一个状态正常的节点,其实就是从队列删除取消状态的节点
        if (ws > 0) { 
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            //如果前节点不处于取消状态,则设为signal -1.(0或-3设为-1)
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//阻塞该线程,至此该线程进入等待状态,等着unpark和interrupt叫醒
        return Thread.interrupted();//叫醒之后返回该线程是否在中断状态,并会清除中断记号。
    } 

最后多个线程只会有一个抢占成功,其它的挂起堵塞。

unlock

 public void unlock() {
        sync.release(1);
    } 

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); //unpark唤醒第一个等待节点
            return true;
        }
        return false;
    } 

//主要就是把ExclusiveOwnerThread设置为空,state设置为0
protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        //如果节点为空或者被取消了,则从队列尾部开始查找,找到离node最近的非null且状态正常的节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
       //取出找到节点的线程对象,通过unpark释放锁,然后执行acquireQueued方法唤醒继续抢占锁                           
        if (s != null)
            LockSupport.unpark(s.thread);
    }

以上是关于ReentrantLock源码分析的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程实战—–“J.U.C”:ReentrantLock之二lock方法分析

ReentrantLock源码分析

ReentrantLock源码分析--jdk1.8

ReentrantLock的实现原理

ReentrantLock源码分析

[源码分析]ReentrantLock & AbstractQueuedSynchronizer