通过ReentrantLock简单了解下并发包中的锁

Posted 断弦de风筝

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过ReentrantLock简单了解下并发包中的锁相关的知识,希望对你有一定的参考价值。

  ReentrantLock在进行实例化时,可以通过构造函数的参数选择是否使用公平锁FairSync或者非公平锁NonfairSync,两者的区别比较简单,如果是公平锁则新来的线程会先检测同步队列中是否有等待的线程,如果有,则追加到同步队列尾,锁竞争过程强调的是有序进行,当然代价比较明显,线程切换会造成额外消耗;而对于非公平锁,新来的线程会直接参与竞争,比如一个线程刚刚释放锁但CPU时间片还没结束,如果再次争夺锁,那明显会更容易成功,也就是以无序争夺锁来降低线程切换从而提高吞吐量。

这里以NonfairSync(继承Sync继承AbstractQueuedSynchronizer)非公平同步器为例,从lock方法开始..

final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

  首先尝试通过CAS变更同步器中的锁标识state状态(0状态表示无锁,1表示以被占用),CAS成功则占有同步锁,同时设置属性exclusiveOwnerThread为当前线程,这个操作可以再锁重入时判断当前线程是否获取到锁,下面会有逻辑会体现出来。

  如果CAS变更失败则表示同步器被占用,执行acquire方法。

public final void acquire(int arg) {
     // 1 2
if (!tryAcquire(arg) && acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg)) selfInterrupt(); //3 }

acquire里一个if判断最多分为四个步骤,先简单描述下:

  1.首先tryAcquire再次尝试获取锁,当然获取到了就没有234了;

  2.失败则addWaiter将当前线程包装成Node节点;

  3.然后放到阻塞队列中进行锁获取,如果当前线程发生中断时,acquireQueued方法会返回true,进而执行步骤4,中断线程也就停止代码块了,当然从lock()进来的流程不属于可中断锁,acquireQueued返回false也就不会执行到selfInterrupt了。

  4.执行Thread.currentThread().interrupt();中断线程

然后再按步骤解释下:

  首先执行if判断的步骤1代码,还是先尝试获取锁tryAcquire这里arg是1,获取锁的操作arg一直是1

    protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) { //1
            if (compareAndSetState(0, acquires)) { //2
                setExclusiveOwnerThread(current);
                return true;
            }else{//3
                return false;
            }
        }
        else if (current == getExclusiveOwnerThread()) {//4
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }else{//5
            return false;
        }
    }

  还是分为5个步骤,返回 true 则表示获取同步锁,false 则失败,

  步骤 1:判断AQS中state的状态是否等于0(无锁状态),如果是则执行内部逻辑进入步骤 2 ,否则 步骤 4

    步骤 2 : CAS变更状态,如果成功,则返回true,否则执行步骤 3

    步骤 3 : 返回 false

  步骤 4 :  如果当前AQS同步器中的state大于等于1 则先判断当前获取同步锁的线程是否与exclusiveOwnerThread锁赋值的线程一致,如果相同,则表示当前线程已经获取过锁,当前锁重入,直接修改state的值+1即可,state的数值也就表示当前加锁的次数; 否则执行步骤 5

  步骤5: 获取锁失败,且非重入锁操作,返回 false

  继续回到aquire方法,步骤 1 满足条件 !false 时,也就是没有获取到同步器的锁状态,则开始判断步骤2也就是acquireQueued(addWaiter(Node.EXCLUSIVE), arg),首先执行addWaiter方法,参数是一个Node内部类对象,结构如下:

 static final class Node {
        /**
         * 当前节点的状态
         * 默认初始化并添加到同步队列时值为 0 需要CAS变更
         * CANCELLED = 1,当前节点因超时或调用interrupt方法被中断,处于这种状态的节点不会发生状态变更,处于该状态的节点会被移出队列。
         * SIGNAL = -1,后继节点的线程处于等待状态,如果当前节点的线程释放了同步状态或者被取消,会通知后继节点,使后继节点的线程得以运行。为了避免其它节点竞争,尝试获取锁操作的节点必须先确保其处于该状态
         * CONDITION = -2,Condition等待队列上的线程的状态,其他线程调用signal后会转移到同步队列中再变回 0
         * PROPAGATE = -3, 共享模式Node可能会出现该状态,表示下一次共享式同步状态会无条件传播下去
        /**
         * 前驱节点,当节点被假如到acquire同步队列时设置到队列尾
         */
        volatile Node prev;
        /**
         * 后继节点
         */
        volatile Node next;
        /**
         * 当前线程
         */
        volatile Thread thread;
        /**
         * 两种用法,节点类型(独占和共享)和等待队列中的后继节点共有该字段:
         * 1、如果当前节点是共享的,则为SHARED常量(new Node())
         * 2、如果当前队列为Condition队列(等待队列),则为后继节点,对于等待队列来说,只有当前线程获取到同步块的情况下(独自占有锁时),才能使用(await方法),所以将节点从acquire同步队列中转移到Condition等待队列是线程安全的,当调用signal方法时,会将该节点再转移回acquire同步队列再尝试获取锁
         */
        Node nextWaiter;
        Node() {}
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

addWaiter的操作其实就是将包装当前线程的Node塞到同步队列里,让线程能够有序的竞争锁标识:

private Node addWaiter(Node mode) {
        //mode = Node.EXCLUSIVE = null 使用的Node的第二个构造函数,即当前节点的nextWaiter = null
        Node node = new Node(Thread.currentThread(), mode); 
        Node pred = tail;
        //链表尾不为null 也就表白当前节点需要同步队列中排队,需将当前节点插入到双向链表尾
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }else{
                //插入链表尾失败,则表明存在多线程竞争,则在enq(node)方法中死循环往链表尾插入,知道成功为止
            }
        }else{
            //链表尾节点为空 则表明当前同步队列可能未被初始化,则在enq(node)方法中尝试初始化
        }
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {  
                if (compareAndSetHead(new Node()))
                    //尝试初始化
                    tail = head;
            } else {
                //尝试塞到链表尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

继续看acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,该方法返回一个boolean类型,返回结果 true 则可以执行selfInterrupt()方法也就是线程中断 

 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (; ; ) {
                //获取当前线程所处node的前驱节点,如果前驱节点为空会抛出NullPointerException异常 则会触发下边的cancelAcquire方法
                final Node p = node.predecessor();
                //只有当前节点的前驱节点是同步队列的头节点时才能尝试获取同步锁,否则会阻塞下去
                //当然也只有当前线程tryAcquire获取到锁时才能结束阻塞
                if (p == head && tryAcquire(arg)) {
                    //设置当前线程所在节点为同步队列中的头结点
                    setHead(node);
                    //之前的头节点也就没用了 帮助GC
                    p.next = null;
                    failed = false;
                    //正常不被中断的话 返回false 则结束阻塞开始执行同步块内的代码
                    return interrupted;
                }
                //当前节点的前驱节点并非头结点满足阻塞条件则通过LockSupport阻塞 当然阻塞后如果前驱节点是头结点并获取到锁后,会唤醒当前线程的
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //获取当前节点的前驱节点时如果前驱节点为null则会抛出NullPointerException异常,则会执行该方法,不过这种情况不会发生,同步队列中只有一个节点时,
       //则获取到的前驱节点就是当前节点了,所以普通获取锁操作,此处不会触发,当然如果当前线程被其它线程中断时(被调用interrupt方法时),上述代码块正在执行
       //过程中抛出InterruptedException时会安全的执行取消操作,稍后在看cancelAcquire中的代码
            if (failed)
                cancelAcquire(node);
        }
    }

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取当前节点前驱节点的状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //如果前驱节点处于SIGNAL状态则表白当前线程仍需排队,可以执行后续parkAndCheckInterrupt方法,让当前线程阻塞
            return true;
        if (ws > 0) {
            //waitStatus>0表示前驱节点处于CANCELLED也就是中断状态,则继续向前驱节点到链表头的方向找 直到找到一个非CANCELLED状态的节点,然后将当前节点移动到所找到节点之后
            do {
          //对于CANCELLED中断状态的节点,在当前节点获取到锁并设置为头结点后,会跟之前的头结点一样被回收掉
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //前驱节点当前的waitStatus当前值为0(初始化状态)或者PROPAGATE.对于当前节点获取同步锁失败说明其前驱节点并非队列头结点,所以需要设置当前节点的前驱节点为SIGNAL状态,并且acquireQueued的for循环会再次重试来验证当前节点不能获取到同步锁,然后也就是在 if (ws == Node.SIGNAL) 返回 true 执行阻塞操作
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    private final boolean parkAndCheckInterrupt() {
        //阻塞当前线程 并返回当前线程的状态是否已被中断
        LockSupport.park(this);
        return Thread.interrupted();
    }

这个是阻塞方式获取锁与synchronized一致,当然相对于JVM提供的synchronized方式,并发包里的锁可以提供尝试非阻塞的获取锁(只尝试获取一次锁,成功与否都返回)、能被中断的获取锁(当获取到锁的线程被中断时,中断异常会抛出,同时锁会释放)、超时获取锁(指定时间内获取锁,获取到锁或者时间片内没获取到锁都会返回结果)

先摆一个超时锁,大致流程与上边的一致:

 public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
    }

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            //<= 0 表示只进行一次tryAcquire尝试获取锁 也就是 非阻塞的获取锁
            return false;
        //指定一个获取锁的超时时间
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                //该时间片内没获取到锁 则返回结果  也就是 超时获取锁
                if (nanosTimeout <= 0L)
                    return false;
                //首先当前节点的前驱节点并非头结点满足阻塞条件
                //然后判断一下当前节点的剩余时间片是否大于一个较短的时间阈值(默认1毫秒),如果大于则让当前线程阻塞剩余时间片时间再进行尝试获取锁操作,再次尝试获取锁锁成功与否都会返回结果了
                //如果小于这个较短的时间片 则没有必要阻塞了 意思是剩余这么短的时间 你愿意试试就再试试吧
                if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
        //锁超时的话会执行取消竞争锁操作,将当前节点设为CANCELLED状态
        //当前线程未获取到锁再重试过程中被中断时,也会触发取消锁操作

            if (failed)
                cancelAcquire(node);
        }

  线程包装进节点后,当发生锁超时或者中断时,在cancelAcquire方法重置节点状态(CANCELLED)及其它属性信息,并进行CANCELLED状态节点的清理工作。

  然后看一下cancelAcquire方法的代码: 

private void cancelAcquire(AbstractQueuedSynchronizer.Node node) {
        if (node == null)
            return;
        //重置节点的线程
        node.thread = null;
        // 从当前节点到同步队列头方向查找,将当前节点的前驱节点设置为找到的第一个非CANCELLED的节点
        AbstractQueuedSynchronizer.Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        AbstractQueuedSynchronizer.Node predNext = pred.next;
       //设为CANCELLED状态
        node.waitStatus = AbstractQueuedSynchronizer.Node.CANCELLED;
        // 如果当前节点是尾节点,删除当前节点即可,上边的while循环找到的pred节点是非CANCELLED状态的节点
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            if (pred != head &&
                    ((ws = pred.waitStatus) == AbstractQueuedSynchronizer.Node.SIGNAL ||
                            (ws <= 0 && compareAndSetWaitStatus(pred, ws, AbstractQueuedSynchronizer.Node.SIGNAL))) &&
                    pred.thread != null) {
                AbstractQueuedSynchronizer.Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }
            node.next = node; 
        }
    }

获取锁的流程这些,然后看下释放锁的代码;

public final boolean release(int arg) {
        //尝试释放锁
        //当然 tryRelease(arg)调用执行完setState(0)后, AQS对象中的state已经修改为无锁状态0了 多以对于非公平锁而已,新来的线程是有机会获取到锁的(公平锁的话新来的线程只能默默的追加到同步队列尾,排队..)
        if (tryRelease(arg)) {
            AbstractQueuedSynchronizer.Node h = head;
            //这里需要判断一下当前持有锁的线程是否是同步队列的头结点,对于不存在多个线程竞争的情况下首个获取锁的线程或线程竞争模式下的非公平锁 是不会创建Node并加入到队列的。其次对于waitStatus这个属性,队列中的后续节点无法竞争到线程的话,这个作为某个线程所处节点的前驱节点,该变量是必然会被修改状态的 比如 SIGNAL状态
            if (h != null && h.waitStatus != 0)
                //这里开始唤醒当前节点的后继节点  毕竟不能一直阻塞下去啊
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    protected final boolean tryRelease(int releases) {
        //对于可重入锁 state锁标识的值记录了加锁次数 正如加锁的时候一个个加,释放锁的时候同样需要一个一个的减 知道state = 0 了,也就标识锁全部释放了
        //对于释放锁 由于是再同步块内完成的 所以所以操作都是线程安全的
        int c = getState() - releases;
        //为了防止其它线程乱释放,这里也是有校验的
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            //锁全部释放了 则可以重置AQS中占有锁的变量了
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }


    private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
        //获取当前线程所处节点的状态
        int ws = node.waitStatus;
        //这步。。干嘛用的?!
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        //获取同步队列中当前节点的后继节点
        AbstractQueuedSynchronizer.Node s = node.next;
        //如果后继节点为null或者后继节点状态是CANCELLED取消状态 则从队列尾到队列头开始找 找到当前节点的时候for循环就结束了,所以选取的也就是当前节点的下一个非CANCELLED取消状态状态的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            //为什么从双向连边的链表尾查找呢?此处比较迷惑 迷惑原因不是if(s == null ||..)的这个判断,因为思来想去这个t的变量获取head或者tail都是重新获取同步队列,既然是用一个双向链表,实在想不通有啥区别,但是s.waitStatus>0这个判断就比较蛋疼了,如果同步队列很长..岂不是这个for循环要走很久...
            for (AbstractQueuedSynchronizer.Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //唤醒当前节点的后继节点上的线程,该线程也就可以参与锁竞争了(竞争成功后会重置头结点,可以看上边的acquireQueued方法)
        if (s != null)
            LockSupport.unpark(s.thread);
    }

差不多就先简单描述这些吧。。

以上是关于通过ReentrantLock简单了解下并发包中的锁的主要内容,如果未能解决你的问题,请参考以下文章

Java中juc并发包下的Condition接口与ReentrantLock对象锁实现线程通信

java并发包源码怎么读

Java并发包4--可重入锁ReentrantLock的实现原理

Java并发包--ReentrantLock

Java并发包--ReentrantLock

AbstractQueuedSynchronizer实现原理分析——ReentrantLock