JAVA并发编程——源码原理全面剖析与重点梳理

Posted 南祈鸣

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JAVA并发编程——源码原理全面剖析与重点梳理相关的知识,希望对你有一定的参考价值。

帧与帧栈

  • 每个线程有一块栈内存
  • 每个栈由多个栈帧组成,每个栈帧对应每次方法调用占用的内存
  • 每个线程只能有一个活动栈帧,对应当前正在执行的那个方法

线程上下文切换

  • 线程的CPU时间片用完
  • 垃圾回收
  • 有更高优先级的线程需要运行
  • 线程自己调用了sleep、yield、wait、join、park、synchronized、lock等方法

其中程序计数器的作用

操作系统保存当前线程的状态,记住下一条jvm指令的执行地址,用于恢复线程的状态

PS:状态:程序计数器、栈中每个帧栈的信息

线程状态之五种状态

操作系统层面划分

  1. 初始状态,仅仅是在语言层面上创建了线程对象,即Thead thread = new Thead();,还未与操作系统线程关联

  2. 可运行状态,也称就绪状态,指该线程已经被创建,与操作系统相关联,等待cpu给它分配时间片就可运行

  3. 运行状态,指线程获取了CPU时间片,正在运行;当CPU时间片用完,线程会转换至【可运行状态】,等待 CPU再次分配时间片,会导致我们前面讲到的上下文切换

  4. 阻塞状态

    如果调用了阻塞API,如BIO读写文件,那么线程实际上不会用到CPU,不会分配CPU时间片,会导致上下文切换,进入【阻塞状态】

等待BIO操作完毕,会由操作系统唤醒阻塞的线程,转换至【可运行状态】 与【可运行状态】的区别是,只要操作系统一直不唤醒线程,调度器就一直不会考虑调度它们,CPU就一直不会分配时间片 5. 终止状态,表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态

线程状态之六种状态

Java API层面

  1. NEW 跟五种状态里的初始状态是一个意思
  2. RUNNABLE 是当调用了 start() 方法之后的状态,注意,Java API 层面的 RUNNABLE 状态涵盖了操作系统层面的【可运行状态】、【运行状态】和【io阻塞状态】(由于 BIO 导致的线程阻塞,在 Java 里无法区分,仍然认为是可运行)
  3. BLOCKED , WAITING , TIMED_WAITING 都是 Java API 层面对【阻塞状态】的细分

Monitor概念

Java对象头

以 32 位虚拟机为例,普通对象的对象头结构如下,其中的Klass Word为指针,指向对应的Class对象

 

其中Mark Word结构为:

hashcode:对象自己的哈希码 age:对象在垃圾回收中的分代年龄 biased_lock:是不是偏向锁,加锁状态 和monitor关联成功的话,状态由01变为10

Monitor(锁)

又叫做监视器管程 每个java对象都可以关联一个Monitor,如果使用synchronized给对象上锁(重量级),该对象头的Mark Word中就被设置为指向Monitor对象的指针

 

刚开始时Monitor中的Owner为null当Thread-2 执行synchronized(obj)代码时就会将Monitor的所有者Owner 设置为 Thread-2,上锁成功,Monitor中同一时刻只能有一个Owner当Thread-2 占据锁时,如果线程Thread-3,Thread-4也来执行synchronized(obj)代码,就会进入EntryList中变成BLOCKED状态。Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争时是非公平的。图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足进入 WAITING 状态的线程,后面讲wait-notify 时会分析

synchronized原理进阶

轻量级锁

轻量级锁的使用场景是:如果一个对象虽然有多个线程要对它进行加锁,但是加锁的时间是错开的(也就是没有人可以竞争的),那么可以使用轻量级锁来进行优化。

轻量级锁对使用者是透明的,即语法仍然是synchronized,假设有两个方法同步块,利用同一个对象加锁

static final Object obj = new Object();
public static void method1() 
     synchronized( obj ) 
         // 同步块 A
         method2();
     

public static void method2() 
     synchronized( obj ) 
         // 同步块 B
     

  1. 每次指向到synchronized代码块时,都会创建锁记录(Lock Record)对象,每个线程都会包括一个锁记录的结构,锁记录内部可以储存对象的Mark Word和对象引用reference

     

  2. 让锁记录中的Object reference指向对象,并且尝试用cas(compare and sweep)替换Object对象的Mark Word ,将Mark Word 的值存入锁记录中

     

  3. 如果cas替换成功,那么对象的对象头储存的就是锁记录的地址和状态01,如下所示

     

  4. 如果cas失败,有两种情况

  • 如果是其它线程已经持有了该Object的轻量级锁,那么表示有竞争,将进入锁膨胀阶段

  • 如果是其它线程已经持有了该Object的轻量级锁,那么表示有竞争,将进入锁膨胀阶段

5.当线程退出synchronized代码块的时候,如果获取的是取值为 null 的锁记录 ,表示有重入,这时重置锁记录,表示重入计数减一

 

6.当线程退出synchronized代码块的时候,如果获取的锁记录取值不为 null,那么使用cas将Mark Word的值恢复给对象

  • 成功则解锁成功
  • 失败,则说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程

锁膨胀

如果在尝试加轻量级锁的过程中,cas操作无法成功,这是有一种情况就是其它线程已经为这个对象加上了轻量级锁,这是就要进行锁膨胀,将轻量级锁变成重量级锁。

1.当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁

 

2.这时 Thread-1 加轻量级锁失败,进入锁膨胀流程

  • 即为对象申请Monitor锁,让Object指向重量级锁地址,然后自己进入Monitor 的EntryList 变成BLOCKED状态

 

3.当Thread-0 推出synchronized同步块时,使用cas将Mark Word的值恢复给对象头,失败,那么会进入重量级锁的解锁过程,即按照Monitor的地址找到Monitor对象,将Owner设置为null,唤醒EntryList 中的Thread-1线程

自旋优化

重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即在自旋的时候持锁的线程释放了锁),那么当前线程就可以不用进行上下文切换就获得了锁

  • 其实就是线程发现有锁,然后在那边等一会,不发生上下文切换
  • 自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋,总之,比较智能。Java 7 之后不能控制是否开启自旋功能

偏向锁

第一次使用CAS时将对象的Mark Word头设置为入锁线程ID,之后这个入锁线程再进行重入锁时,发现线程ID是自己的,那么就不用再进行CAS了

 

对象的创建过程

  1. 如果开启了偏向锁(默认是开启的),那么对象刚创建之后,Mark Word 最后三位的值101,并且这是它的Thread,epoch,age都是0,在加锁的时候进行设置这些的值
  2. 偏向锁默认是延迟的,不会在程序启动的时候立刻生效,如果想避免延迟,可以添加虚拟机参数来禁用延迟:-XX:BiasedLockingStartupDelay=0来禁用延迟
  3. 注意:处于偏向锁的对象解锁后,线程 id 仍存储于对象头中,从属于该线程
  4. 测试禁用:如果没有开启偏向锁,那么对象创建后最后三位的值为001,这时候它的hashcode,age都为0,hashcode是第一次用到hashcode时才赋值的。在上面测试代码运行时在添加 VM 参数-XX:-UseBiasedLocking禁用偏向锁(禁用偏向锁则优先使用轻量级锁),退出synchronized状态变回001

撤销偏向锁-hashcode方法

测试 hashCode:当调用对象的hashcode方法的时候就会撤销这个对象的偏向锁,因为使用偏向锁时没有位置存hashcode的值了

当一个对象已经计算过一致性哈希码后,它就再也无法进入偏向锁状态了;

而当一个对象当前正处于偏向锁状态,又收到需要计算其一致性哈希码请求时,它的偏向状态会被立即撤销,并且锁会膨胀为重量级锁。

在重量级锁的实现中,对象头指向了重量级锁的位置,代表重量级锁的ObjectMonitor类里有字段可以记录非加锁状态(标志位为“01”)下的Mark Word,其中自然可以存储原来的哈希码

撤销偏向锁-变为轻量锁

首先得满足轻量级锁的使用条件,就是没有线程对同一个对象进行锁竞争

最开始使用的是偏向锁,但是第二个线程尝试获取对象锁时,发现本来对象偏向的是线程一,那么偏向锁就会失效,加的就是轻量级锁

撤销偏向锁-调用wait/notify

会使对象的锁变成重量级锁,因为wait/notify方法之后重量级锁才支持

批量重偏向

如果对象被多个线程访问,但是没有竞争,这时候偏向了线程一的对象又有机会重新偏向线程二,即可以不用升级为轻量级锁

超过20对象对同一个线程如线程一撤销偏向时,那么第20个及以后的对象才可以将撤销对线程一的偏向这个动作变为将第20个及以后的对象偏向线程二

批量撤销

当撤销偏向锁阈值超过40次后,jvm会这样觉得,自己确实偏向错了,根本就不该偏向,于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的

锁消除

如果即时编译器(JIT)发现有的锁锁住的对象没有逃离方法,这锁锁了跟没锁一样,就会优化,去掉这个锁

park&unpark原理

每个线程都有自己的一个 Parker 对象,由三部分组成 _counter, _cond和 _mutex

先调用park再调用upark的过程

1:先调用park

  1. 当前线程调用 Unsafe.park() 方法
  2. 检查 _counter ,本情况为 0,这时,获得 _mutex 互斥锁(mutex对象有个等待队列 _cond)
  3. 线程进入 _cond 条件变量阻塞
  4. 设置 _counter = 0

 

2:调用unpark

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 唤醒 _cond 条件变量中的 Thread_0
  3. Thread_0 恢复运行
  4. 设置 _counter 为 0

先调用unpark再调用park的过程

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 当前线程调用 Unsafe.park() 方法
  3. 检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
  4. 设置 _counter 为 0

 

线程状态切换

1:NEW --> RUNNABLE

  • 当调用t.start()方法时,由NEW --> RUNNABLE

2:RUNNABLE <--> WAITING

t 线程用synchronized(obj) 获取了对象锁后

  • 调用obj.wait()方法时,t线程从RUNNABLE --> WAITING

  • 调用obj.notify(),obj.notifyAll(),t.interrupt()时

    竞争锁成功,t 线程从WAITING --> RUNNABLE

竞争锁失败,t 线程从WAITING --> BLOCKED

3:RUNNABLE <--> WAITING

  • 当前线程调用t.join()方法时,当前线程从RUNNABLE --> WAITING
  • 注意是当前线程在t 线程对象的监视器上等待
  • t 线程运行结束,或调用了当前线程的interrupt()时,当前线程从WAITING --> RUNNABLE

4:RUNNABLE <--> WAITING

  • 当前线程调用LockSupport.park()方法会让当前线程从RUNNABLE --> WAITING
  • 调用LockSupport.unpark(目标线程)或调用了线程的interrupt(),会让目标线程从WAITING -->RUNNABLE

5:RUNNABLE <--> TIMED_WAITING

t 线程用synchronized(obj)获取了对象锁后

  • 调用obj.wait(long n)方法时,t 线程从RUNNABLE --> TIMED_WAITING

  • t 线程等待时间超过了 n 毫秒,或调用obj.notify(),obj.notifyAll(),t.interrupt() 时

    竞争锁成功,t 线程从TIMED_WAITING --> RUNNABLE

    竞争锁失败,t 线程从TIMED_WAITING --> BLOCKED

6:RUNNABLE <--> TIMED_WAITING

  • 当前线程调用t.join(long n) 方法时,当前线程从RUNNABLE --> TIMED_WAITING
  • 注意是当前线程在t 线程对象的监视器上等待
  • 当前线程等待时间超过了 n 毫秒,或t 线程运行结束,或调用了当前线程的interrupt() 时,当前线程从TIMED_WAITING --> RUNNABLE

7:RUNNABLE <--> TIMED_WAITING

  • 当前线程调用Thread.sleep(long n),当前线程从RUNNABLE --> TIMED_WAITING
  • 当前线程等待时间超过了 n 毫秒,当前线程从TIMED_WAITING --> RUNNABLE

8:RUNNABLE <--> TIMED_WAITING

  • 当前线程调用LockSupport.parkNanos(long nanos)或LockSupport.parkUntil(long millis) 时 ,当前线程从RUNNABLE --> TIMED_WAITING

  • 调用LockSupport.unpark(目标线程) 或调用了线程的interrupt(),或是等待超时,会让目标线程从TIMED_WAITING--> RUNNABLE

9:RUNNABLE <--> BLOCKED

  • t 线程用synchronized(obj) 获取了对象锁时如果竞争失败,从RUNNABLE --> BLOCKED
  • 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED的线程重新竞争,如果其中 t 线程竞争成功,从BLOCKED --> RUNNABLE,其它失败的线程仍然BLOCKED

10:RUNNABLE <--> TERMINATED

  • 当前线程所有代码运行完毕,进入TERMINATED

活跃性

死锁

一个线程需要同时获取多把锁,这时就容易发生死锁t1 线程获得A对象锁,接下来想获取B对象的锁;t2 线程获得B对象锁,接下来想获取A对象的锁例。

活锁

两个线程互相改变对方的结束条件,最后谁也无法结束

饥饿

一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束

JMM(Java内存模型)

JMM 即 Java Memory Model,它从java层面定义了主存、工作内存抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、CPU 指令优化等。JMM 体现在以下几个方面

  • 原子性 - 保证指令不会受到线程上下文切换的影响
  • 可见性 - 保证指令不会受 cpu 缓存的影响
  • 有序性 - 保证指令不会受 cpu 指令并行优化的影响

可见性

先来看一个现象,main 线程对 run 变量的修改对于 t 线程不可见,导致了 t 线程无法停止:

public class Test1 
    static boolean run = true;
    public static void main(String[] args) throws InterruptedException 
        Thread t = new Thread(()->
            while(run)
                // ....
//                System.out.println(2323);  如果加上这个代码就会停下来
            
        );
        t.start();
        utils.sleep(1);
        System.out.println(3434);   
        run = false; // 线程t不会如预想的停下来
    

分析:

1.初始状态, t 线程刚开始从主内存读取了 run 的值到工作内存

 

2.因为t1线程频繁地从主存中读取run的值,JIT即时编译器会将run的值缓存至自己工作内存中的高速缓存中,减少对主存中run的访问以提高效率

 

3.一秒之后,main 线程修改了 run 的值,并同步至主存,而 t 是从自己工作内存中的高速缓存中读取这个变量 的值,结果永远是旧值

 

解决方法:

  1. volatile(表示易变关键字的意思),它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存
  2. 使用synchronized关键字也有相同的效果!在Java内存模型中,synchronized规定,线程在加锁时, 先清空工作内存→在主内存中拷贝最新变量的副本到工作内存 →执行完代码→将更改后的共享变量的值刷新到主内存中→释放互斥锁

PS:synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性。但缺点是 synchronized 是属于重量级操作,性能相对更低。

System.out.println() 会发现即使不加 volatile 修饰符,线程 t 也能正确看到 对 run 变量的修改了,想一想为什么?

因为println方法里面有synchronized修饰。

有序性

指令重排

int num = 0;

// volatile 修饰的变量,可以禁用指令重排 volatile boolean ready = false; 可以防止变量之前的代码被重排序
boolean ready = false; 
// 线程1 执行此方法
public void actor1(I_Result r) 
 if(ready) 
     r.r1 = num + num;
  
 else 
     r.r1 = 1;
 

// 线程2 执行此方法
public void actor2(I_Result r) 
 num = 2;
 ready = true;

结果可能是 0 。这种情况下是:线程2 执行 ready = true,切换到线程1,进入 if 分支,相加为 0,再切回线程2 执行 num = 2。

这种现象叫做指令重排,是 JIT 编译器在运行时的一些优化,这个现象需要通过大量测试才能复现。

重排序也需要遵守一定规则:

  1. 重排序操作不会对存在数据依赖关系的操作进行重排序。比如:a=1;b=a; 这个指令序列,由于第二个操作依赖于第一个操作,所以在编译时和处理器运行时这两个操作不会被重排序。
  2. 重排序是为了优化性能,但是不管怎么重排序,单线程下程序的执行结果不能被改变。比如:a=1;b=2;c=a+b这三个操作,第一步(a=1)和第二步(b=2)由于不存在数据依赖关系,所以可能会发生重排序,但是c=a+b这个操作是不会被重排序的,因为需要保证最终的结果一定是c=a+b=3。

重排序在单线程模式下是一定会保证最终结果的正确性,但是在多线程环境下,问题就出来了。

解决方法:volatile 修饰的变量,可以禁用指令重排

volatile原理

volatile 的底层实现原理是内存屏障,Memory Barrier(Memory Fence)

  • 对 volatile 变量的写指令后会加入写屏障
  • 对 volatile 变量的读指令前会加入读屏障

保证有序性

1.写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中

写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后

public void actor2(I_Result r) 
     num = 2;
     ready = true; // ready是被volatile修饰的 ,赋值带写屏障
     // 写屏障

2.而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据

读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前

public void actor1(I_Result r) 
 // 读屏障
 //  ready是被volatile修饰的 ,读取值带读屏障
 if(ready) 
     r.r1 = num + num;
  else 
     r.r1 = 1;
 

还是那句话,不能解决指令交错:

 

  • 写屏障仅仅是保证之后的读能够读到最新的结果,但不能保证其它线程的读跑到它前面去
  • 而有序性的保证也只是保证了本线程内相关代码不被重排序

CAS原理(无锁)

AtomicInteger的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?

    @Override
    public void withdraw(Integer amount) 
        // 核心代码
        // 需要不断尝试,直到成功为止
        while (true)
            // 比如拿到了旧值 1000
            int pre = getBalance();
            // 在这个基础上 1000-10 = 990
            int next = pre - amount;
            /*
             compareAndSet 正是做这个检查,在 set 前,先比较 prev 与当前值
             - 不一致了,next 作废,返回 false 表示失败
             比如,别的线程已经做了减法,当前值已经被减成了 990
             那么本线程的这次 990 就作废了,进入 while 下次循环重试
             - 一致,以 next 设置为新值,返回 true 表示成功
             */
            if (atomicInteger.compareAndSet(pre,next))
                break;
            
        
    

其中的关键是 compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作。

 

CAS与volatile

在上面代码中的AtomicInteger,保存值的value属性使用了volatile 。获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。

CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果

为什么无锁效率高

  1. 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。打个比喻:线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速... 恢复到高速运行,代价比较大
  2. 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。

CAS的特点

结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。

  • CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗
  • synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
  • CAS 体现的是无锁并发、无阻塞并发
  1. 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
  2. 但如果竞争激烈(写操作多),可以想到重试必然频繁发生,反而效率会受影响

ABA问题

以为你是A,所以把你改成B,但其实他早就已经被别的线程从A改成B又改回A了

并发工具类

java.util.concurrent.atomic并发包提供了一些并发工具类,这里把它分成五类:

  1. 使用原子的方式更新基本类型

    AtomicInteger:整型原子类

AtomicLong:长整型原子类 AtomicBoolean :布尔型原子类 2. 原子引用 3. 原子数组 4. 字段更新器 5. 原子累加器

原子整数

    public static void main(String[] args) 
        AtomicInteger i = new AtomicInteger(0);
        // 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
        System.out.println(i.getAndIncrement());
        // 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
        System.out.println(i.incrementAndGet());
        // 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
        System.out.println(i.decrementAndGet());
        // 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
        System.out.println(i.getAndDecrement());
        // 获取并加值(i = 0, 结果 i = 5, 返回 0)
        System.out.println(i.getAndAdd(5));
        // 加值并获取(i = 5, 结果 i = 0, 返回 0)
        System.out.println(i.addAndGet(-5));
        // 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
        // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
        System.out.println(i.getAndUpdate(p -> p - 2));
        // 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
        // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
        System.out.println(i.updateAndGet(p -> p + 2));
        // 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
        // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
        // getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
        // getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
        System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
        // 计算并获取(i = 10, p 为 i 的当前值, x 为参数1值, 结果 i = 0, 返回 0)
        // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
        System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
    
    

原子引用

为什么需要原子引用类型?保证引用类型的共享变量是线程安全的(确保这个原子引用没有引用过别人)。

基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。

  • AtomicReference:引用类型原子类

  • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。

      AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如:A -> B -> A ->C
    
      通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。
    
  • AtomicMarkableReference :原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来

      并不关心引用变量更改了几次,只是单纯的关心是否更改过
    

原子数组

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

字段更新器

  • AtomicReferenceFieldUpdater // 域 字段
  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdater

利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常

原子累加器

LongAdder累加器的使用,性能会提升

性能提升的原因很简单,就是在有竞争时,设置多个累加单元(但不会超过cpu的核心数),Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]... 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。

源码之 LongAdder

LongAdder 类有几个关键域

// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;

cas锁

使用cas实现一个自旋锁

// 不要用于生产实践!!!
public class LockCas 
    private AtomicInteger state = new AtomicInteger(0);
    public void lock() 
        while (true) 
            if (state.compareAndSet(0, 1)) 
                break;
            
        
    
    public void unlock() 
        log.debug("unlock...");
        state.set(0);
    


原理之伪共享

其中 Cell 即为累加单元

// 防止缓存行伪共享
@sun.misc.Contended
static final class Cell 
    volatile long value;
    Cell(long x)  value = x; 
    // 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
    final boolean cas(long prev, long next) 
        return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
    
    // 省略不重要代码

下面讨论@sun.misc.Contended注解的重要意义

得从缓存说起,缓存与内存的速度比较

 

因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。缓存离cpu越近速度越快。 而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long),缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中,CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效。

 

因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因 此缓存行可以存下 2 个的 Cell 对象。这样问题来了: Core-0 要修改 Cell[0],Core-1 要修改 Cell[1]

无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加 Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效

问题解决:

@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效


add方法源码解读

  public void add(long x) 
        // as 为累加单元数组
        // b 为基础值
        // x 为累加值
        Cell[] as; long b, v; int m; Cell a;
        // 进入 if 的两个条件
        // 1. as 有值, 表示已经发生过竞争, 进入 if
        // 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 if
        if ((as = cells) != null || !casBase(b = base, b + x)) 
            // uncontended 表示 cell 没有竞争
            boolean uncontended = true;
            if (
                // as 还没有创建
                    as == null || (m = as.length - 1) < 0 ||
                            // 当前线程对应的 cell 还没有被创建,a为当线程的cell
                            (a = as[getProbe() & m]) == null ||
       // 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )
                            !(uncontended = a.cas(v = a.value, v + x))
            ) 
                // 进入 cell 数组创建、cell 创建的流程
                longAccumulate(x, null, uncontended);
            
        
    


LongAccumulate逻辑分析

第一个if的第一个条件加上else if 中代码的逻辑,这是cells未创建时的处理逻辑

 

 若线程对应的cell还没创建好,则执行的是第一个红框里的代码,逻辑如下

若线程对应的cell已经创建好,则执行的是第二个红框里的代码,逻辑如下

sum方法分析

获取最终结果通过 sum 方法,将各个累加单元的值加起来就得到了总的结果。

    public long sum() 
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) 
            for (int i = 0; i < as.length; ++i) 
                if ((a = as[i]) != null)
                    sum += a.value;
            
        
        return sum;
    

Unsafe对象

Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得。

Unsafe unsafe = UnsafeAccessor.getUnsafe();
Field id = Student.class.getDeclaredField("id"); 
Field name = Student.class.getDeclaredField("name"); 
// 获得成员变量的偏移量
long idOffset = UnsafeAccessor.unsafe.objectFieldOffset(id); 
long nameOffset = UnsafeAccessor.unsafe.objectFieldOffset(name);
Student student = new Student(); 
// 使用 cas 方法替换成员变量的值
UnsafeAccessor.unsafe.compareAndSwapInt(student, idOffset, 0, 20);  // 返回 true 
UnsafeAccessor.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 true
System.out.println(student);

线程池

线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。

ThreadPoolExecutor

Executor 框架结构

1:任务(Runnable /Callable) 执行任务需要实现的 Runnable 接口 或 Callable接口。Runnable 接口或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。

2:任务的执行(Executor) 包括任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。

这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor 这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。

3:异步计算的结果(Future) Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。

当我们把 Runnable接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。(调用 submit() 方法时会返回一个 FutureTask 对象)

使用步骤:

1:主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。

2:把创建完成的实现 Runnable/Callable接口的 对象直接交给 ExecutorService 执行: ExecutorService.execute(Runnable command))或者也可以把 Runnable 对象或Callable 对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task)或 ExecutorService.submit(Callable task))。

3:如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现Future接口的对象(我们刚刚也提到过了执行 execute()方法和 submit()方法的区别,submit()会返回一个 FutureTask 对象)。

4:最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行

AQS原理

概述:全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架

特点:

  1. 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁

    getState - 获取 state 状态

    setState - 设置 state 状态

    compareAndSetState - cas 机制设置 state 状态

    独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源

  2. 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList

  3. 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

子类主要实现这样一些方法(默认抛出 UnsupportedOperationException):

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

ReentrantLock 原理

可以看到ReentrantLock提供了两个同步器,实现公平锁和非公平锁,默认是非公平锁

 

非公平锁实现原理

加锁解锁流程,先从构造器开始看,默认为非公平锁实现

public ReentrantLock() 
 sync = new NonfairSync();

NonfairSync 继承自 AQS

没有竞争时

第一个竞争出现时,查看源码的NonfairSync的lock方法

Thread-1 执行了:

  1. lock方法中CAS 尝试将 state 由 0 改为 1,结果失败

  2. lock方法中进一步调用acquire方法,进入 tryAcquire 逻辑,这里我们认为这时 state 已经是1,结果仍然失败

  3. 接下来进入 acquire方法的addWaiter 逻辑,构造 Node 队列

    图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态

    Node 的创建是懒惰的

    其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程

 

 

当前线程进入 acquire方法的 acquireQueued 逻辑

  1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞

  2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,我们这里设置这时 state 仍为 1,失败

  3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false

  4. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败

  5. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true

  6. 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示已经阻塞)

  7. 再次有多个线程经历上述过程竞争失败,变成这个样子

     

  8. Thread-0 调用unlock方法里的release方法释放锁,进入tryRelease(使用ctrl+alt+b查看tryRelease方法的具体ReentrantLock实现) 流程,如果成功,设置 exclusiveOwnerThread 为 null,state = 0

  9. unlock方法里的release方法方法中,如果当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程: unparkSuccessor中会找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1 回到 Thread-1 的 acquireQueued 流程

  10. 如果加锁成功(没有竞争),会设置 (acquireQueued 方法中)

    exclusiveOwnerThread 为 Thread-1,state = 1

    head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread

    原本的 head 因为从链表断开,而可被垃圾回收

  11. 如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了

     

    如果不巧又被 Thread-4 占了先

    Thread-4 被设置为 exclusiveOwnerThread,state = 1

    Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞

    可重入原理

    static final class NonfairSync extends Sync 
        // ...
    
        // Sync 继承过来的方法, 方便阅读, 放在此处
        final boolean nonfairTryAcquire(int acquires) 
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) 
                if (compareAndSetState(0, acquires)) 
                    setExclusiveOwnerThread(current);
                    return true;
                
            
            // 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
            else if (current == getExclusiveOwnerThread()) 
                // state++
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            
            return false;
        
    
        // Sync 继承过来的方法, 方便阅读, 放在此处
        protected final boolean tryRelease(int releases) 
            // state--
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 支持锁重入, 只有 state 减为 0, 才释放成功
            if (c == 0) 
                free = true;
                setExclusiveOwnerThread(null);
            
            setState(c);
            return free;
        
    
    

    可打断原理

    不可打断模式:在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了

    // Sync 继承自 AQS
    static final class NonfairSync extends Sync 
        // ...
    
        private final boolean parkAndCheckInterrupt() 
            // 如果打断标记已经是 true, 则 park 会失效
            LockSupport.park(this);
            // interrupted 会清除打断标记
            return Thread.interrupted();
        
    
        final boolean acquireQueued(final Node node, int arg) 
            boolean failed = true;
            try 
                boolean interrupted = false;
                for (;;) 
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) 
                        setHead(node);
                        p.next = null;
                        failed = false;
                        // 还是需要获得锁后, 才能返回打断状态
                        return interrupted;
                    
                    if (
                            shouldParkAfterFailedAcquire(p, node) &&
                                    parkAndCheckInterrupt()
                    ) 
                        // 如果是因为 interrupt 被唤醒, 返回打断状态为 true
                        interrupted = true;
                    
                
             finally 
                if (failed)
                    cancelAcquire(node);
            
        
    
        public final void acquire(int arg) 
            if (
                    !tryAcquire(arg) &&
                            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
            ) 
                // 如果打断状态为 true
                selfInterrupt();
            
        
    
        static void selfInterrupt() 
            // 重新产生一次中断,这时候线程是如果正常运行的状态,那么不是出于sleep等状态,interrupt方法就不会报错
            Thread.currentThread().interrupt();
        
    
    
    

    可打断模式(区别在于循环的出口多了一个抛异常的出口)

    static final class NonfairSync extends Sync 
        public final void acquireInterruptibly(int arg) throws InterruptedException 
            if (Thread.interrupted())
                throw new InterruptedException();
            // 如果没有获得到锁, 进入 ㈠
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        
    
        // ㈠ 可打断的获取锁流程
        private void doAcquireInterruptibly(int arg) throws InterruptedException 
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try 
                for (;;) 
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) 
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt()) 
                        // 在 park 过程中如果被 interrupt 会进入此
                        // 这时候抛出异常, 而不会再次进入 for (;;)
                        throw new InterruptedException();
                    
                
             finally 
                if (failed)
                    cancelAcquire(node);
            
        
    
    

    公平锁实现原理

    static final class FairSync extends Sync 
        private static final long serialVersionUID = -3000897897090466540L;
        final void lock() 
            acquire(1);
        
    
        // AQS 继承过来的方法, 方便阅读, 放在此处
        public final void acquire(int arg) 
            if (
                    !tryAcquire(arg) &&
                            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
            ) 
                selfInterrupt();
            
        
        // 与非公平锁主要区别在于 tryAcquire 方法的实现
        protected final boolean tryAcquire(int acquires) 
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) 
                // 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
                if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) 
                    setExclusiveOwnerThread(current);
                    return true;
                
            
            else if (current == getExclusiveOwnerThread()) 
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            
            return false;
        
    
        // ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
        public final boolean hasQueuedPredecessors() 
            Node t = tail;
            Node h = head;
            Node s;
            // h != t 时表示队列中有 Node
            return h != t &&
                    (
                            // (s = h.next) == null 表示队列中还有没有老二
                            (s = h.next) == null || // 或者队列中老二线程不是此线程
                                    s.thread != Thread.currentThread()
                    );
        
    
    

    条件变量实现原理

    每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject

    await 流程

开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程 创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部

 

 接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁

 

 unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功

 park 阻塞 Thread-0

 signal流程

假设 Thread-1 要来唤醒 Thread-0

 进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node

 执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的waitStatus 改为 -1

 

Thread-1 释放锁,进入 unlock 流程

读写锁

ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用读写锁让读-读可以并发,提高性能。读-写,写-写都是相互互斥的!

提供一个数据容器类内部分别使用读锁保护数据的read()方法,写锁保护数据的write()方法

注意事项:

  • 读锁不支持条件变量
  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
           r.lock();
            try 
                // ...
                w.lock();
                try 
                    // ...
                 finally
                    w.unlock();
                
             finally
                r.unlock();
            
    
  • 重入时降级支持:即持有写锁的情况下去获取读锁
     class CachedData 
        Object data;
        // 是否有效,如果失效,需要重新计算 data
        volatile boolean cacheValid;
        final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        void processCachedData() 
            rwl.readLock().lock();
            if (!cacheValid) 
                // 获取写锁前必须释放读锁
                rwl.readLock().unlock();
                rwl.writeLock().lock();
                try 
                    // 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
                    if (!cacheValid) 
                        data = ...
                        cacheValid = true;
                    
                    // 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存,同时保证自己在读的时候不会被其他线程写
                    rwl.readLock().lock();
                 finally 
    
                    rwl.writeLock().unlock();
                
            
            // 自己用完数据, 释放读锁
            try 
                use(data);
             finally 
                rwl.readLock().unlock();
            
        
    
    

Semaphore

Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一刚开始,permits(state)为 3,这时 5 个线程来获取资源

 假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞

 这时 Thread-4 释放了 permits,状态如下

 接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

 

CountdownLatch

CountDownLatch允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。在 Java 并发中,countdownlatch 的概念是一个常见的面试题,所以一定要确保你很好的理解了它。

CountDownLatch是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用countDown方法时,其实使用了tryReleaseShared方法以CAS的操作来减少state,直至state为0就代表所有的线程都调用了countDown方法。当调用await方法的时候,如果state不为0,就代表仍然有线程没有调用countDown方法,那么就把已经调用过countDown的线程都放入阻塞队列Park,并自旋CAS判断state = = 0,直至最后一个线程调用了countDown,使得state == 0,于是阻塞的线程便判断成功,全部往下执行。

用来进行线程同步协作,等待所有线程完成倒计时。 其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一

JDK7 HashMap 并发死链

死链复现

运行代码,程序在预料的断点位置停了下来,输出

长度为16时,桶下标为1的key
1
16
35
50
长度为32时,桶下标为1的key
1
35
扩容前大小[main]:12

这时可以在 Variables 面板观察到 e 和 next 变量,使用 view as -> Object 查看节点状态

e       (1)->(35)->(16)->null
next    (35)->(16)->null

在 Threads 面板选中 Thread-1 恢复运行,可以看到控制台输出新的内容如下,Thread-1 扩容已完成

newTable[1]  (35)->(1)->null

这时 Thread-0 还停在 594 处, Variables 面板变量的状态已经变化为

e       (1)->null
next    (35)->(1)->null

为什么呢,因为 Thread-1 扩容时链表也是后加入的元素放入链表头,因此链表就倒过来了,但 Thread-1 虽然结果正确,但它结束后 Thread-0 还要继续运行 接下来就可以单步调试(F8)观察死链的产生了 下一轮循环到 594,将 e 搬迁到 newTable 链表头

newTable[1]     (1)->null
e              (35)->(1)->null
next           (1)->nul

下一轮循环到 594,将 e 搬迁到 newTable 链表头

newTable[1]     (35)->(1)->null
e                (1)->null
next            null

再看看源码

e.next = newTable[1]; 
// 这时 e  (1,35)
// 而 newTable[1] (35,1)->(1,35) 因为是同一个对象 

newTable[1] = e;
// 再尝试将 e 作为链表头, 死链已成 

e =

Java Review - 并发编程_读写锁ReentrantReadWriteLock的原理&源码剖析

文章目录


ReentrantLock VS ReentrantReadWriteLock

解决线程安全问题使用ReentrantLock就可以,但是ReentrantLock是独占锁,某时只有一个线程可以获取该锁,而实际中会有写少读多的场景,显然ReentrantLock满足不了这个需求,所以ReentrantReadWriteLock应运而生。ReentrantReadWriteLock采用读写分离的策略,允许多个线程可以同时获取读锁。


类图结构

为了了解ReentrantReadWriteLock的内部构造,我们先看下它的类图结构


读写锁的内部维护了一个ReadLock和一个WriteLock,它们依赖Sync实现具体功能。

Sync继承自AQS,并且也提供了公平和非公平的实现。


非公平的读写锁实现

下面只介绍非公平的读写锁实现。

我们知道AQS中只维护了一个state状态,而ReentrantReadWriteLock则需要维护读状态和写状态,一个state怎么表示写和读两种状态呢?

ReentrantReadWriteLock巧妙地使用state的高16位表示读状态,也就是获取到读锁的次数;使用低16位表示获取到写锁的线程的可重入次数。

        /*
         * Read vs write count extraction constants and functions.
         * Lock state is logically divided into two unsigned shorts:
         * The lower one representing the exclusive (writer) lock hold count,
         * and the upper the shared (reader) hold count.
         */

        static final int SHARED_SHIFT   = 16;
        //共享锁(读锁)状态单位值65536
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        //共享锁线程最大个数65535
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
	    //排它锁(写锁)掩码,二进制,15个1
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

		
		// 返回读锁线程数
	    /** Returns the number of shared holds represented in count  */
        static int sharedCount(int c)     return c >>> SHARED_SHIFT; 
		
		// 返回写锁线程数
        /** Returns the number of exclusive holds represented in count  */
        static int exclusiveCount(int c)  return c & EXCLUSIVE_MASK; 

Sync中的

  • firstReader用来记录第一个获取到读锁的线程
  • firstReaderHoldCount则记录第一个获取到读锁的线程获取读锁的可重入次数。
  • cachedHoldCounter用来记录最后一个获取读锁的线程获取读锁的可重入次数

        /**
         * A counter for per-thread read hold counts.
         * Maintained as a ThreadLocal; cached in cachedHoldCounter
         */
        static final class HoldCounter 
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        

readHolds是ThreadLocal变量,用来存放除去第一个获取读锁线程外的其他线程获取读锁的可重入次数。

ThreadLocalHoldCounter继承了ThreadLocal,因而initialValue方法返回一个HoldCounter对象。

  /**
         * ThreadLocal subclass. Easiest to explicitly define for sake
         * of deserialization mechanics.
         */
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> 
            public HoldCounter initialValue() 
                return new HoldCounter();
            
        


写锁的获取与释放

在ReentrantReadWriteLock中写锁使用WriteLock来实现。

void lock()

写锁是个独占锁,某时只有一个线程可以获取该锁。

  • 如果当前没有线程获取到读锁和写锁,则当前线程可以获取到写锁然后返回。
  • 如果当前已经有线程获取到读锁和写锁,则当前请求写锁的线程会被阻塞挂起。
  • 另外,写锁是可重入锁,如果当前线程已经获取了该锁,再次获取只是简单地把可重入次数加1后直接返回。

     public void lock() 
            sync.acquire(1);
        

   public final void acquire(int arg) 
   	// sync重写的tryAcquire方法
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    

如以上代码所示,在 lock()内部调用了AQS的acquire方法,其中tryAcquire是ReentrantReadWriteLock内部的sync类重写的,代码如下。

   protected final boolean tryAcquire(int acquires) 
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
			// 1  c!=0 说明读锁或者写锁已经被其他线程获取	
            if (c != 0) 
                // (Note: if c != 0 and w == 0 then shared count != 0)
                // 2 w = 0 说明有线程已经获取了写锁,w!=0 且 当前线程不是写锁的拥有者,返回false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                 // 3 说明当前线程已经获取到了写锁,判断可重入次数
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                // 设置可重入次数(1	)
                setState(c + acquires);
                return true;
            
			// 5 第一个写线程获取写锁 
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        
  • 代码(1)中,如果当前AQS状态值不为0则说明当前已经有线程获取到了读锁或者写锁。

  • 代码(2)中,如果w==0说明状态值的低16位为0,而AQS状态值不为0,则说明高16位不为0,这暗示已经有线程获取了读锁,所以直接返回false 。 如果w!=0则说明当前已经有线程获取了该写锁,再看当前线程是不是该锁的持有者,如果不是则返回false。

  • 执行到代码(3)说明当前线程之前已经获取到了该锁,所以判断该线程的可重入次数是不是超过了最大值,是则抛出异常,否则执行代码 (4)增加当前线程的可重入次数,然后返回true.

  • 如果AQS的状态值等于0则说明目前没有线程获取到读锁和写锁,所以执行代码(5)。其中,对于writerShouldBlock方法,

非公平锁的实现为

  final boolean writerShouldBlock() 
            return false; // writers can always barge
        

如果代码对于非公平锁来说总是返回false,则说明代码(5)抢占式执行CAS尝试获取写锁,获取成功则设置当前锁的持有者为当前线程并返回true,否则返回false。

公平锁的实现为

 final boolean writerShouldBlock() 
            return hasQueuedPredecessors();
        

这里还是使用hasQueuedPredecessors来判断当前线程节点是否有前驱节点,如果有则当前线程放弃获取写锁的权限,直接返回false。


void lockInterruptibly()

类似于lock()方法,它的不同之处在于,它会对中断进行响应,也就是当其他线程调用了该线程的interrupt()方法中断了当前线程时,当前线程会抛出异常InterruptedException异常。

   public void lockInterruptibly() throws InterruptedException 
            sync.acquireInterruptibly(1);
        

boolean tryLock()

尝试获取写锁,如果当前没有其他线程持有写锁或者读锁,则当前线程获取写锁会成功,然后返回true。 如果当前已经有其他线程持有写锁或者读锁则该方法直接返回false,且当前线程并不会被阻塞。 如果当前线程已经持有了该写锁则简单增加AQS的状态值后直接返回true。

   public boolean tryLock( ) 
            return sync.tryWriteLock();
        
    /**
         * Performs tryLock for write, enabling barging in both modes.
         * This is identical in effect to tryAcquire except for lack
         * of calls to writerShouldBlock.
         */
        final boolean tryWriteLock() 
            Thread current = Thread.currentThread();
            int c = getState();
            if (c != 0) 
                int w = exclusiveCount(c);
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            
            if (!compareAndSetState(c, c + 1))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        

如上代码与tryAcquire方法类似,不同在于这里使用的是非公平策略。


boolean tryLock(long timeout, TimeUnit unit)

与tryAcquire()的不同之处在于,多了超时时间参数,如果尝试获取写锁失败则会把当前线程挂起指定时间,待超时时间到后当前线程被激活,如果还是没有获取到写锁则返回false。另外,该方法会对中断进行响应,也就是当其他线程调用了该线程的interrupt()方法中断了当前线程时,当前线程会抛出InterruptedException异常。

   public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException 
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        

void unlock()

尝试释放锁,如果当前线程持有该锁,调用该方法会让该线程对该线程持有的AQS状态值减1,如果减去1后当前状态值为0则当前线程会释放该锁,否则仅仅减1而已。如果当前线程没有持有该锁而调用了该方法则会抛出IllegalMonitorStateException异常

   public void unlock() 
            sync.release(1);
        
    public final boolean release(int arg) 
    	// 调用ReentrantReadWriteLock#Sync重写的tryRelease
        if (tryRelease(arg)) 
        	// 激活阻塞队列里面的一个线程
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        
        return false;
    
   protected final boolean tryRelease(int releases) 
   			// 6  看是否是拥有写锁的线程调用的unLock
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 7 获取可重入值,没有考虑高16位,因为获取写锁时读锁的值肯定为0
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            // 如果可重入锁值为0则释放锁,否则只是简单的更新状态值 
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        
  • tryRelease首先通过isHeldExclusively判断是否当前线程是该写锁的持有者,如果不是则抛出异常

  • 否则执行代码(7),这说明当前线程持有写锁,持有写锁说明状态值的高16位为0,所以这里nextc值就是当前线程写锁的剩余可重入次数。

  • 代码(8)判断当前可重入次数是否为0,如果free为true则说明可重入次数为0,所以当前线程会释放写锁,将当前锁的持有者设置为null。如果free为false则简单地更新可重入次数。

读锁的获取与释放

ReentrantReadWriteLock中的读锁是使用ReadLock来实现的。

void lock()

获取读锁,如果当前没有其他线程持有写锁,则当前线程可以获取读锁,AQS的状态值state的高16位的值会增加1,然后方法返回。否则如果其他一个线程持有写锁,则当前线程会被阻塞。

      /**
         * Acquires the read lock.
         *
         * <p>Acquires the read lock if the write lock is not held by
         * another thread and returns immediately.
         *
         * <p>If the write lock is held by another thread then
         * the current thread becomes disabled for thread scheduling
         * purposes and lies dormant until the read lock has been acquired.
         */
        public void lock() 
            sync.acquireShared(1);
        
  public final void acquireShared(int arg) 
  		// 调用ReentrantReadWriteLock中syn的tryAcquireShared
        if (tryAcquireShared(arg) < 0)
        	// 调用AQS的doAcquireShared
            doAcquireShared(arg);
    

  protected final int tryAcquireShared(int unused) 
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            // 1 获取当前状态值
            int c = getState();
            // 2 判断是否写锁被占用
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            // 3 获取读锁计数
            int r = sharedCount(c);
            // 4 尝试获取锁,多个读线程只有一个会成功,不成功的进入fullTryAcquireShared进行重试
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) 
                // 5 第一个线程获取到锁
                if (r == 0) 
                    firstReader = current;
                    firstReaderHoldCount = 1;
                // 6 如果当前线程是第一个获取读锁的线程
                 else if (firstReader == current) 
                    firstReaderHoldCount++;
                 else 
                	// 7 记录最后一个获取读锁的线程或记录其他线程读锁的可重入次数
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                
                return 1;
            
            // 8 类似tryAcquireShared,但是自旋获取
            return fullTryAcquireShared(current);
        
  • 首先获取了当前AQS的状态值
  • 然后代码(2)查看是否有其他线程获取到了写锁,如果是则直接返回-1,而后调用AQS的doAcquireShared方法把当前线程放入AQS阻塞队列。

如果当前要获取读锁的线程已经持有了写锁,则也可以获取读锁。但是需要注意,当一个线程先获取了写锁,然后获取了读锁处理事情完毕后,要记得把读锁和写锁都释放掉,不能只释放写锁。

  • 否则执行代码(3),得到获取到的读锁的个数,到这里说明目前没有线程获取到写锁,但是可能有线程持有读锁,然后执行代码(4)

其中非公平锁的readerShouldBlock实现代码如下

 final boolean readerShouldBlock() 
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */
            return apparentlyFirstQueuedIsExclusive();
        
    
  final boolean apparentlyFirstQueuedIsExclusive() 
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    

如上代码的作用是,如果队列里面存在一个元素,则判断第一个元素是不是正在尝试获取写锁,如果不是,则当前线程判断当前获取读锁的线程是否达到了最大值。最后执行CAS操作将AQS状态值的高16位值增加1。

  • 代码(5)(6)记录第一个获取读锁的线程并统计该线程获取读锁的可重入数。

  • 代码(7)使用cachedHoldCounter记录最后一个获取到读锁的线程和该线程获取读锁的可重入数,readHolds记录了当前线程获取读锁的可重入数。

  • 如果readerShouldBlock返回true则说明有线程正在获取写锁,所以执行代码(8)。

  • fullTryAcquireShared的代码与tryAcquireShared类似,它们的不同之处在于,前者通过循环自旋获取。

 final int fullTryAcquireShared(Thread current) 
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            HoldCounter rh = null;
            for (;;) 
                int c = getState();
                if (exclusiveCount(c) != 0) 
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                 else if (readerShouldBlock()) 
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) 
                        // assert firstReaderHoldCount > 0;
                     else 
                        if (rh == null) 
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) 
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            
                        
                        if (rh.count == 0)
                            return -1;
                    
                
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) 
                    if (sharedCount(c) == 0) 
                        firstReader = current;
                        firstReaderHoldCount = 1;
                     else if (firstReader == current) 
                        firstReaderHoldCount++;
                     else 
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    
                    return 1;
                
            
        


void lockInterruptibly()

类似于lock()方法,不同之处在于,该方法会对中断进行响应,也就是当其他线程调用了该线程的interrupt()方法中断了当前线程时,当前线程会抛出InterruptedException异常。


boolean tryLock()

  • 尝试获取读锁,如果当前没有其他线程持有写锁,则当前线程获取读锁会成功,然后返回true 。

  • 如果当前已经有其他线程持有写锁则该方法直接返回false,但当前线程并不会被阻塞。

  • 如果当前线程已经持有了该读锁则简单增加AQS的状态值高16位后直接返回true。

其代码类似tryLock的代码,这里不再讲述。


boolean tryLock(long timeout, TimeUnit unit)

与tryLock()的不同之处在于,多了超时时间参数,如果尝试获取读锁失败则会把当前线程挂起指定时间,待超时时间到后当前线程被激活,如果此时还没有获取到读锁则返回false。

另外,该方法对中断响应,也就是当其他线程调用了该线程的interrupt()方法中断了当前线程时,当前线程会抛出InterruptedException异常。


void unlock()

  public void unlock() 
            sync.releaseShared(1);
        

如上代码具体释放锁的操作是委托给Sync类来做的,sync.rele

以上是关于JAVA并发编程——源码原理全面剖析与重点梳理的主要内容,如果未能解决你的问题,请参考以下文章

Java Review - 并发编程_读写锁ReentrantReadWriteLock的原理&源码剖析

Java Review - 并发编程_ 回环屏障CyclicBarrier原理&源码剖析

Java Review - 并发编程_ThreadPoolExecutor原理&源码剖析

Java Review - 并发编程_ CountDownLatch原理&源码剖析

Java Review - 并发编程_ScheduledThreadPoolExecutor原理&源码剖析

Java Review - 并发编程_ScheduledThreadPoolExecutor原理&源码剖析