Java并发编程——ThreadLocalAQS线程安全集合类

Posted AC_Jobim

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程——ThreadLocalAQS线程安全集合类相关的知识,希望对你有一定的参考价值。

Java并发编程(三)——ThreadLocal、AQS、线程安全集合类

一、ThreadLocal详解

好的博客:
一针见血ThreadLocal
Java:关于 ThreadLocal 的知识来了
由浅入深,全面解析ThreadLocal

1.1 ThreadLocal的使用

ThreadLocal的作用:

  • ThreadLocal 能实现每一个线程都有自己专属的本地变量副本,不同线程之间不会相互干扰,主要解决了让每个线程绑定自己的值,通过使用get()和set()方法,获取默认值或将其值更改为当前线程所存的副本的值从而避免了线程安全问题。

ThreadLocal的使用场景:

  • 在Java的多线程编程中,为保证多个线程对共享变量的安全访问,通常会使用synchronized来保证同一时刻只有一个线程对共享变量进行操作。这种情况下可以将类变量放到ThreadLocal类型的对象中,使变量在每个线程中都有独立拷贝,不会出现一个线程读取变量时而被另一个线程修改的现象
  1. 在进行对象跨层传递的时候,使用ThreadLocal可以避免多次传递,打破层次间的约束。
  2. 线程间数据隔离
  3. 进行事务操作,用于存储线程事务信息。
  4. 数据库连接,Session会话管理。

ThreadLocal 和Synchronized的区别:

synchronizedThreadLocal
原理同步机制采用以时间换空间的方式,只提供了一份变量, 让不同的线程排队访问ThreadLocal采用以空间换时间的方式, 为每一个线程都提供了一份变量的副本, 从而实现同访问而相不干扰
侧重点多个线程之间访问资源的同步多线程中让每个线程之间的数据相互隔离

ThreadLocal 常用的 API 介绍

/**
返回该线程局部变量在当前线程副本中的值。如果该变量对于当前线程没有值,它首先被初始化调用 initialValue 方法得到返回的值。
*/
public T get() 
/**
返回当前线程的这个线程局部变量的“初始值”。该方法将在线程第一次使用 get 方法访问变量时被调用
除非线程之前调用了set 方法,在这种情况下,initialValue 方法将不会被线程调用。
通常,这个方法在每个线程中最多调用一次,但是在后续调用 remove 和 get 的情况下,它可能会被再次调用。
*/
protected T initialValue() 
	return null;

/**
删除当前线程局部变量的值。如果这个线程局部变量随后被当前线程调用了 get ,它的值将通过调用它的 initialValue 方法重新初始化,除非它的值在过渡期间被当前线程调用了 set 。这可能导致在当前线程中多次调用 initialValue 方法。
*/
public void remove() 
/**
将当前线程的这个线程局部变量的副本设置为指定的值。大多数子类将不需要覆盖这个方法,仅仅依靠 initialValue 方法来设置线程局部变量的值
*/
public void set(T value) 
/**
创建线程局部变量。变量的初始值是通过方法上 Supplier 的 get 方法来确定的。jdk1.8 才有的。
*/
public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) 

ThreadLocal 简单使用,下面代码为两个线程进行买票,买票之间互相不影响。

class House 
    ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);

    public void saleHouse() 
        Integer value = threadLocal.get();
        value++;
        threadLocal.set(value);
    


/**
 * 两个线程,每个线程操作自己的数据
 */
public class ThreadLocalDemo 

    public static void main(String[] args) 

        House house = new House();

        new Thread(() -> 
            try 
                for (int i = 1; i <=3; i++) 
                    house.saleHouse(); // 线程t1增加了三次
                
                System.out.println(Thread.currentThread().getName()+"\\t"+"---"+house.threadLocal.get()); //t1	---3
            finally 
                house.threadLocal.remove();// 如果不清理自定义的 ThreadLocal 变量,可能会影响后续业务逻辑和造成内存泄露等问题
            
        ,"t1").start();

        new Thread(() -> 
            try 
                for (int i = 1; i <=2; i++) 
                    house.saleHouse(); // 线程t2增加了两次
                
                System.out.println(Thread.currentThread().getName()+"\\t"+"---"+house.threadLocal.get()); //t2	---2
            finally 
                house.threadLocal.remove();
            
        ,"t2").start();

        System.out.println(Thread.currentThread().getName()+"\\t"+"---"+house.threadLocal.get()); //main	---0
    

打印结果:

可以看到每个线程都有自己的threadLocal值

SimpleDateFormat 线程安全的使用:

  • 多线程下使用 SimpleDateFormat 有线程安全问题,如果非要使用,建议为每个线程创建独立的格式实例,如果多线程同时访问一个格式,则它必须保持外部同步。解决办法:1、将 SimpleDateFormat 定义为局部变量;2、使用 ThreadLocal ,也叫做线程本地变量或者线程本地存储;3、加锁

正确使用代码示例:(构建日期转换工具类)

public class DateUtils 
    private static final ThreadLocal<SimpleDateFormat>  sdf_threadLocal =
            ThreadLocal.withInitial(()-> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

    /**
     * ThreadLocal可以确保每个线程都可以得到各自单独的一个SimpleDateFormat的对象,那么自然也就不存在竞争问题了。
     * @param stringDate
     * @return
     * @throws Exception
     */
    public static Date parseDateTL(String stringDate)throws Exception 
        return sdf_threadLocal.get().parse(stringDate);
    

    public static void main(String[] args) throws Exception 
        for (int i = 1; i <=30; i++) 
            new Thread(() -> 
                try 
                    System.out.println(DateUtils.parseDateTL("2020-11-11 11:11:11"));
                 catch (Exception e) 
                    e.printStackTrace();
                
            ,String.valueOf(i)).start();
        
    

1.2 ThreadLocal实现原理

Thread,ThreadLocal,ThreadLocalMap,Entry的关系:

  • 首先ThreadLocalMap是以数组形式存储一个个Entry键值对的,它是Thread的一个静态内部类,而Entry是ThreadLocalMap的静态内部类,Entry的key就是new出来的ThreadLocal,value就是set入的值,所以一个Thread可以有多个ThreadLocal-value键值对。
  • JVM内部维护了一个线程版的Map<Thread,T>(通过ThreadLocal对象的set方法,结果把ThreadLocal对象自己当做key,放进了ThreadLoalMap中),每个线程要用到这个T的时候,用当前的线程去Map里面获取,通过这样让每个线程都拥有了自己独立的变量,

  • set方法分析

    1. 首先获取当前线程,并根据当前线程获取一个Map
    2. 如果获取的Map不为空,则将参数设置到Map中(当前ThreadLocal的引用作为key)
    3. 如果Map为空,则给该线程创建 Map,并设置初始值
    // 作用:将当前线程的这个线程局部变量的副本设置为指定的值
    public void set(T value) 
        // 拿到当前线程
        Thread t = Thread.currentThread();
        // 获取线程对应的ThreadLocalMap
        ThreadLocalMap map = getMap(t);
        // 如果map不为空,设置键值对,this代表调用这个方法的ThreadLocal对象
        if (map != null)
            map.set(this, value);
        else
            // 1)当前线程Thread 不存在ThreadLocalMap对象
            // 2)则调用createMap进行ThreadLocalMap对象的初始化
            // 3)并将 t(当前线程)和value(t对应的值)作为第一个entry存放至ThreadLocalMap中
            createMap(t, value);
    
    
    /**
     * 获取当前线程Thread对应维护的ThreadLocalMap 
     *
     * @param  t the current thread 当前线程
     * @return the map 对应维护的ThreadLocalMap 
     */
    ThreadLocalMap getMap(Thread t) 
        return t.threadLocals;
    
    
    /**
     *创建当前线程Thread对应维护的ThreadLocalMap 
     *
     * @param t 当前线程
     * @param firstValue 存放到map中第一个entry的值
     */
    void createMap(Thread t, T firstValue) 
        //这里的this是调用此方法的threadLocal
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    
    
  • get方法分析

    1. 首先获取当前线程, 根据当前线程获取一个Map
    2. 如果获取的Map不为空,则在Map中以ThreadLocal的引用作为key来在Map中获取对应的Entry e,否则转到4
    3. 如果e不为null,则返回e.value,否则转到4
    4. Map为空或者e为空,则通过initialValue函数获取初始值value,然后用ThreadLocal的引用和value作为firstKey和firstValue创建一个新的Map
    public T get() 
        // 获取当前线程对象
        Thread t = Thread.currentThread();
        // 获取此线程对象中维护的ThreadLocalMap对象
        ThreadLocalMap map = getMap(t);
        // 如果此map存在
        if (map != null) 
            // 以当前的ThreadLocal 为 key,调用getEntry获取对应的存储实体e
            ThreadLocalMap.Entry e = map.getEntry(this);
            // 对e进行判空 
            if (e != null) 
                @SuppressWarnings("unchecked")
                // 获取存储实体 e 对应的 value值
                // 即为我们想要的当前线程对应此ThreadLocal的值
                T result = (T)e.value;
                return result;
            
        
        /*
            初始化 : 有两种情况有执行当前代码
            第一种情况: map不存在,表示此线程没有维护的ThreadLocalMap对象
            第二种情况: map存在, 但是没有与当前ThreadLocal关联的entry
         */
        return setInitialValue();
    
    
    // 初始化操作
    private T setInitialValue() 
        // 调用initialValue获取初始化的值
        // 此方法可以被子类重写, 如果不重写默认返回null
        T value = initialValue();
        // 获取当前线程对象
        Thread t = Thread.currentThread();
        // 获取此线程对象中维护的ThreadLocalMap对象
        ThreadLocalMap map = getMap(t);
        // 判断map是否存在
        if (map != null)
            // 存在则调用map.set设置此实体entry
            map.set(this, value);
        else
            // 1)当前线程Thread 不存在ThreadLocalMap对象
            // 2)则调用createMap进行ThreadLocalMap对象的初始化
            // 3)并将 t(当前线程)和value(t对应的值)作为第一个entry存放至ThreadLocalMap中
            createMap(t, value);
        // 返回设置的值value
        return value;
    
    
  • remove方法分析

    1. 首先获取当前线程,并根据当前线程获取一个Map
    2. 如果获取的Map不为空,则移除当前ThreadLocal对象对应的entry
    /**
     * 删除当前线程中保存的ThreadLocal对应的实体entry
     */
    public void remove() 
        // 获取当前线程对象中维护的ThreadLocalMap对象
        ThreadLocalMap m = getMap(Thread.currentThread());
        // 如果此map存在
        if (m != null)
            // 存在则调用map.remove
            // 以当前ThreadLocal为key删除对应的实体entry
            m.remove(this);
    
    

ThreadLocal的原理:

  • 每一个 Thread 对象均含有一个 ThreadLocalMap 类型的成员变量 threadLocals ,它存储本线程中所
    有ThreadLocal对象及其对应的值
  • ThreadLocalMap 由一个个 Entry 对象构成,Entry 继承自 WeakReference<ThreadLocal<?>> ,一个 Entry 由 ThreadLocal 对象和 Object 构成。由此可见, Entry 的key是ThreadLocal对象,并且是一个弱引用。当没指向key的强引用后,该key就会被垃圾收集器回收
  • 当执行set方法时,ThreadLocal首先会获取当前线程对象,然后获取当前线程的ThreadLocalMap对
    再以当前ThreadLocal对象为key,将值存储进ThreadLocalMap对象中
  • get方法执行过程类似。ThreadLocal首先会获取当前线程对象,然后获取当前线程的ThreadLocalMap
    对象。再以当前ThreadLocal对象为key,获取对应的value。
  • ThreadLocalMap的键值为ThreadLocal对象,而且可以有多个threadLocal变量,因此保存在map中
  • ThreadLocal本身并不存储值,它只是作为一个key来让线程从ThreadLocalMap获取value
  • 由于每一条线程均含有各自私有的ThreadLocalMap容器,这些容器相互独立互不影响,因此不会存在
    线程安全性问题,从而也无需使用同步机制来保证多条线程访问容器的互斥性。

1.3 ThreadLocal内存泄漏

好的博客:ThreadLocal 内存泄露问题

什么是内存泄漏?

  • Memory Leak 程序中已经动态分配的堆内存由于某种原因, 程序未释放或者无法释放, 造成系统内部的浪费, 导致程序运行速度减缓甚至系统崩溃等严重结果. 内存泄漏的堆积终将导致内存溢出
  • 即:不再会被使用的对象或者变量占用的内存不能被回收,就是内存泄露。

1.3.1 强、软、弱、虚、四大引用

  1. 强引用(StrongReference)

    强引用是使用最普遍的引用。如果一个对象具有强引用,那垃圾回收器绝不会回收它。

    Object o=new Object();   //  强引用
    o=null;     // 帮助垃圾收集器回收此对象
    
  2. 软引用(SoftReference)

    如果一个对象只具有软引用,则内存空间足够,垃圾回收器就不会回收它;如果内存空间不足了,就会回收这些对象的内存。只要垃圾回收器没有回收它,该对象就可以被程序使用。软引用可用来实现内存敏感的高速缓存。

    //当我们内存不够用的时候,soft会被回收的情况
    SoftReference<MyObject> softReference = new SoftReference<>(new Object());
    
  3. 弱引用(WeakReference)

    对于只有弱引用的对象来说,只要垃圾回收机制一运行,不管JVM的内存空间是否足够,都会回收该对象占用的内存。

    //垃圾回收机制一运行,会回收该对象占用的内存
    WeakReference<MyObject> weakReference = new WeakReference<>(new Object());
    
  4. 虚引用(PhantomReference)

    顾名思义,就是形同虚设,与其他几种引用都不同,虚引用并不会决定对象的生命周期。如果一个对象仅持有虚引用,那么它就和没有任何引用一样,在任何时候都可能被垃圾回收器回收,它不能单独使用也不能通过它访问对象。

    虚引用必须和引用队列 (ReferenceQueue)联合使用,当垃圾回收器准备回收一个对象时,如果发现它还有虚引用,就会在回收对象的内存之前,把这个虚引用加入到与之关联的引用队列中。

    ReferenceQueue<MyObject> referenceQueue = new ReferenceQueue();
    //和引用队列进行关联,当虚引用对象被回收后,会进入ReferenceQueue队列中
    PhantomReference<MyObject> phantomReference = new PhantomReference<>(new MyObject(),referenceQueue);
    

1.3.2 原因分析

ThreadLocal内存泄露的原因:

ThreadLocalMap 中使用的 key 为 ThreadLocal 的弱引用,而 value 是强引用。

所以,如果 ThreadLocal 没有被外部强引用的情况下,在垃圾回收的时候,key 会被清理掉,而 value 不会被清理掉。这样一来,ThreadLocalMap 中就会出现 key 为 null 的 Entry。假如我们不做任何措施的话,value 永远无法被 GC 回收,这个时候就可能会产生内存泄露。

  • 假设ThreadLocalMap中的key使用了强引用, 那么会出现内存泄漏吗?

    ThreadLocalMap中的key使用了强引用, 是无法完全避免内存泄漏的

因此,ThreadLocal内存泄漏的根源是:

  1. 没有手动侧除这个 Entry
  2. CurrentThread 当前线程依然运行,由于ThreadLocalMap的生命周期跟Thread一样长,如果没有手动删除对应key就会导致内存泄漏,而不是因为弱引用。

那么为什么 key 要用弱引用呢?

  • 事实上,在 ThreadLocalMap 中的set/getEntry 方法中,会对 key 为 null(也即是 ThreadLocal 为 null )进行判断,如果为 null 的话,那么会把 value 置为 null 的。

  • 这就意味着使用完 ThreadLocal , CurrentThread 依然运行的前提下.就算忘记调用 remove 方法,弱引用比强引用可以多一层保障:弱引用的 ThreadLocal 会被回收.对应value在下一次 ThreadLocaIMap 调用 set/get/remove 中的任一方法的时候会被清除,从而避免内存泄漏.

二、AQS详解

好的博客:

Lock简介与初识AQS

AQS(AbstractQueuedSynchronizer)详解与源码分析

2.1 AQS介绍

什么是AQS?

  • AQS,全程AbstractQueuedSynchronizer,位于java.util.concurrent.locks包下。
  • 是JDK1.5提供的一套用于实现阻塞锁和一系列依赖FIFO等待队列的同步器(First Input First Output先进先出)的框架实现。是除了java自带的synchronized 关键字之外的锁机制。 可以将AQS作为一个队列来理解。
  • 我们常用的ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等并发类均是基于AQS来实现的。具体用法是通过继承AQS,并实现其模板方法,来达到同步状态的管理。

AQS的功能在使用中可以分为两种:独占锁和共享锁

  • 独占锁:每次只能有一个线程持有锁。eg: ReentrantLock就是独占锁
  • 共享锁:允许多个线程同时获得锁,并发访问共享资源。eg: ReentrantReadWriteLock中的读锁、CountDownL.atch

AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。


AQS设计是基于模板方法模式的,一般的使用方式是:

1.使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)

2.将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

我们来看看AQS定义的这些可重写的方法:

  • protected boolean tryAcquire(int arg): 独占式获取同步状态,试着获取,成功返回true,反之为false
  • protected boolean tryRelease(int arg) :独占式释放同步状态,等待中的其他线程此时将有机会获取到同步状态;
  • protected int tryAcquireShared(int arg) :共享式获取同步状态,返回值大于等于0,代表获取成功;反之获取失败;
  • protected boolean tryReleaseShared(int arg) :共享式释放同步状态,成功为true,失败为false
  • protected boolean isHeldExclusively(): 是否在独占模式下被线程占用。

AQS提供的模板方法:

  • 独占锁

    void acquire(int arg);// 独占式获取同步状态,如果获取失败则插入同步队列进行等待;
    void acquireInterruptibly(int arg);// 与acquire方法相同,但在同步队列中进行等待的时候可以检测中断;
    boolean tryAcquireNanos(int arg, long nanosTimeout);// 在acquireInterruptibly基础上增加了超时等待功能,在超时时间内没有获得同步状态返回false;
    boolean release(int arg);// 释放同步状态,该方法会唤醒在同步队列中的下一个节点
    
  • 共享锁

    void acquireShared(int arg);// 共享式获取同步状态,与独占式的区别在于同一时刻有多个线程获取同步状态;
    void acquireSharedInterruptibly(int arg);// 在acquireShared方法基础上增加了能响应中断的功能;
    boolean tryAcquireSharedNanos(int arg, long nanosTimeout);// 在acquireSharedInterruptibly基础上增加了超时等待的功能;
    boolean releaseShared(int arg);// 共享式释放同步状态
    

使用总结:

  • 首先,我们需要去继承AbstractQueuedSynchronizer这个类,然后我们根据我们的需求去重写相应的方法,比如要实现一个独占锁,那就去重写tryAcquire,tryRelease方法,要实现共享锁,就去重写tryAcquireShared,tryReleaseShared;
  • 然后,在我们的组件中调用AQS中的模板方法就可以了,而这些模板方法是会调用到我们之前重写的那些方法的。也就是说,我们只需要很小的工作量就可以实现自己的同步组件,重写的那些方法,仅仅是一些简单的对于共享资源state的获取和释放操作,至于像是获取资源失败,线程需要阻塞之类的操作,自然是AQS帮我们完成了。

2.2 AQS源码分析

AQS的基本实现:

  • AQS维护一个共享资源state,通过内置的FIFO来完成获取资源线程的排队工作。(这个内置的同步队列称为"CLH"队列)。该队列由一个一个的Node结点组成,每个Node结点维护一个prev引用和next引用,分别指向自己的前驱和后继结点。AQS维护两个指针,分别指向队列头部head和尾部tail。

  • 其实就是个双端双向链表

  • 当线程获取资源失败(比如tryAcquire时试图设置state状态失败),会被构造成一个结点加入CLH队列中,同时当前线程会被阻塞在队列中(通过LockSupport.park实现,其实是等待态)。当持有同步状态的线程释放同步状态时,会唤醒后继结点,然后此结点线程继续加入到对同步状态的争夺中。

AQS内部结构代码:

static final class Node  //

private transient volatile Node head;

private transient volatile Node tail;

private volatile int state; // 同步状态

Node节点结构:

static final class Node 
    // 表示线程已被取消(等待超时或者被中断)
    static final int CANCELLED =  1;
    // 表示线程已经准备好了,就等资源释放了
    static final int SIGNAL  = -1;
    // 表示节点在等待队列中,节点线程等待唤醒
    static final int CONDITION = -2;
    // 表示下一次共享式同步状态会被无条件地传播下去 
    static final int PROPAGATE = -3;
    // Node初始化的时候的默认值
    volatile int waitStatus;
    /**当前结点的前驱结点 */
    volatile Node prev;
    /** 当前结点的后继结点 */
    volatile Node next;
    /** 与当前结点关联的排队中的线程 */
    volatile Thread thread;
    /** ...... */

以ReentrantLock中非公平锁分析AQS的执行过程