(抽象同步器Lock详解)

Posted 风清扬逍遥子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(抽象同步器Lock详解)相关的知识,希望对你有一定的参考价值。

        上一节重点介绍了Synchronized关键字的剖析,那么本章带你进入另一个锁的实现,就是李二狗写的Lock同步器,生平不识李二狗,学懂并发也枉然!!

1、前言

        来看一段伪代码:

//我设计一个锁,这个锁是用来让线程停住,一次只能一个进入一个线程

MyLock lock = new MyLock();

lock.lock();

xxxxx业务逻辑代码


lock.unlock();

退出

        假设这个时候有3个线程来了t0, t1, t2

//我设计一个锁,这个锁是用来让线程停住,一次只能一个进入一个线程

MyLock lock = new MyLock();

//t0, t1, t2在一起到这里
lock.lock();

xxxxx业务逻辑代码


lock.unlock();

退出

        如果这个时候只有t0进来,那么其他两个锁怎么样?有人说阻塞,那就必定要唤醒,然后在通过时间片分配执行,一批线程都在阻塞,谁分到时间片谁就执行低,这种方式无疑不是一个好的解决办法,怎么办呢?想起前面的Synchronized有个自旋的办法,我让t1,t2进来后自旋,但是不可能一直在转圈,cpu也是要消耗的,怎么办呢,我做个判断行不行,如果获取了锁,那么就退出来:

//我设计一个锁,这个锁是用来让线程停住,一次只能一个进入一个线程

MyLock lock = new MyLock();

lock.lock();
while(true)
//t1 t2进来自旋
    if(加锁成功)
        break;
    


xxxxx业务逻辑代码


lock.unlock();

退出

        可是这个时候如果来了100个线程呢?都在循环吗cpu要爆炸了?于是我想到了让出cpu的方法,Thread.yield();如果不知道的可以去了解下这个方法。但是这样的话解决不了问题啊,这么多线程都在让cpu吗?是不是可以sleep(时间);睡眠多久呢?是不是还得大概计算下每个线程执行下面的业务逻辑的时间?有的1s,有的10s,没法估计哇!!很明显也不行。那我是不是可以让线程阻塞住?java里有个方法叫做LockSupport.park(),线程一旦碰到这个方法后,立刻会被阻塞,不会再去使用cpu资源;

//我设计一个锁,这个锁是用来让线程停住,一次只能一个进入一个线程

MyLock lock = new MyLock();

lock.lock();
while(true)
//t1 t2进来自旋
    if(加锁成功)
        break;
    

    LockSupport.park();


xxxxx业务逻辑代码


lock.unlock();

退出

        问题我想了下又来了,这么多线程是阻塞了,一直躺这里吗,起码得有人唤醒你,在JVM里堆积的线程栈就会越来越多,总有一天会把你的内存干满的。所以有阻塞一定有唤醒,而unpark方法一定要传个参数,这个参数是线程的引用!

         那要唤醒的话,那这个线程从哪里来呢?那我在park前起码要保存个线程的引用对吧,假设我做个队列来保存这些线程的引用;

//我设计一个锁,这个锁是用来让线程停住,一次只能一个进入一个线程

MyLock lock = new MyLock();

lock.lock();
while(true)
//t1 t2进来自旋
    if(加锁成功)
        break;
    

    HashSet,LinkQueued(),ArrayList...等等
    XXX.add|put(Thread);//塞进队列

    LockSupport.park();

    XXX.get|take();
    LockSupport.unpark(XXX.get|take());


xxxxx业务逻辑代码


lock.unlock();

退出

        那么唤醒在哪里唤醒?不可能在我上面的代码里去唤醒,又是阻塞又是唤醒,没有意义呀,所以我考虑放在unlock释放锁里面去唤醒线程。这样的话当T0执行到unlock的话,唤醒下一个线程,进入while中继续判断加锁逻辑:

//我设计一个锁,这个锁是用来让线程停住,一次只能一个进入一个线程

MyLock lock = new MyLock();

lock.lock();
while(true)
//t1 t2进来自旋
    if(加锁成功)
        break;
    

    HashSet,LinkQueued(),ArrayList...等等
    XXX.add|put(Thread);//塞进队列

    LockSupport.park(); 


xxxxx业务逻辑代码


lock.unlock();

退出

unlock()
    //唤醒线程
    XXX.get|take();
    LockSupport.unpark(XXX.get|take());

        大概思路这么玩,重点放在怎么实现互斥锁,边边角角先别理会!

        所以这锁实现的几大核心:

  • 自旋
  • LockSupport
  • CAS
  • queue队列

        这里冒出来个CAS,我们看上面代码if里这个加锁怎么保证100,1000个线程来加锁永远只有一个线程可以加到锁?这个时候我们大部分会想到Synchronized,所以if(Synchronied(Object o))这么写吗?我相信没人会这么写,毕竟Synchronized性能虽好,但是他本身是基于JVM封装去实现,非直接和硬件指定,cpu打交道,于是Java中给我们提供另一个加锁的东西,就是CAS

        CAS:Compare And Swap是个原子操作,比较与交换,能够保证不管并发有多高我都可以保证你的原子性。

        举个例子,如果在内存中有个变量叫做a=0,这么多线程怎么去修改他呢?

        假设t1,t2来了,先把这值读到自己的内存去,此时两个线程中分别记录。

        t1 address = 0x1111,a = 0,refresh = 1,t2address = 0x1111,a = 0,refresh = 2,这个refresh就是要修改的值,t1要修改成1,t2要修改成2,每个线程都存了这个a的地址引用。

        此时如果t2要修改线程值从0改成2,改了后t1来了发现地址相同,看下a的值不一样啊,如果想修改,就必须要把a的值读回来,然后再去比较下,倘若这个时候还被改了,就一直这样下去,修改完后结束。这样每个线程都要干3个事情:执行读-修改-写操作;CAS 有效地说明了“我认为位置 address 应该包含值 a=xx;如果包含该值,则将refresh的值放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。当然CAS会产生ABA问题,这里先不多讨论。

        在java中有个Unsafe类,你可以理解为这个是JVM给开的一个后门,之间可以和硬件打交道的,这个Unsafe中有这些方法:对象比较,Int比较,Long型比较

        这个CAS其实底层调用的是汇编指令,cmpxchg(),也就是对应我们Unsafe这个实现类,调用的是c++库函数;

        那么为什么要有个队列呢?回想下线程再多,也会存在优先级,引出两个概念:公平和非公平,啥是公平?就是正常的给我排队买饭去别插队,其他人都在阻塞着,cpu唤醒线程的开销就会很大,每次老板都要来喊人下一位!非公平是什么呢?就是你们去打饭,都伸着盘子找阿姨,如果阿姨不理你,那么就还是老老实实排在后面去,如果阿姨看你帅,给你打饭了,很幸运你被选中。而排在后面的还是一样按照先进先出的顺序来,那么优点就是减少唤醒的开销,这样就有可能导致中间排队的一直打不到饭....

        说的标准点,公平锁是按照锁申请的顺序来获取锁,线程直接进入同步队列中排队,队列中的第一个线程才能获得到锁。非公平锁是线程申请锁时,直接尝试加锁,获取不到才会进入到同步队列排队。如果此时该线程刚好获取到了锁,那么它不需要因为队列中有其他线程在排队而阻塞,省去了CPU唤醒该线程的开销。而对于已经在同步队列中的线程,仍然是按照先进先出的公平规则获取锁。

        其实剩下的就是不断的生产实践,优化细节,不断迭代后就成了现在的稳定版本的Lock锁!!正式进入主题

2、什么是AQS

        Java并发编程核心在于java.concurrent.util包而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,顾名思义叫做抽象的队列同步器,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。

        我们看下源码的结构图:

         看到很多Sync类,每个Sync类中都有两个子类,一个是公平,一个是非公平。

3、ReentrantLock

        ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。而且它具有比synchronized更多的特性,比如它支持手动加锁与解锁,支持加锁的公平性。

使用ReentrantLock进行同步

ReentrantLock lock = new ReentrantLock(false);//false为非公平锁,true为公平锁

lock.lock() //加锁

lock.unlock() //解锁

        那么,ReentrantLock如何实现Synchronized不具备的公平与非公平性呢?我们都知道Synchronized关键字实现的是非公平锁,也就是大家都来抢锁,抢不到的话,就老老实实去排队。

        ReentrantLock内部定义了一个Sync的内部类,该类继承AbstractQueuedSynchronized,对该抽象类的部分方法做了实现;并且还定义了两个子类:

        1、FairSync 公平锁的实现

        2、NonfairSync 非公平锁的实现

        这两个类都继承自Sync,也就是间接继承了AbstractQueuedSynchronized,所以这一个ReentrantLock同时具备公平与非公平特性。上面主要涉及的设计模式:模板模式-子类根据需要做具体业务实现。

        AQS具备的特性:

  • 阻塞等待队列:上面提到的Queue
  • 共享/独占:后面说
  • 公平/非公平
  • 可重入:同一个线程可以反复多次获得锁,多次进行加锁释放锁
  • 允许中断

        既然说ReentrantLock是AQS的实现,那么我们看看这个是怎么实现的?

        看下抽象父类AQS的定义,这个exclusiveOwnerThread,记录的是当前独占模式下,获取锁的线程是谁?

         既然可以记录获取锁,那锁的状态,线程是否加上锁,通过什么来记录这个同步状态呢?实际上是state,这个被volatile修饰的变量。volatile可以保证可见性,其实就是让每个线程都知道这个state的值,好方便去得知锁的状态。

         那么队列怎么实现的?实际上通过一个Node内部类,构成一个双向链表,其中Node中有几个重要的属性【prev前驱节点,next下一个节点,Thread的引用,为了要唤醒线程】,为什么要用双向链表呢?好处就是可以从前往后或者从后往前遍历。

         具体来看Lock.lock()方法是怎么加锁的:

        因为内部类有公平或者非公平,在new的时候其实已经默认为公平锁。

         进去后里面有几个比较重要的方法,tryAcquire()、acquireQueued()、addWaiter()

        tryAcquire():尝试获取锁,AQS定义这个方法,实现是在子类Sync的公平锁。

protected final boolean tryAcquire(int acquires) 
        //获取当前线程
        final Thread current = Thread.currentThread();
        //获取当前锁的状态,如果是0,表示没有加锁,可以加锁
        int c = getState();
        //加锁前要判断下是否有线程排队,因为默认用的公平锁,没人获取的话我还不能直接获取,因为公平锁要进行排队加锁
        if (c == 0) 
            //如果没有线程在队列里排队,取反就是true,继续判断cas去加锁,aqcuire值传的是1,即从0改成1,加锁成功后,把独占锁的线程引用指向当前线程。
            //如果有线程在队列里,取反就是false,直接走下面的else
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) 
                setExclusiveOwnerThread(current);
                return true;
            //如果获取锁的当前线程是自己,也就是自己再去获取锁
         else if (current == getExclusiveOwnerThread()) 
            //则只要把state+1,这里和Synchronized重入性原理是一样的,这里else判断一定只能保证一个线程进来修改这个值,
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        
        return false;
    

        acquireQueued(),跳出了tryAcquire,如果加锁不成功,取反就是true,就尝试去入队列

final boolean acquireQueued(final Node node, int arg) 
        boolean failed = true;
        try 
            boolean interrupted = false;
            for (;;) 
                //创建一个节点
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) 
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            
         finally 
            if (failed)
                cancelAcquire(node);
        
    

        addWaiter(Node.EXCLUSIVE):线程入队

private Node addWaiter(Node mode) 
        //mode属性传的是独占,Exclusive,共享是shared
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) 
            node.prev = pred;
            if (compareAndSetTail(pred, node)) 
                pred.next = node;
                return node;
            
        
        enq(node);
        return node;
    

        Node节点有几个比较重要的属性:【pre,next,waitState,thread指向】,其中重点要说下这个waitState,描述了节点的生命状态,可以称为信号量。这个waitState有几种状态:

  • SIGNAL:表示可被唤醒

  • CANCELLED:代表出现异常,中断引起需要废弃结束

  • CONDITION:条件等待

  • PROPAGATE:传播

  • 0:初始状态Init

        那么整个节点形成的队列是这样的:其中head和tail实际上是在AQS的,拿出来为了方便看整个的节点队列,那这个节点怎么生成的呢?我们往下看

        AQS中的属性是这样的:

         下面我们开始分析addWaiter这个方法的细节:

        addWaiter(Node.EXCLUSIVE):传入一个空的Node节点,很明显Node EXCLUSIVE = null;

        在初始化的时候,创建第一个节点。

        一开始的AQS,tail其实就是null,并没有地方给初始化值。          所以pred一定是空,if判断是不走的,看enq(node);这个应该可以猜出来,enterQueue缩写的入队方法,这里for循环有点像在自旋,验证下对不对:

private Node enq(final Node node) 
        for (;;) 
            Node t = tail;
            if (t == null)  // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
             else 
                node.prev = t;
                if (compareAndSetTail(t, node)) 
                    t.next = node;
                    return t;
                
            
        
    

        继续看逻辑,先定义一个Node节点指向tail,因为这个tail本身指向的就是null,把自己指向为null的赋值给这新的Node节点t,其实也是null;执行if(t == null)的判断;这里一定要注意compareAndSetHead这个方法,细心的同学点进去可以发现,这个实现的效果是,将AQS的head指针从null赋值给新new的Node节点!!这个操作,也是原子的,即使来了几个线程,也只能保证当前节点只有一个线程初始化队列成功。

         李二狗为了防止空指针,以这样的方式来初始化一个AQS形成的队列!!将AQS的head节点和tail都指向一个new Node();

        接下来再一次循环,很明显t指向new Node()不为空了,所以走到else判断,入队同样也存在线程的竞争,不然没法保证先进先出,compareAndSetTail()保证了入队的原子性,否则会出现else里面,尾部先指向某个节点了,然后这个时候别的线程过来也修改了这tail的引用,那么刚刚上一个的线程节点就不在队列里,永远是阻塞不会被唤醒,同样栈的空间永远会存在,如果是100个线程,很明显就泄露了。

         这样就形成了节点双向链链表队列,如果依次下去,会形成非常多的节点,每个节点的线程引用都存在,方便后面唤醒线程。

        这就很好理解为什么要用for 一直循环了,因为同一时刻CAS保证只有一个线程加锁成功,其他线程入队失败了,那么可以进行重试,也就是我们说的自旋,保证线程都不会被浪费掉。

        既然enq方法走完了,返回的是当前的node节点,进入到acquireQueued(final Node node, args)这个方法里。

final boolean acquireQueued(final Node node, int arg) 
        boolean failed = true;
        try 
            boolean interrupted = false;
            for (;;) 
                final Node p = node.predecessor();
                //如果p是头结点,并且可以获取到锁就出队
                if (p == head && tryAcquire(arg)) 
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            
         finally 
            if (failed)
                cancelAcquire(node);
        
    

        假设当前的队列状态是这样的(优化下图结构),那么进入acquireQueued方法后,有个for循环,为了防止上下文切换的开销,李二狗在队列中获取Node的第一个节点 node.predecessor(); 因为进入队列的线程不是立刻进行阻塞,阻塞前要尝试获取一次锁:

1、能获取到就出队;

2、不能获取到就阻塞住;

        a) 首先第一轮循环,修改head的状态,修改成Signal标记可以被唤醒。

        b) 第二轮循环,阻塞线程parkAndCheckInterrupt(),判断线程是否可中断。

         if (p == head && tryAcquire(arg)) //如果p是头结点,并且可以获取到锁就出队;很明显图中已经表明了setHead()要做的事情,原先的头结点就已经没有了引用,后面会被gc回收掉,然后直接return掉。

         当下一轮for进来,走到现在的头结点的时候,要判断shouldParkAfterFailedAcquire(前驱结点,当前结点),实际上这个方法内部就是对状态进行的判断:实际上node结点是头结点的下一个结点,所于是要判断node的前驱结点,根据waitStatus状态位是什么,再选择是否出队。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) 
        // 先拿到当前结点的前驱结点,如果这个状态是Signal,则表示可以被唤醒
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        //如果是其他状态
        if (ws > 0) 
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do 
                node.prev = pred = pred.prev;
             while (pred.waitStatus > 0);
            pred.next = node;
         else 
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        
        return false;
    

        假设waitStatus的状态是Init=0,那么就走到下面的else,shouldParkAfterFailedAcquire(),把前驱结点head的状态从0改成Signal = -1表示可以被唤醒(要想唤醒排队的第一个线程T1,持有锁的线程T0在释放锁的时候,需要判断T0结点的waitStatus是否!=0,如果!=0成立,会再把waitStatus从-1改成0;从而T1再被唤醒去抢锁,在非公平状态下可能会再失败,此时可能T3持有了锁。)。实际上前驱结点是为了后面结点服务的。这里可能文字讲的不是很清楚,读者多读几次代码就知道了,这里是很有难度的,也是并发里的精髓。

        剩下的边边角角我这里就不多说了,整体下来,李二狗写的代码可读性我觉得是比较不好的,和Spring相比还是有点生涩,但是思路我们要get到,毕竟面试喜欢聊这些。

以上是关于(抽象同步器Lock详解)的主要内容,如果未能解决你的问题,请参考以下文章

(抽象同步器Lock详解)

五:抽象队列同步器AQS应用Lock详解

synchronized和lock详解

Lock接口AbstractQueuedSynchronizer队列同步器重入锁读写锁

六:抽象队列同步器AQS应用之BlockingQueue详解

(转)Lock和synchronized比较详解