#yyds干货盘点# 深度揭秘Netty中的FastThreadLocal为什么比ThreadLoc

Posted 跟着Mic学架构

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了#yyds干货盘点# 深度揭秘Netty中的FastThreadLocal为什么比ThreadLoc相关的知识,希望对你有一定的参考价值。

1. 详细剖析分布式微服务架构下网络通信的底层实现原理(图解)

2. (年薪60W的技巧)工作了5年,你真的理解Netty以及为什么要用吗?(深度干货)

3. 深度解析Netty中的核心组件(图解+实例)

4. BAT面试必问细节:关于Netty中的ByteBuf详解

5. 通过大量实战案例分解Netty中是如何解决拆包黏包问题的?

6. 基于Netty实现自定义消息通信协议(协议设计及解析应用实战)

7. 全网最详细最齐全的序列化技术及深度解析与应用实战

8. 手把手教你基于Netty实现一个基础的RPC框架(通俗易懂)

9. (年薪60W分水岭)基于Netty手写实现RPC框架进阶篇(带注册中心和注解)

FastThreadLocal的实现与J.U.C包中的ThreadLocal非常类似。

了解过ThreadLocal原理的同学应该都清楚,它有几个关键的对象.

  1. Thread
  2. ThreadLocalMap
  3. ThreadLocal

同样,Netty专门为FastThreadLocal量身打造了FastThreadLocalThreadInternalThreadLocalMap两个重要的类。下面我们看下这两个类是如何实现的。

FastThreadLocalThread是对Thread类的一层包装,每个线程对应一个InternalThreadLocalMap实例。只有FastThreadLocalFastThreadLocalThread组合使用时,才能发挥 FastThreadLocal的性能优势。首先看下FastThreadLocalThread的源码定义:

public class FastThreadLocalThread extends Thread {

    private InternalThreadLocalMap threadLocalMap;
    // 省略其他代码
}

可以看出 FastThreadLocalThread 主要扩展了 InternalThreadLocalMap 字段,我们可以猜测到 FastThreadLocalThread 主要使用 InternalThreadLocalMap 存储数据,而不再是使用 Thread 中的 ThreadLocalMap。所以想知道 FastThreadLocalThread 高性能的奥秘,必须要了解 InternalThreadLocalMap 的设计原理。

InternalThreadLocalMap

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {

    private static final int DEFAULT_ARRAY_LIST_INITIAL_CAPACITY = 8;

    private static final int STRING_BUILDER_INITIAL_SIZE;

    private static final int STRING_BUILDER_MAX_SIZE;

    public static final Object UNSET = new Object();

    private BitSet cleanerFlags;
    private InternalThreadLocalMap() {
        indexedVariables = newIndexedVariableTable();
    }
    private static Object[] newIndexedVariableTable() {
        Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE];
        Arrays.fill(array, UNSET);
        return array;
    }
    public static int lastVariableIndex() {
        return nextIndex.get() - 1;
    }

    public static int nextVariableIndex() {
        int index = nextIndex.getAndIncrement();
        if (index < 0) {
            nextIndex.decrementAndGet();
            throw new IllegalStateException("too many thread-local indexed variables");
        }
        return index;
    }
    // 省略

}

从 InternalThreadLocalMap 内部实现来看,与 ThreadLocalMap 一样都是采用数组的存储方式。

了解ThreadLocal的同学都知道,它内部也是采用数组的方式来实现hash表,对于hash冲突,采用了线性探索的方式来实现。

但是 InternalThreadLocalMap 并没有使用线性探测法来解决 Hash 冲突,而是在 FastThreadLocal 初始化的时候分配一个数组索引 index,index 的值采用原子类 AtomicInteger 保证顺序递增,通过调用 InternalThreadLocalMap.nextVariableIndex() 方法获得。然后在读写数据的时候通过数组下标 index 直接定位到 FastThreadLocal 的位置,时间复杂度为 O(1)。如果数组下标递增到非常大,那么数组也会比较大,所以 FastThreadLocal 是通过空间换时间的思想提升读写性能。

下面通过一幅图描述 InternalThreadLocalMap、index 和 FastThreadLocal 之间的关系。

通过上面 FastThreadLocal 的内部结构图,我们对比下与 ThreadLocal 有哪些区别呢?

FastThreadLocal 使用 Object 数组替代了 Entry 数组,Object[0] 存储的是一个Set<FastThreadLocal<?>> 集合。

从数组下标 1 开始都是直接存储的 value 数据,不再采用 ThreadLocal 的键值对形式进行存储。

假设现在我们有一批数据需要添加到数组中,分别为 value1、value2、value3、value4,对应的 FastThreadLocal 在初始化的时候生成的数组索引分别为 1、2、3、4。如下图所示。

至此,我们已经对 FastThreadLocal 有了一个基本的认识,下面我们结合具体的源码分析 FastThreadLocal 的实现原理。

FastThreadLocal的set方法源码分析

在讲解源码之前,我们回过头看下上文中的 ThreadLocal 示例,如果把示例中 ThreadLocal 替换成 FastThread,应当如何使用呢?

public class FastThreadLocalTest {

    private static final FastThreadLocal<String> THREAD_NAME_LOCAL = new FastThreadLocal<>();
    private static final FastThreadLocal<TradeOrder> TRADE_THREAD_LOCAL = new FastThreadLocal<>();
    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            int tradeId = i;
            String threadName = "thread-" + i;
            new FastThreadLocalThread(() -> {
                THREAD_NAME_LOCAL.set(threadName);
                TradeOrder tradeOrder = new TradeOrder(tradeId, tradeId % 2 == 0 ? "已支付" : "未支付");
                TRADE_THREAD_LOCAL.set(tradeOrder);
                System.out.println("threadName: " + THREAD_NAME_LOCAL.get());
                System.out.println("tradeOrder info:" + TRADE_THREAD_LOCAL.get());
            }, threadName).start();

        }
    }
}

可以看出,FastThreadLocal 的使用方法几乎和 ThreadLocal 保持一致,只需要把代码中 Thread、ThreadLocal 替换为 FastThreadLocalThread 和 FastThreadLocal 即可,Netty 在易用性方面做得相当棒。下面我们重点对示例中用得到 FastThreadLocal.set()/get() 方法做深入分析。

public final void set(V value) {
    if (value != InternalThreadLocalMap.UNSET) {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        setKnownNotUnset(threadLocalMap, value);
    } else {
        remove();
    }
}

FastThreadLocal.set() 方法实现并不难理解,先抓住代码主干,一步步进行拆解分析。set() 的过程主要分为三步:

  1. 判断 value 是否为缺省值,如果等于缺省值,那么直接调用 remove() 方法。这里我们还不知道缺省值和 remove() 之间的联系是什么,我们暂且把 remove() 放在最后分析。
  2. 如果 value 不等于缺省值,接下来会获取当前线程的 InternalThreadLocalMap。
  3. 然后将 InternalThreadLocalMap 中对应数据替换为新的 value。

InternalThreadLocalMap.get()

先来看InternalThreadLocalMap.get()方法:

public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) {
        return fastGet((FastThreadLocalThread) thread);
    } else {
        return slowGet();
    }
}

如果thread实例类型是FastThreadLocalThread,则调用fastGet()。

InternalThreadLocalMap.get() 逻辑很简单.

  1. 如果当前线程是 FastThreadLocalThread 类型,那么直接通过 fastGet() 方法获取 FastThreadLocalThread 的 threadLocalMap 属性即可
  2. 如果此时 InternalThreadLocalMap 不存在,直接创建一个返回。
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
  InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
  if (threadLocalMap == null) {
    thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
  }
  return threadLocalMap;
}

否则,则调用slowGet(),从代码实现来看,slowGet() 是针对非 FastThreadLocalThread 类型的线程发起调用时的一种兜底方案。如果当前线程不是 FastThreadLocalThread,内部是没有 InternalThreadLocalMap 属性的,Netty 在 UnpaddedInternalThreadLocalMap 中保存了一个 JDK 原生的 ThreadLocal,ThreadLocal 中存放着 InternalThreadLocalMap,此时获取 InternalThreadLocalMap 就退化成 JDK 原生的 ThreadLocal 获取。

private static InternalThreadLocalMap slowGet() {
  InternalThreadLocalMap ret = slowThreadLocalMap.get();
  if (ret == null) {
    ret = new InternalThreadLocalMap();
    slowThreadLocalMap.set(ret);
  }
  return ret;
}

setKnownNotUnset

获取 InternalThreadLocalMap 的过程已经讲完了,下面看下 setKnownNotUnset() 如何将数据添加到 InternalThreadLocalMap 的。

private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    if (threadLocalMap.setIndexedVariable(index, value)) {
        addToVariablesToRemove(threadLocalMap, this);
    }
}

setKnownNotUnset() 主要做了两件事:

  1. 找到数组下标 index 位置,设置新的 value。
  2. 将 FastThreadLocal 对象保存到待清理的 Set 中。

首先我们看下第一步 threadLocalMap.setIndexedVariable() 的源码实现:

public boolean setIndexedVariable(int index, Object value) {
    Object[] lookup = indexedVariables;
    if (index < lookup.length) {
        Object oldValue = lookup[index];
        lookup[index] = value;
        return oldValue == UNSET;
    } else {
        expandIndexedVariableTableAndSet(index, value);
        return true;
    }
}

indexedVariables 就是 InternalThreadLocalMap 中用于存放数据的数组,如果数组容量大于 FastThreadLocal 的 index 索引,那么直接找到数组下标 index 位置将新 value 设置进去,事件复杂度为 O(1)。在设置新的 value 之前,会将之前 index 位置的元素取出,如果旧的元素还是 UNSET 缺省对象,那么返回成功。

如果数组容量不够了怎么办呢?InternalThreadLocalMap 会自动扩容,然后再设置 value。接下来看看 expandIndexedVariableTableAndSet() 的扩容逻辑:

private void expandIndexedVariableTableAndSet(int index, Object value) {
    Object[] oldArray = indexedVariables;
    final int oldCapacity = oldArray.length;
    int newCapacity = index;
    newCapacity |= newCapacity >>>  1;
    newCapacity |= newCapacity >>>  2;
    newCapacity |= newCapacity >>>  4;
    newCapacity |= newCapacity >>>  8;
    newCapacity |= newCapacity >>> 16;
    newCapacity ++;

    Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
    Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
    newArray[index] = value;
    indexedVariables = newArray;
}

可以看出 InternalThreadLocalMap 实现数组扩容几乎和 HashMap 完全是一模一样的,所以多读源码还是可以给我们很多启发的。InternalThreadLocalMap 以 index 为基准进行扩容,将数组扩容后的容量向上取整为 2 的次幂。然后将原数组内容拷贝到新的数组中,空余部分填充缺省对象 UNSET,最终把新数组赋值给 indexedVariables。

思考关于基准扩容

假设现在初始化了 70 个 FastThreadLocal,但是这些 FastThreadLocal 从来没有调用过 set() 方法,此时数组还是默认长度 32。当第 index = 70 的 FastThreadLocal 调用 set() 方法时,如果按原数组容量 32 进行扩容 2 倍后,还是无法填充 index = 70 的数据。所以使用 index 为基准进行扩容可以解决这个问题,但是如果 FastThreadLocal 特别多,数组的长度也是非常大的。

回到 setKnownNotUnset() 的主流程,向 InternalThreadLocalMap 添加完数据之后,接下就是将 FastThreadLocal 对象保存到待清理的 Set 中。我们继续看下 addToVariablesToRemove() 是如何实现的:

addToVariablesToRemove

private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    Set<FastThreadLocal<?>> variablesToRemove;
    if (v == InternalThreadLocalMap.UNSET || v == null) {
        variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
    } else {
        variablesToRemove = (Set<FastThreadLocal<?>>) v;
    }

    variablesToRemove.add(variable);
}

variablesToRemoveIndex 是采用 static final 修饰的变量,在 FastThreadLocal 初始化时 variablesToRemoveIndex 被赋值为 0。InternalThreadLocalMap 首先会找到数组下标为 0 的元素.

  1. 如果该元素是缺省对象 UNSET 或者不存在,那么会创建一个 FastThreadLocal 类型的 Set 集合,然后把 Set 集合填充到数组下标 0 的位置。
  2. 如果数组第一个元素不是缺省对象 UNSET,说明 Set 集合已经被填充,直接强转获得 Set 集合即可。这就解释了 InternalThreadLocalMap 的 value 数据为什么是从下标为 1 的位置开始存储了,因为 0 的位置已经被 Set 集合占用了。

思考关于Set集合设计

public final void remove(InternalThreadLocalMap threadLocalMap) {
  if (threadLocalMap == null) {
    return;
  }

  Object v = threadLocalMap.removeIndexedVariable(index);
  removeFromVariablesToRemove(threadLocalMap, this);

  if (v != InternalThreadLocalMap.UNSET) {
    try {
      onRemoval((V) v);
    } catch (Exception e) {
      PlatformDependent.throwException(e);
    }
  }
}

在执行 remove 操作之前,会调用 InternalThreadLocalMap.getIfSet() 获取当前 InternalThreadLocalMap。

有了之前的基础,理解 getIfSet() 方法就非常简单了。

  1. 如果是 FastThreadLocalThread 类型,直接取 FastThreadLocalThread 中 threadLocalMap 属性。
  2. 如果是普通线程 Thread,从 ThreadLocal 类型的 slowThreadLocalMap 中获取。

找到 InternalThreadLocalMap 之后,InternalThreadLocalMap 会从数组中定位到下标 index 位置的元素,并将 index 位置的元素覆盖为缺省对象 UNSET。

接下来就需要清理当前的 FastThreadLocal 对象,此时 Set 集合就派上了用场,InternalThreadLocalMap 会取出数组下标 0 位置的 Set 集合,然后删除当前 FastThreadLocal。最后 onRemoval() 方法起到什么作用呢?Netty 只是留了一处扩展,并没有实现,用户需要在删除的时候做一些后置操作,可以继承 FastThreadLocal 实现该方法。

FastThreadLocal.get()源码分析

public final V get() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }

    return initialize(threadLocalMap);
}

首先根据当前线程是否是 FastThreadLocalThread 类型找到 InternalThreadLocalMap,然后取出从数组下标 index 的元素,如果 index 位置的元素不是缺省对象 UNSET,说明该位置已经填充过数据,直接取出返回即可。

public Object indexedVariable(int index) {
  Object[] lookup = indexedVariables;
  return index < lookup.length? lookup[index] : UNSET;
}

如果 index 位置的元素是缺省对象 UNSET,那么需要执行初始化操作。可以看到,initialize() 方法会调用用户重写的 initialValue 方法构造需要存储的对象数据.

private V initialize(InternalThreadLocalMap threadLocalMap) {
    V v = null;
    try {
        v = initialValue();
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }

    threadLocalMap.setIndexedVariable(index, v);
    addToVariablesToRemove(threadLocalMap, this);
    return v;
}
private final FastThreadLocal<String> threadLocal = new FastThreadLocal<String>() {
  @Override
  protected String initialValue() {
    return "hello world";
  }
};

构造完用户对象数据之后,接下来就会将它填充到数组 index 的位置,然后再把当前 FastThreadLocal 对象保存到待清理的 Set 中。整个过程我们在分析 FastThreadLocal.set() 时都已经介绍过,就不再赘述了。

到此为止,FastThreadLocal 最核心的两个方法 set()/get() 我们已经分析完了。下面有两个问题我们再深入思考下。

  1. FastThreadLocal 真的一定比 ThreadLocal 快吗?答案是不一定的,只有使用FastThreadLocalThread 类型的线程才会更快,如果是普通线程反而会更慢。
  2. FastThreadLocal 会浪费很大的空间吗?虽然 FastThreadLocal 采用的空间换时间的思路,但是在 FastThreadLocal 设计之初就认为不会存在特别多的 FastThreadLocal 对象,而且在数据中没有使用的元素只是存放了同一个缺省对象的引用,并不会占用太多内存空间。

以上是关于#yyds干货盘点# 深度揭秘Netty中的FastThreadLocal为什么比ThreadLoc的主要内容,如果未能解决你的问题,请参考以下文章

#yyds干货盘点#netty系列之:Bootstrap,ServerBootstrap和netty中的实现

#yyds干货盘点#netty系列之:channel,ServerChannel和netty中的实现

#yyds干货盘点# 基于Netty,20分钟手写一个RPC框架

#yyds干货盘点# Netty源码分析之Reactor线程模型详解

Netty 简介《Netty In Action》 #yyds干货盘点#

#yyds干货盘点#netty系列之:请netty再爱UDT一次