JUCConcurrentHashMap源码分析

Posted remo0x

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUCConcurrentHashMap源码分析相关的知识,希望对你有一定的参考价值。

类注释

  • 取数方法没有加锁,所以会被存数方法影响
  • 聚合方法:size/isEmpty/containsValue,在没有被并发更新的情况下是准确的,但是存在并发更新时,上述聚合方法只是反映了map的一个瞬时状态,这种瞬时状态只能用于监测或估算,而不能用于程序控制
  • 和Hashtable一样,和HashMap相反,不允许使用null作为key或value

属性

常量

MAXIMUM_CAPACITY

  • map在确定数组下标时,采用的是(length-1)&hash的方式,只有当length为2的指数幂的时候才能较均匀的分布元素。所以map规定了其容量必须是2的n次方,使用位运算同时还提高了Java的处理速度
  • map内部由Entry[]数组构成,Java的数组下标是由int表示的。所以对于map来说其最大的容量应该是不超过int最大值的一个2的指数幂,而最接近int最大值的2的指数幂用位运算符表示就是1<<30

LOAD_FACTOR

  • 在构造函数中指定loadFactor只能影响初次构造的map的capacity,后续不会用到
  • 其实LOAD_FACTOR的值不会常用,因为直接用n-(n>>>2)即表示n的3/4,即n*LOAD_FACTOR

TREEIFY_THRESHOLD/UNTREEIFY_THRESHOLD

  • TREEIFY_THRESHOLD设置为8的原因:TreeNode占用空间是Node的两倍,而且结点数较少时,红黑树的查找效率跟链表相差不大,所以只在结点数量较多时用红黑树才能得到时间和空间上的tradeoff。在随机hash和LOAD_FACTOR的前提下,bins中的结点分布符合泊松分布,在一个bin中节点数达到8的概率是0.00000006,所以使用8作为阈值是很考究的。
  • UNTREEIFY_THRESHOLD设置为6的原因:使用6也是一个tradeoff,在TREEIFY_THRESHOLD使用8的前提下,如果UNTREEIFY_THRESHOLD使用7,则可能存在对同一个bin频繁插入和删除的操作就会导致bin频繁的转为红黑树和链表,如果UNTREEIFY_THRESHOLD使用6以下的值,那红黑树在空间和时间效率上并不比链表优良,所以使用6是最合适的

MIN_TRANSFER_STRIDE

  • 扩容操作中,transfer这个步骤是允许多线程的,这个常量表示一个线程执行transfer时,最少要对连续的16个hash桶进行transfer(不足16就按16算,多控制下正负号就行)。也就是单线程执行transfer时的最小任务量,单位为一个hash桶,这就是线程的transfer的步进(stride)
  • 最小值是DEFAULT_CAPACITY,不使用太小的值,避免太小的值引起transfer时线程竞争过多,如果计算出来的值小于此值,就使用此值。正常步骤中会根据CPU核心数目来算出实际的,一个核心允许8个线程并发执行扩容操作的transfer步骤,这个8是个经验值,不能调整的
  • 因为transfer操作不是IO操作,也不是死循环那种100%的CPU计算,CPU计算率中等,1核心允许8个线程并发完成扩容,理想情况下也算是比较合理的值。一段代码的IO操作越多,1核心对应的线程就要相应设置多点,CPU计算越多,1核心对应的线程就要相应设置少一些
  • 表明:默认的容量是16,也就是默认构造的实例,第一次扩容实际上是单线程执行的,看上去是可以多线程并发(方法允许多个线程进入),但是实际上其余的线程都会被一些if判断拦截掉,不会真正去执行扩容

MOVED/TREEBIN/RESERVED/HASH_BITS

  • MOVED:ForwardingNode的hash值,ForwardingNode是一种临时节点,在扩进行中才会出现,并且它不存储实际的数据。如果旧数组的一个hash桶中全部的节点都迁移到新数组中,旧数组就在这个hash桶中放置一个ForwardingNode。读操作或者迭代读时碰到ForwardingNode时,将操作转发到扩容后的新的table数组上去执行,写操作碰见它时,则尝试帮助扩容
  • TREEBIN:TreeBin的hash值,TreeBin是ConcurrentHashMap中用于代理操作TreeNode的特殊节点,持有存储实际数据的红黑树的根节点。因为红黑树进行写入操作,整个树的结构可能会有很大的变化,这个对读线程有很大的影响,所以TreeBin还要维护一个简单读写锁,这是相对HashMap,这个类新引入这种特殊节点的重要原因
  • RESERVED:ReservationNode的hash值,ReservationNode是一个保留节点,就是个占位符,不会保存实际的数据,正常情况是不会出现的,在jdk1.8新的函数式有关的两个方法computeIfAbsent和compute中才会出现
  • HASH_BITS:用于和负数hash值进行&运算,将符号位置为0,将其转化为正数(绝对值不相等),Hashtable中定位hash桶也有使用这种方式来进行负数转正数

NCPU

  • CPU的核心数,用于在扩容时计算一个线程一次要干多少活

serialPersistentFields

  • 在序列化时使用,这是为了兼容以前的版本

变量

Node<K,V>[] table

  • Node数组,用volatile修饰,通过Unsafe方法读写

Node<K,V>[] nextTable

  • 扩容后的新的table数组,只有在扩容时才有用
  • nextTable != null,说明扩容方法还没有真正退出,一般可以认为是此时还有线程正在进行扩容,极端情况需要考虑此时扩容操作只差最后给几个变量赋值(包括nextTable = null)的这个大的步骤,这个大步骤执行时,通过sizeCtl经过一些计算得出来的扩容线程的数量是0

long baseCount

  • 计数器基本值,主要在没有碰到多线程竞争时使用,需要通过CAS进行更新

int sizeCtl

  • 非常重要的一个属性,源码中的英文翻译,直译过来是下面的四行文字的意思
    • sizeCtl = -1,表示有线程正在进行真正的初始化操作
    • sizeCtl = -(1 + nThreads),表示有nThreads个线程正在进行扩容操作
    • sizeCtl > 0,表示接下来的真正的初始化操作中使用的容量,或者初始化/扩容完成后的threshold
    • sizeCtl = 0,默认值,此时在真正的初始化操作中使用默认容量
  • 但是,通过我对源码的理解,这段注释实际上是有问题的,有问题的是第二句,sizeCtl = -(1 + nThreads)这个,网上好多都是用第二句的直接翻译去解释代码,这样理解是错误的。默认构造的16个大小的ConcurrentHashMap,只有一个线程执行扩容时,sizeCtl = -2145714174,但是照这段英文注释的意思,sizeCtl的值应该是-(1 + 1) = -2,sizeCtl在小于0时的确有记录有多少个线程正在执行扩容任务的功能,但是不是这段英文注释说的那样直接用-(1 + nThreads),实际中使用了一种生成戳,根据生成戳算出一个基数,不同轮次的扩容操作的生成戳都是唯一的,来保证多次扩容之间不会交叉重叠,当有n个线程正在执行扩容时,sizeCtl在值变为(基数 + n),1.8.0_111的源码的383-384行写了个说明:A generation stamp in field sizeCtl ensures that resizings do not overlap.

int transferIndex

  • 下一个transfer任务的起始下标index加上1之后的值,transfer时下标index从length - 1开始往0走。transfer时方向是倒过来的,迭代时是下标从小往大,二者方向相反,尽量减少扩容时transefer和迭代两者同时处理一个hash桶的情况,顺序相反时,二者相遇过后,迭代没处理的都是已经transfer的hash桶,transfer没处理的,都是已经迭代的hash桶,冲突会变少。
  • 下标在[nextIndex - 实际的stride (下界要 >= 0), nextIndex - 1]内的hash桶,就是每个transfer的任务区间,每次接受一个transfer任务,都要CAS执行transferIndex = transferIndex - 实际的stride,保证一个transfer任务不会被几个线程同时获取(相当于任务队列的size减1),当没有线程正在执行transfer任务时,一定有transferIndex <= 0,这是判断是否需要帮助扩容的重要条件(相当于任务队列为空)

int cellsBusy

  • CAS自旋锁标志位,用于初始化,或者counterCells扩容时

CounterCell[] counterCells

  • 用于高并发的计数单元,如果初始化了这些计数单元,那么跟table数组一样,长度必须是2^n的形式

KeySetView<K,V> keySet/ValuesView<K,V> values/EntrySetView<K,V> entrySet

  • 视图变量

方法

静态方法

int spread(int h)

  • 将高16位与低16位异或,并将符号位置为0
  • 这样做是因为在计算node的index时,是用2的幂作为掩码,所以只用低位进行计算,存在大量的碰撞,比如一些Float的值,所以将高位的影响扩散到低位,可以减少这种碰撞。同时,因为table的容量限制,hash中的高位在计算index时很难被用到
  • 处于对速度、效能和bit位扩散的质量的考虑,并且使用红黑树处理大量的碰撞,所以只是简单的将高位和低位进行异或就够了

int tableSizeFor(int c)

  • 返回大于输入参数且最近的2的整数次幂的数
    先分析有关n位操作部分
        假设n的二进制为01xxx...xxx。接着
        对n右移1位:001xx...xxx,再位或:011xx...xxx
        对n右移2为:00011...xxx,再位或:01111...xxx
        此时前面已经有四个1了,再右移4位且位或可得8个1
        同理,有8个1,右移8位肯定会让后八位也为1
        综上可得,该算法让最高位的1后面的位全变为1
        最后再让结果n+1,即得到了2的整数次幂的值了
    
    现在回来看看第一条语句:int n = cap - 1
        让cap-1再赋值给n的目的是令找到的目标值大于或等于原值。例如二进制1000,十进制数值为8
        如果不对它减1而直接操作,将得到答案10000,即16。显然不是结果。减1后二进制为111,再进行操作则会得到原来的数值1000,即8
    

table元素访问方法

  • <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)
    • volatile语义获取table元素i
  • <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)
    • CAS将位置i的元素c替换为v
  • <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)
    • volatile语义将位置i的元素设置为v
  • 用Unsafe类实现这三个方法的原因是,Java的数组在元素层面上的设计缺失,无法表达元素是final和volatile的语义,所以使用getObjectVolatile补充volatile的语义,使用@Stable补充final的语义。数组元素本身和没有volatile修饰的字段一样,无法保证线程之间的可见性,只有触发happens-before关系的操作,才能保证线程之间的可见性。比如使用table[0] = new Object()直接赋值,这个赋值不会触发任何happens-before关系的操作,相当于对一个无volatile变量进行赋值一样

构造函数

  • ConcurrentHashMap()
  • ConcurrentHashMap(int initialCapacity)
  • ConcurrentHashMap(Map<? extends K, ? extends V> m)
  • ConcurrentHashMap(int initialCapacity, float loadFactor)
  • ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)
    • long size = (long)(1.0 + (long)initialCapacity / loadFactor);initialCapacity * loadFactor = resizeThreshold,所以这里的initialCapacity / loadFactor表示乘以loadFactory后得到的resizeThreshold就是initialCapacity,即size达到initialCapacity后就会进行扩容,因此如果放到map中的元素数量刚好是initialCapacity个,那就避免了扩容操作,而+1.0相当于是使用Math.ceil将浮点数向上取整,不过如果initialCapacity/loadFactor是正数,就会多出一个元素,再用tableSizeFor调整size,就得到了最合适的capacity。比如initalCapacity=16,loadFactor=0.75,则size=22

get

源码

public V get(Object key) 
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 重hash
    int h = spread(key.hashCode());
    // 先看bin是否有结点
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) 
        // bin的头结点就是要找的结点
        if ((eh = e.hash) == h) 
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        
        // hash值小于0,都是特殊节点,调用find方法查询
        // 包括已经迁移的结点、树节点、临时节点
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 链表查找
        while ((e = e.next) != null) 
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        
    
    return null;

put

源码

final V putVal(K key, V value, boolean onlyIfAbsent) 
    if (key == null || value == null) throw new NullPointerException();
    // 重hash
    int hash = spread(key.hashCode());
    // 统计链表长度
    int binCount = 0;
    // 自旋
    for (Node<K,V>[] tab = table;;) 
        Node<K,V> f; int n, i, fh;
        // table为空则初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 如果bin没有结点,则直接CAS放入,失败则自旋重试
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) 
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        
        // 如果是fwd结点,表示正在迁移,则当前线程参与迁移table
        // 迁移结束后,自旋重试
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // 说明是链表或者红黑树
        else 
            V oldVal = null;
            // 对bin加锁
            synchronized (f) 
                // 检查是否仍然是之前的头结点
                if (tabAt(tab, i) == f) 
                    // 链表
                    if (fh >= 0) 
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) 
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) 
                                oldVal = e.val;
                                // onlyIfAbsent表示只有key不存在时才赋值,否则仍然为旧值
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            
                            Node<K,V> pred = e;
                            // 链表遍历完仍然没有key,则放入新结点到链表尾
                            if ((e = e.next) == null) 
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            
                        
                    
                    // 红黑树
                    else if (f instanceof TreeBin) 
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) 
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        
                    
                
            
            // 判断链表是否需要转为红黑树
            if (binCount != 0) 
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                // 如果之前已经存在该key,则直接返回
                if (oldVal != null)
                    return oldVal;
                break;
            
        
    
    // 如果key之前不存在,则是新增结点,需要自增count
    addCount(1L, binCount);
    return null;

initTable()

private final Node<K,V>[] initTable() 
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) 
        // sizeCtl小于0表示已经有线程在初始化table了
        // 直接将cpu时间让出去,等他们初始化完,再自旋检查下
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // 将sizeCtl设置为-1,表示当前线程将要初始化table
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) 
            try 
                // 再次检查table是否仍然为空
                if ((tab = table) == null || tab.length == 0) 
                    // 如果sizeCtl>0,表示创建Map时指定了初始化容量
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 将sizeCtl置为当前容量的0.75,表示触发扩容的阈值
                    sc = n - (n >>> 2);
                
             finally 
                sizeCtl = sc;
            
            break;
        
    
    return tab;

treeifyBin(Node<K,V>[],int)

private final void treeifyBin(Node<K,V>[] tab, int index) 
    Node<K,V> b; int n, sc;
    if (tab != null) 
        // 如果table容量小于能支持红黑树的阈值,则只扩容
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        // 判断索引位置的bin是链表结点
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) 
            // 对bin加锁
            synchronized (b) 
                if (tabAt(tab, index) == b) 
                    // 将所有结点转为双向链表
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) 
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    
                    // 创建TreeBin即红黑树,并放置在索引处
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                
            
        
    

tryPresize()

private final void tryPresize(int size) 
    // 给tableSizeFor传1.5倍size,可以理解为将0.75size是触发扩容的阈值
    // 而1.5size是下次触发扩容的阈值,所以tableSizeFor会返回该阈值对应的容量
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    // 自旋
    while ((sc = sizeCtl) >= 0) 
        Node<K,V>[] tab = table; int n;
        if (tab == null || (n = tab.length) == 0) 
            // sc>0表示创建Map时指定了capacity,这里用sc和c中最大值
            n = (sc > c) ? sc : c;
            // sizeCtl置为-1表示正在初始化
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) 
                try 
                    if (table == tab) 
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);
                    
                 finally 
                    sizeCtl = sc;
                
            
        
        // 在table已经初始化的情况下,sc表示下次扩容的阈值
        // 这里c<sc,表示容量已经够了,不需要扩容,可能是其他线程已经扩容了
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        // 校验是否还是之前的table,是否有其他线程已经初始化了
        else if (tab == table) 
            int rs = resizeStamp(n);
            // sc<0,表示正在初始化或者正在扩容
            if (sc < 0) 
                Node<K,VJUCConcurrentHashMap源码分析

JUCConcurrentHashMap源码分析

RocketMQ 源码分析 —— 高可用

Java高并发系列之ReentrantReadWriteLock源码分析

高并发之——从源码角度分析创建线程池究竟有哪些方式

Tornado 高并发源码分析之一---启动一个web服务