(抽象同步器Lock详解)
Posted 风清扬逍遥子
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(抽象同步器Lock详解)相关的知识,希望对你有一定的参考价值。
上一节重点介绍了Synchronized关键字的剖析,那么本章带你进入另一个锁的实现,就是李二狗写的Lock同步器,生平不识李二狗,学懂并发也枉然!!
1、前言
来看一段伪代码:
//我设计一个锁,这个锁是用来让线程停住,一次只能一个进入一个线程
MyLock lock = new MyLock();
lock.lock();
xxxxx业务逻辑代码
lock.unlock();
退出
假设这个时候有3个线程来了t0, t1, t2
//我设计一个锁,这个锁是用来让线程停住,一次只能一个进入一个线程
MyLock lock = new MyLock();
//t0, t1, t2在一起到这里
lock.lock();
xxxxx业务逻辑代码
lock.unlock();
退出
如果这个时候只有t0进来,那么其他两个锁怎么样?有人说阻塞,那就必定要唤醒,然后在通过时间片分配执行,一批线程都在阻塞,谁分到时间片谁就执行低,这种方式无疑不是一个好的解决办法,怎么办呢?想起前面的Synchronized有个自旋的办法,我让t1,t2进来后自旋,但是不可能一直在转圈,cpu也是要消耗的,怎么办呢,我做个判断行不行,如果获取了锁,那么就退出来:
//我设计一个锁,这个锁是用来让线程停住,一次只能一个进入一个线程
MyLock lock = new MyLock();
lock.lock();
while(true)
//t1 t2进来自旋
if(加锁成功)
break;
xxxxx业务逻辑代码
lock.unlock();
退出
可是这个时候如果来了100个线程呢?都在循环吗cpu要爆炸了?于是我想到了让出cpu的方法,Thread.yield();如果不知道的可以去了解下这个方法。但是这样的话解决不了问题啊,这么多线程都在让cpu吗?是不是可以sleep(时间);睡眠多久呢?是不是还得大概计算下每个线程执行下面的业务逻辑的时间?有的1s,有的10s,没法估计哇!!很明显也不行。那我是不是可以让线程阻塞住?java里有个方法叫做LockSupport.park(),线程一旦碰到这个方法后,立刻会被阻塞,不会再去使用cpu资源;
//我设计一个锁,这个锁是用来让线程停住,一次只能一个进入一个线程
MyLock lock = new MyLock();
lock.lock();
while(true)
//t1 t2进来自旋
if(加锁成功)
break;
LockSupport.park();
xxxxx业务逻辑代码
lock.unlock();
退出
问题我想了下又来了,这么多线程是阻塞了,一直躺这里吗,起码得有人唤醒你,在JVM里堆积的线程栈就会越来越多,总有一天会把你的内存干满的。所以有阻塞一定有唤醒,而unpark方法一定要传个参数,这个参数是线程的引用!
那要唤醒的话,那这个线程从哪里来呢?那我在park前起码要保存个线程的引用对吧,假设我做个队列来保存这些线程的引用;
//我设计一个锁,这个锁是用来让线程停住,一次只能一个进入一个线程
MyLock lock = new MyLock();
lock.lock();
while(true)
//t1 t2进来自旋
if(加锁成功)
break;
HashSet,LinkQueued(),ArrayList...等等
XXX.add|put(Thread);//塞进队列
LockSupport.park();
XXX.get|take();
LockSupport.unpark(XXX.get|take());
xxxxx业务逻辑代码
lock.unlock();
退出
那么唤醒在哪里唤醒?不可能在我上面的代码里去唤醒,又是阻塞又是唤醒,没有意义呀,所以我考虑放在unlock释放锁里面去唤醒线程。这样的话当T0执行到unlock的话,唤醒下一个线程,进入while中继续判断加锁逻辑:
//我设计一个锁,这个锁是用来让线程停住,一次只能一个进入一个线程
MyLock lock = new MyLock();
lock.lock();
while(true)
//t1 t2进来自旋
if(加锁成功)
break;
HashSet,LinkQueued(),ArrayList...等等
XXX.add|put(Thread);//塞进队列
LockSupport.park();
xxxxx业务逻辑代码
lock.unlock();
退出
unlock()
//唤醒线程
XXX.get|take();
LockSupport.unpark(XXX.get|take());
大概思路这么玩,重点放在怎么实现互斥锁,边边角角先别理会!
所以这锁实现的几大核心:
- 自旋
- LockSupport
- CAS
- queue队列
这里冒出来个CAS,我们看上面代码if里这个加锁怎么保证100,1000个线程来加锁永远只有一个线程可以加到锁?这个时候我们大部分会想到Synchronized,所以if(Synchronied(Object o))这么写吗?我相信没人会这么写,毕竟Synchronized性能虽好,但是他本身是基于JVM封装去实现,非直接和硬件指定,cpu打交道,于是Java中给我们提供另一个加锁的东西,就是CAS
CAS:Compare And Swap是个原子操作,比较与交换,能够保证不管并发有多高我都可以保证你的原子性。
举个例子,如果在内存中有个变量叫做a=0,这么多线程怎么去修改他呢?
假设t1,t2来了,先把这值读到自己的内存去,此时两个线程中分别记录。
t1 address = 0x1111,a = 0,refresh = 1,t2address = 0x1111,a = 0,refresh = 2,这个refresh就是要修改的值,t1要修改成1,t2要修改成2,每个线程都存了这个a的地址引用。
此时如果t2要修改线程值从0改成2,改了后t1来了发现地址相同,看下a的值不一样啊,如果想修改,就必须要把a的值读回来,然后再去比较下,倘若这个时候还被改了,就一直这样下去,修改完后结束。这样每个线程都要干3个事情:执行读-修改-写操作;CAS 有效地说明了“我认为位置 address 应该包含值 a=xx;如果包含该值,则将refresh的值放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。当然CAS会产生ABA问题,这里先不多讨论。
在java中有个Unsafe类,你可以理解为这个是JVM给开的一个后门,之间可以和硬件打交道的,这个Unsafe中有这些方法:对象比较,Int比较,Long型比较
这个CAS其实底层调用的是汇编指令,cmpxchg(),也就是对应我们Unsafe这个实现类,调用的是c++库函数;
那么为什么要有个队列呢?回想下线程再多,也会存在优先级,引出两个概念:公平和非公平,啥是公平?就是正常的给我排队买饭去别插队,其他人都在阻塞着,cpu唤醒线程的开销就会很大,每次老板都要来喊人下一位!非公平是什么呢?就是你们去打饭,都伸着盘子找阿姨,如果阿姨不理你,那么就还是老老实实排在后面去,如果阿姨看你帅,给你打饭了,很幸运你被选中。而排在后面的还是一样按照先进先出的顺序来,那么优点就是减少唤醒的开销,这样就有可能导致中间排队的一直打不到饭....
说的标准点,公平锁是按照锁申请的顺序来获取锁,线程直接进入同步队列中排队,队列中的第一个线程才能获得到锁。非公平锁是线程申请锁时,直接尝试加锁,获取不到才会进入到同步队列排队。如果此时该线程刚好获取到了锁,那么它不需要因为队列中有其他线程在排队而阻塞,省去了CPU唤醒该线程的开销。而对于已经在同步队列中的线程,仍然是按照先进先出的公平规则获取锁。
其实剩下的就是不断的生产实践,优化细节,不断迭代后就成了现在的稳定版本的Lock锁!!正式进入主题
2、什么是AQS
Java并发编程核心在于java.concurrent.util包而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,顾名思义叫做抽象的队列同步器,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。
我们看下源码的结构图:
看到很多Sync类,每个Sync类中都有两个子类,一个是公平,一个是非公平。
3、ReentrantLock
ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。而且它具有比synchronized更多的特性,比如它支持手动加锁与解锁,支持加锁的公平性。
使用ReentrantLock进行同步
ReentrantLock lock = new ReentrantLock(false);//false为非公平锁,true为公平锁
lock.lock() //加锁
lock.unlock() //解锁
那么,ReentrantLock如何实现Synchronized不具备的公平与非公平性呢?我们都知道Synchronized关键字实现的是非公平锁,也就是大家都来抢锁,抢不到的话,就老老实实去排队。
ReentrantLock内部定义了一个Sync的内部类,该类继承AbstractQueuedSynchronized,对该抽象类的部分方法做了实现;并且还定义了两个子类:
1、FairSync 公平锁的实现
2、NonfairSync 非公平锁的实现
这两个类都继承自Sync,也就是间接继承了AbstractQueuedSynchronized,所以这一个ReentrantLock同时具备公平与非公平特性。上面主要涉及的设计模式:模板模式-子类根据需要做具体业务实现。
AQS具备的特性:
- 阻塞等待队列:上面提到的Queue
- 共享/独占:后面说
- 公平/非公平
- 可重入:同一个线程可以反复多次获得锁,多次进行加锁释放锁
- 允许中断
既然说ReentrantLock是AQS的实现,那么我们看看这个是怎么实现的?
看下抽象父类AQS的定义,这个exclusiveOwnerThread,记录的是当前独占模式下,获取锁的线程是谁?
既然可以记录获取锁,那锁的状态,线程是否加上锁,通过什么来记录这个同步状态呢?实际上是state,这个被volatile修饰的变量。volatile可以保证可见性,其实就是让每个线程都知道这个state的值,好方便去得知锁的状态。
那么队列怎么实现的?实际上通过一个Node内部类,构成一个双向链表,其中Node中有几个重要的属性【prev前驱节点,next下一个节点,Thread的引用,为了要唤醒线程】,为什么要用双向链表呢?好处就是可以从前往后或者从后往前遍历。
具体来看Lock.lock()方法是怎么加锁的:
因为内部类有公平或者非公平,在new的时候其实已经默认为公平锁。
进去后里面有几个比较重要的方法,tryAcquire()、acquireQueued()、addWaiter()
tryAcquire():尝试获取锁,AQS定义这个方法,实现是在子类Sync的公平锁。
protected final boolean tryAcquire(int acquires)
//获取当前线程
final Thread current = Thread.currentThread();
//获取当前锁的状态,如果是0,表示没有加锁,可以加锁
int c = getState();
//加锁前要判断下是否有线程排队,因为默认用的公平锁,没人获取的话我还不能直接获取,因为公平锁要进行排队加锁
if (c == 0)
//如果没有线程在队列里排队,取反就是true,继续判断cas去加锁,aqcuire值传的是1,即从0改成1,加锁成功后,把独占锁的线程引用指向当前线程。
//如果有线程在队列里,取反就是false,直接走下面的else
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires))
setExclusiveOwnerThread(current);
return true;
//如果获取锁的当前线程是自己,也就是自己再去获取锁
else if (current == getExclusiveOwnerThread())
//则只要把state+1,这里和Synchronized重入性原理是一样的,这里else判断一定只能保证一个线程进来修改这个值,
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
return false;
acquireQueued(),跳出了tryAcquire,如果加锁不成功,取反就是true,就尝试去入队列
final boolean acquireQueued(final Node node, int arg)
boolean failed = true;
try
boolean interrupted = false;
for (;;)
//创建一个节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg))
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
finally
if (failed)
cancelAcquire(node);
addWaiter(Node.EXCLUSIVE):线程入队
private Node addWaiter(Node mode)
//mode属性传的是独占,Exclusive,共享是shared
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null)
node.prev = pred;
if (compareAndSetTail(pred, node))
pred.next = node;
return node;
enq(node);
return node;
Node节点有几个比较重要的属性:【pre,next,waitState,thread指向】,其中重点要说下这个waitState,描述了节点的生命状态,可以称为信号量。这个waitState有几种状态:
-
SIGNAL:表示可被唤醒
-
CANCELLED:代表出现异常,中断引起需要废弃结束
-
CONDITION:条件等待
-
PROPAGATE:传播
-
0:初始状态Init
那么整个节点形成的队列是这样的:其中head和tail实际上是在AQS的,拿出来为了方便看整个的节点队列,那这个节点怎么生成的呢?我们往下看
AQS中的属性是这样的:
下面我们开始分析addWaiter这个方法的细节:
addWaiter(Node.EXCLUSIVE):传入一个空的Node节点,很明显Node EXCLUSIVE = null;
在初始化的时候,创建第一个节点。
一开始的AQS,tail其实就是null,并没有地方给初始化值。 所以pred一定是空,if判断是不走的,看enq(node);这个应该可以猜出来,enterQueue缩写的入队方法,这里for循环有点像在自旋,验证下对不对:
private Node enq(final Node node)
for (;;)
Node t = tail;
if (t == null) // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
else
node.prev = t;
if (compareAndSetTail(t, node))
t.next = node;
return t;
继续看逻辑,先定义一个Node节点指向tail,因为这个tail本身指向的就是null,把自己指向为null的赋值给这新的Node节点t,其实也是null;执行if(t == null)的判断;这里一定要注意compareAndSetHead这个方法,细心的同学点进去可以发现,这个实现的效果是,将AQS的head指针从null赋值给新new的Node节点!!这个操作,也是原子的,即使来了几个线程,也只能保证当前节点只有一个线程初始化队列成功。
李二狗为了防止空指针,以这样的方式来初始化一个AQS形成的队列!!将AQS的head节点和tail都指向一个new Node();
接下来再一次循环,很明显t指向new Node()不为空了,所以走到else判断,入队同样也存在线程的竞争,不然没法保证先进先出,compareAndSetTail()保证了入队的原子性,否则会出现else里面,尾部先指向某个节点了,然后这个时候别的线程过来也修改了这tail的引用,那么刚刚上一个的线程节点就不在队列里,永远是阻塞不会被唤醒,同样栈的空间永远会存在,如果是100个线程,很明显就泄露了。
这样就形成了节点双向链链表队列,如果依次下去,会形成非常多的节点,每个节点的线程引用都存在,方便后面唤醒线程。
这就很好理解为什么要用for 一直循环了,因为同一时刻CAS保证只有一个线程加锁成功,其他线程入队失败了,那么可以进行重试,也就是我们说的自旋,保证线程都不会被浪费掉。
既然enq方法走完了,返回的是当前的node节点,进入到acquireQueued(final Node node, args)这个方法里。
final boolean acquireQueued(final Node node, int arg)
boolean failed = true;
try
boolean interrupted = false;
for (;;)
final Node p = node.predecessor();
//如果p是头结点,并且可以获取到锁就出队
if (p == head && tryAcquire(arg))
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
finally
if (failed)
cancelAcquire(node);
假设当前的队列状态是这样的(优化下图结构),那么进入acquireQueued方法后,有个for循环,为了防止上下文切换的开销,李二狗在队列中获取Node的第一个节点 node.predecessor(); 因为进入队列的线程不是立刻进行阻塞,阻塞前要尝试获取一次锁:
1、能获取到就出队;
2、不能获取到就阻塞住;
a) 首先第一轮循环,修改head的状态,修改成Signal标记可以被唤醒。
b) 第二轮循环,阻塞线程parkAndCheckInterrupt(),判断线程是否可中断。
if (p == head && tryAcquire(arg)) //如果p是头结点,并且可以获取到锁就出队;很明显图中已经表明了setHead()要做的事情,原先的头结点就已经没有了引用,后面会被gc回收掉,然后直接return掉。
当下一轮for进来,走到现在的头结点的时候,要判断shouldParkAfterFailedAcquire(前驱结点,当前结点),实际上这个方法内部就是对状态进行的判断:实际上node结点是头结点的下一个结点,所于是要判断node的前驱结点,根据waitStatus状态位是什么,再选择是否出队。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
// 先拿到当前结点的前驱结点,如果这个状态是Signal,则表示可以被唤醒
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//如果是其他状态
if (ws > 0)
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do
node.prev = pred = pred.prev;
while (pred.waitStatus > 0);
pred.next = node;
else
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
return false;
假设waitStatus的状态是Init=0,那么就走到下面的else,shouldParkAfterFailedAcquire(),把前驱结点head的状态从0改成Signal = -1表示可以被唤醒(要想唤醒排队的第一个线程T1,持有锁的线程T0在释放锁的时候,需要判断T0结点的waitStatus是否!=0,如果!=0成立,会再把waitStatus从-1改成0;从而T1再被唤醒去抢锁,在非公平状态下可能会再失败,此时可能T3持有了锁。)。实际上前驱结点是为了后面结点服务的。这里可能文字讲的不是很清楚,读者多读几次代码就知道了,这里是很有难度的,也是并发里的精髓。
剩下的边边角角我这里就不多说了,整体下来,李二狗写的代码可读性我觉得是比较不好的,和Spring相比还是有点生涩,但是思路我们要get到,毕竟面试喜欢聊这些。
以上是关于(抽象同步器Lock详解)的主要内容,如果未能解决你的问题,请参考以下文章
Lock接口AbstractQueuedSynchronizer队列同步器重入锁读写锁