java多线程进阶ThreadLocal

Posted 烟锁迷城

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java多线程进阶ThreadLocal相关的知识,希望对你有一定的参考价值。

目录

1、基本使用

2、常见方法

2.1、set

2.2、get

2.3、remove

2.4、withInitial

3、原理分析

3.1、set源码

3.2、replaceStaleEntry回收空位与数值插入

3.2.1、第一种情况

3.2.2、第二种情况

 3.2.3、第三种情况

 3.2.4、第四种情况

3.2、get方法

3.3、remove移除

4、内存泄露


1、基本使用

ThreadLocal的作用是在线程内保证其携带的变量安全,即在分割线程内变量的基础上实现线程安全,下面是一个使用的小例子。

public class ThreadLocalDemo 

    private static ThreadLocal<DateFormat> dateFormatThreadLocal = new ThreadLocal<>();

    public static DateFormat getDateFormat() throws ParseException 
        DateFormat dateFormat = dateFormatThreadLocal.get();
        if (dateFormat == null) 
            dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            dateFormatThreadLocal.set(dateFormat);
        
        return dateFormat;
    

    public static Date parse(String strDate) throws ParseException 
        return getDateFormat().parse(strDate);
    

    public static void main(String[] args) 
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) 
            executorService.execute(() -> 
                try 
                    System.out.println(parse("2022-06-01 14:10:10"));
                 catch (ParseException e) 
                    e.printStackTrace();
                
            );
        
    

2、常见方法

ThreadLocal具有一些常见的方法

2.1、set

在当前线程使用范围内,设置一个数值放到ThreadLocal中,这个值仅对当前线程可见,即创建了一个属于这个线程的数据副本。

2.2、get

从当前线程中获取到set的数值

2.3、remove

移除当前线程中的数据

2.4、withInitial

可以使用lambda表达式赋予ThreadLocal初始数值

3、原理分析

可以猜测,ThreadLocal是一个Map结构,set和get都是典型的方法,但是它的set和get都不需要key就可以取出数据,而且还是按照线程进行隔离,每个线程具有多个ThreadLocal,所以可以猜测其结构将以当前线程都有一个map,map以ThreadLocal为key。

有了猜测,就可以继续向下,阅读源码。

3.1、set源码

Thread t = Thread.currentThread();——获取到当前线程

ThreadLocalMap map = getMap(t);——获取到ThreadLocalMap

if (map != null) map.set(this, value);——初始化判断,如果不为空,就以this,即ThreadLocal本身为key,传入数据为value

else createMap(t, value);——若初始化为空,则进行初始化。

public void set(T value) 
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);

获取,将线程内的threadLocals取出

ThreadLocalMap getMap(Thread t) 
    return t.threadLocals;

初始化方法,创建Map,具体方法就是新建ThreadLocalMap

void createMap(Thread t, T firstValue) 
    t.threadLocals = new ThreadLocalMap(this, firstValue);

具体的ThreadLocalMap构造方法。

table = new Entry[INITIAL_CAPACITY];——创建一个长度为16的Entry数组

int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);——计算数组下标位置

table[i] = new Entry(firstKey, firstValue);——将数据存储到指定位置

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) 
    table = new Entry[INITIAL_CAPACITY];
    int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
    table[i] = new Entry(firstKey, firstValue);
    size = 1;
    setThreshold(INITIAL_CAPACITY);

Entry继承了WeakReference,是一个弱引用。

弱引用与垃圾回收有关,弱引用的对象可能在任何时刻被回收。

static class Entry extends WeakReference<ThreadLocal<?>> 
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) 
        super(k);
        value = v;
    

ThreadLocalMap的set方法,可以看到前三句是和初始化一致的。

接下来是循环,循环递次执行,从计算出来的i值开始,向后寻找。

循环内部,获取到key的数值,如果key相等,则进行替换

如果key为空,则执行replaceStaleEntry。key通常来说是不会为null的,但是Entry是弱引用,所以一旦被回收,key就可能为null

如果循环没有成功的将新数值存入,则i所在的位置一定为空,此时创建新的Entry,同时存入,这种解决Hash冲突的方式就是属于开放寻址法。

private void set(ThreadLocal<?> key, Object value) 

    // We don't use a fast path as with get() because it is at
    // least as common to use set() to create new entries as
    // it is to replace existing ones, in which case, a fast
    // path would fail more often than not.

    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);

    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) 
        ThreadLocal<?> k = e.get();

        if (k == key) 
            e.value = value;
            return;
        

        if (k == null) 
            replaceStaleEntry(key, value, i);
            return;
        
    

    tab[i] = new Entry(key, value);
    int sz = ++size;
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();

3.2、replaceStaleEntry回收空位与数值插入

replaceStaleEntry是一个比较复杂的方法,它会做两件事情,创建新的Entry存储value,清除掉原来无用的空值。

在这个方法里,有两个主要循环方法,第一个是向前查找,第二个是向后查找。

向前查找的是寻找到为空的数组位,记录位置到slotToExpunge

向后查找的是寻找到发生Hash冲突的数组位,并进行数值交换。

因此可以得到四种情况:

  1. 向前有脏Entry,向后有可覆盖的Entry
  2. 向前有脏Entry,向后没有可覆盖的Entry
  3. 向前没有脏Entry,向后有可覆盖的Entry
  4. 向前没有脏Entry,向后没有可覆盖的Entry

3.2.1、第一种情况

在向前的查询循环中,可以看到,这是一个正在寻找最小数组位为空的那个元素,并且直到出现数组位上的Entry为空才停止,并且将其赋予slotToExpunge,作为清除失效数据的范围起始标记。

在向后的查询循环中,可以看到,这是一个正在寻找第一个Hash冲突元素的数组位,当找到这个位置之后,执行e.value=value,旧的数值被直接替换掉,解决Hash冲突问题,然后执行

tab[i] = tab[staleSlot]

tab[staleSlot] = e;

这两句代码意味着传入的计算位置和找到的冲突位置的数据将发生一次替换,即冲突位的value与计算值的null发生一次交换,这样就让计算得到的结果显示在正确的数组下标位上,下次再进行插入的时候,可以直接得到Hash冲突的结论,避免发生数值重复问题。

接下来判断staleSlot是否和slotToExpunge一致,如果一致,slotToExpunge被赋予i的数值。

cleanSomeSlots,清理数组,清理的范围就是slotToExpunge至末尾。

这样,整个清理结束,数值替换也结束。

3.2.2、第二种情况

在向后没有Hash冲突的情况下,则开始执行下面的代码

tab[staleSlot].value = null;——将原来失效数值清除掉

tab[staleSlot] = new Entry(key, value);——注入需要的新数值Entry

 if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);——如果没有向后冲突,依然存在需要清除的失效位,则依旧执行清除

private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                               int staleSlot) 
    Entry[] tab = table;
    int len = tab.length;
    Entry e;

    // Back up to check for prior stale entry in current run.
    // We clean out whole runs at a time to avoid continual
    // incremental rehashing due to garbage collector freeing
    // up refs in bunches (i.e., whenever the collector runs).
    int slotToExpunge = staleSlot;
    for (int i = prevIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = prevIndex(i, len))
        if (e.get() == null)
            slotToExpunge = i;

    // Find either the key or trailing null slot of run, whichever
    // occurs first
    for (int i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) 
        ThreadLocal<?> k = e.get();

        // If we find key, then we need to swap it
        // with the stale entry to maintain hash table order.
        // The newly stale slot, or any other stale slot
        // encountered above it, can then be sent to expungeStaleEntry
        // to remove or rehash all of the other entries in run.
        if (k == key) 
            e.value = value;

            tab[i] = tab[staleSlot];
            tab[staleSlot] = e;

            // Start expunge at preceding stale entry if it exists
            if (slotToExpunge == staleSlot)
                slotToExpunge = i;
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return;
        

        // If we didn't find stale entry on backward scan, the
        // first stale entry seen while scanning for key is the
        // first still present in the run.
        if (k == null && slotToExpunge == staleSlot)
            slotToExpunge = i;
    

    // If key not found, put new entry in stale slot
    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);

    // If there are any other stale entries in run, expunge them
    if (slotToExpunge != staleSlot)
        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);

 3.2.3、第三种情况

和第一种其实是类似的,只不过清除位无需重新计算。

 3.2.4、第四种情况

和第二种情况类似。

3.2、get方法

get方法还是比较简单的,即从map中取数

Thread t = Thread.currentThread();——获取当前的线程

ThreadLocalMap map = getMap(t);——获取此线程的ThreadLocalMap

if (map != null) ——如果得到的map不为空

ThreadLocalMap.Entry e = map.getEntry(this);——获取到指定的Entry

if (e != null) T result = (T)e.value; return result; ——如果取出的数值不为空,则返回需要的数值。

return setInitialValue();——如果map为空,证明还未进行初始化,初始化方法使用initialValue作为初始值,进行ThreadLocalMap的初始化构建。

public T get() 
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) 
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) 
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        
    
    return setInitialValue();

3.3、remove移除

remove是很简单的方法。

方法内依旧使用了线性探测来找到对应的key,移除key对应数据的同时,也会清除失效数据

public void remove() 
    ThreadLocalMap m = getMap(Thread.currentThread());
    if (m != null)
        m.remove(this);

private void remove(ThreadLocal<?> key) 
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) 
        if (e.get() == key) 
            e.clear();
            expungeStaleEntry(i);
            return;
        
    

4、内存泄露

虽然ThreadLocal的Entry是弱引用,但依旧不可避免的会出现回收不及时或无法回收的情况,因此,每一个set数值最好都使用remove方法来移除,彻底避免内存泄露的问题。

在线程池中,尤其需要注意,因为线程是循环使用的,ThreadLocal有可能一直不会被回收,最后导致内存溢出。

以上是关于java多线程进阶ThreadLocal的主要内容,如果未能解决你的问题,请参考以下文章

Java 线程系列ThreadLocal进阶解析

java进阶之路-java中的threadlocal源码实现

java十:ThreadLocal源码分析

[Java复习] 多线程 并发 JUC 补充

多线程进阶

Java多线程——ThreadLocal类