JDK1.8中ConcurrentHashMap源码解析

Posted 小志的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDK1.8中ConcurrentHashMap源码解析相关的知识,希望对你有一定的参考价值。

目录

一、ConcurrentHashMap使用场景

  • 我们平时最常用的HashMap其实不是线程安全的,而当我们有多线程使用场景的时候,即想线程安全,又想拥有Map的能力,我们可以选择HashTable,因为它是针对我们常用的方法上面加上了synchronize锁,但是在高并发的场景下,效率低是它的弊端。如果我们还非常在意效率,那么我们更好的选择是使用ConcurrentHashMap。

  • 示例:

    启动100个线程,每个线程循环100次,像容器中应该放入10000个元素。我们看到运行结果可以发现,HashMap并不是10000,这就说明,它在多线程并发的情况下,出现了线程不安全的问题。而ConcurrentHashMap返回的结果是没有问题的。

二、put方法的整体流程

1、put方法源码

 public V put(K key, V value) 
        return putVal(key, value, false);

2、在put方法中,调用了putVal方法。由于该方法代码较多,我们只保留框架性质的代码,这样会更方便我们理解。如下所示:

/** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) 
        ... ...
        for (Node<K,V>[] tab = table;;) 
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) 
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else 
                V oldVal = null;
                synchronized (f) 
 				... 针对f链表或红黑树进行添加Node节点操作,执行完毕后break ...  
 				  
                if (binCount != 0) 
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                
            
        
        addCount(1L, binCount);
        return null;
    

3、上面的代码,可以分为两部分内容,

(1)、第一部分:首先开启了无限循环,在里面进行了4中情况的判断。

  • 第一种情况:如果table数组需要被创建。
    如果table数组为null或者长度为0,则创建table数组。

  • 第二种情况:如果寻址后的位置没有被占用。
    创建Node节点,插入到这个位置

  • 第三种情况:如果寻址后的位置是正在迁移状态。
    加入到迁移大军中,帮助一起进行扩容迁移操作。

  • 第四种情况:其他情况
    将节点插入到链表中或者红黑树中。

(2)、 第二部分:执行addCount,将ConcurrentHashMap中存储的k,v总数+1。

  • 下面将骤一的对上述的四个情况和addCount步骤进行分析。

三、第一种情况(初始化table数组)

3.1、initTable源码如下:

 private final Node<K,V>[] initTable() 
        Node<K,V>[] tab; int sc;
        //table=null
        while ((tab = table) == null || tab.length == 0) 
            //sizeCtl =0
            //如果sizeCtl为-1,则表示table数组正在被别的线程初始化
            if ((sc = sizeCtl) < 0)
                /**
                * 其他竞争失败的线程,会在while循环中spin,直到table创建完毕
                * 才能跳出while循环
                */
                Thread.yield();
            /**
             * 如果我们是第一个执行初始化table数组,那么我们首先通过CAS把
             * sizeCtl设置为-1表明名正在初始化中。
             * 
             * 只有一条线程可以成功执行CAS操作,将 sizeCtl赋值为-1,其他竞争失败
             * 的线程,在外面spin,直到table创建完毕才能跳出while循环。
             */ 
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) 
                try 
                    //table=null
                    if ((tab = table) == null || tab.length == 0) 
                    	//sc =0 ,DEFAULT_CAPACITY=16
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        // n =16
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //n>>>2=10000>>>2=00100=4 (16/2/2=4)
                        /**sc= (4-1)/4n =0.75*n*/
                        sc = n - (n >>> 2); //sc =16-4 =12
                    
                 finally 
                    //sc=12, sizeCtl =12
                    sizeCtl = sc;
                
                break;
            
        
        //返回size为16的空的Node数组
        return tab;
    

3.2、sizeCtl

  • 如果sizeCtl为-1,则表示table数组正在被别的线程初始化。默认sizeCtl=0,当table数组初始化或者扩容完毕的时候,sizeCtl会表示扩容阈值。

  • 默认table数组的长度为16

3.3、流程解释

  • 我们通过源码可以看到,如果table没有被初始化完毕的话,那么会一直在while循环中,直到table数组初始化完毕:

    while ((tab = table) == null || tab.length == 0)
    
  • 假设现在有4条线程同时的要去创建table数组,那么当有一条线程已经优先开始初始化table数组操作的时候,sizeCtl就会被赋值为-1,那么其他线程就会执行Thread.yield()让出cpu,并继续while循环,然后再执行Thread.yield(),在那spin旋转,直到那个最早的线程创建好创建table数组之后,所有线程都会跳出while继续往下执行。

    private final Node<K,V>[] initTable() 
            Node<K,V>[] tab; int sc;
            while ((tab = table) == null || tab.length == 0) 
                if ((sc = sizeCtl) < 0) 
                    Thread.yield(); // 其他竞争失败的线程,会在while循环中spin,直到table创建完毕才能跳出while循环
                
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) 
                	 
                   /**
                    * 只有一个线程可以成功执行CAS操作,将sizeCtl赋值为-1 
                    */
                   ...  此处代码省略 ...
                   /**
                    * 执行创建table数组操作
                    */
                   ...  此处代码省略 ...
                
            
            return tab; // 返回创建好的table数组
        
    

四、第二种情况(寻址后的位置没有被占用)

4.1、源码如下

/**
*目标位置为空,直接设置
*通过hash值对应的数组下标得到第一个节点;虽然table数组是线程间可见,但是数组元素未必。
*所以,以volatile读的方式来读取table数组中的元素,保证每次拿到的数组都是最新的
*i=[在数组中的位置]
*tabAt(tab, i = (n - 1) & hash)相当于tab[i]
*/
 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) //f =null i=[在数组中的位置]
   //如果该下标返回的节点为空,则直接通过cas将新的值封装成node插入即可;
   //如果cas失败,说明存在竞争,则进入下一次循环
    if (casTabAt(tab, i, null,
                   new Node<K,V>(hash, key, value, null)))
                       break; 
                     

  • 其中,关键点是,我们要理解tabAtcasTabAt这两个方法的实现逻辑,那么在了解这两个方法之前,我们需要先了解两个变量的含义,即:ABASEASHIFT

4.2、ABASE和ASHIFT 变量

  • 源码部分

     private static final long ABASE;
     static 
            try 
               ... 此处代码省略...
                Class<?> ak = Node[].class;
                /**
                *数组对象:对象头(8字节)* 指针(4字节)* 数组长度(4字节)=16,所以
                *基础偏移量为16
                *
                * public native int arrayBaseOffset(Class<?> var1);
                * 返回数组类型的[第一个元素]的偏移地址(基础偏移地址)。
                * 如果arrayIndexScale方法返回的比例因子不为0,可以结合基础偏移地址
                * 和比例因子访问数组的所有元素。
                * Unsafe类中已经初始化了很多类似的常量如ARRAY_BOOLEAN_BASE_OFFSET等。
                */
                ABASE = U.arrayBaseOffset(ak);//16
                /**
                * public native int arrayIndexScale(Class<?> var1);
                * 返回数组类型的[比例因子](其实就是【数组中元素偏移地址的增量】,
                * 因为数组中的元素的地址是连续的)。
                * 此方法不适用于数组类型为“narrow”类型的数组,“narrow”类型的数组类型
                * 使用此方法会返回0(这里narrow应该是狭义的意思,但是具体指哪些类型
                * 暂时不明确)。
                * Unsafe类中已经初始化了很多类似的常量如ARRAY_BOOLEAN_INDEX_SCALE等。
                **/
                int scale = U.arrayIndexScale(ak);//4
                if ((scale & (scale - 1)) != 0)// 4 & (4-1) =0100 & 0011 =0000
                    throw new Error("data type scale not a power of two");
                 /**
                 *Integer.numberOfLeadingZeros(int i) 
                 *给定一个int类型数据,返回这个数据的二进制串中从最左边算起连续的“0”的
                 *总数量。
                 *而ASHIFT其实就是将scale数值转换为[按位左移对应的数值],
                 *即:通过scale =4,name计算出ASHIFT=2,
                 *而N<<2其实就相当于N*2*2=N*4=N*scale 
                 *举例:
                 *   如果scale =8(十进制)=1000(二进制)
                 *   name计算出Integer.numberOfLeadingZeros(scale)=28
                 *   ASHIFT =31-28 =3
                 *   N<<3 =n*2*2*2=N*8=N*scale
                 */
                //ASHIFT=31-29=2
                ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
             catch (Exception e) 
                throw new Error(e);
            
        
    

4.3、tabAt 方法

  • 源码部分

  • 作用
    获得tab数组下标为i位置上的Node元素

  • 数组的寻址公式为
    a[i]_address = base_address + i*data_type_size,通过该方式可以获得对应下标为i的值,即:获得tab[i]的值。

  • public native Object getObjectVolatile(Object o, long offset);
    (1)、此方法和getObject功能类似,不过附加了volatile语义,也就是强制从主存中获取属性值
    (2)、类似的方法有getIntVolatile、getDoubleVolatile等等。
    (3)、这个方法要被使用的属性由volatile修饰,否则功能和getObject方法相同。
    (4)、offset= ((long)i << ASHIFT) + ABASE,表示从ABASE开始,计算第i个元素的偏移量。
    (5)、所以:tabAt(tab, i)就等同于tab[i]。

4.4、casTabAt 方法

  • 源码部分

  • 作用
    如果tab数组下标为i的Node元素是c,则将c修改为v。

  • public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
    (1)、针对Object对象进行CAS操作。即是对应Java变量引用var1,原子性地更新var1中偏移地址为offset的属性的值为var5,当且仅的偏移地址为offset的属性的当前值为var4才会更新成功返回true,否则返回false。
    (2)、var1:目标Java变量引用。
    (3)、var2:目标Java变量中的目标属性的偏移地址。
    (4)、var4:目标Java变量中的目标属性的期望的当前值。
    (5)、var5:目标Java变量中的目标属性的目标更新值。

  • 类似的方法有compareAndSwapInt和compareAndSwapLong,在Jdk8中基于CAS扩展出来的方法有getAndAddInt、getAndAddLong、getAndSetInt、getAndSetLong、getAndSetObject,它们的作用都是:通过CAS设置新的值,返回旧的值。

4.5、流程解释

  • 这部分属于判断中的第二种情况,通过hash值计算出来应该插入的下标i,如果这个位置是空的,即:还没有保存Node元素,那么就根据我们要put的key和value来创建一个新的Node,并插入到下标为i的位置上,即:

    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) 
        break;
    
    
  • 这段采用CAS来保证只有一个线程可以赋值成功。如果我们还是有A,B,C,D这4个线程都执行到了这个判断语句中,假设线程A第一个执行的这个CAS操作,那么只有它会执行成功,其余的3个线程(B,C,D)则会执行失败,casTabAt的结果为false。那么线程A会执行break语句跳出for循环,而其他三个线程会再次执行for循环,并执行到case4的代码段中。

五、第三种情况(如果寻址后的位置是正在迁移状态)

5.1、源码

  • 源码如下

    /**
    *插入元素遇到扩容,即MOVED =-1,表示插入元素时,无限循环内部正好在扩容;那么就
    *调用helpTransfer(tab, f)一起扩容;
    *由于是在for(Node<K,V>[] tab =table;;),所以当帮忙扩容完毕后,还会像新的
    *table中插入元素的。
    */
    else if ((fh = f.hash) == MOVED)
        tab = helpTransfer(tab, f);
    
    
  • MOVED变量的值是什么

  • 当我们通过hash寻址到了我们应该插入的下标为i的位置上,已经存在了Node f,并且这个f的hash值等于-1,说明当前这个下标为i的位置,正在执行移动操作。那么,我们会通过执行helpTransfer方法来协助其他线程进行扩容操作。详细操作我们来看helpTransfer方法的具体实现。

5.2、helpTransfer方法

  • 源码如下

  • helpTransfer方法作用
    参与扩容数据迁移的操作。

  • 源码解释
    (1)、f.nextTable存储的是扩容后新的table数组。
    (2)、int rs = resizeStamp(tab.length);返回的是旧数组的长度信息。
    (3)、sc = sizeCtl) < 0 说明当前还是在对旧表操作中的状态,即:扩容数据转移还在操作中,没有操作完毕。
    (4)、U.compareAndSwapInt(this, SIZECTL, sc, sc + 1);含义是,由于当前线程要帮忙去执行扩容和数据迁移操作,所以将总参与线程数+1。
    (5)、执行transfer操作。该方法具体操作请参考【5.3.6> transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) 】

六、其他情况( 将节点插入到链表中或者红黑树中。)

我们调用put方法的时候,此部分为最核心的处理逻辑了。因为当table表被初始化了,并且出现哈希冲突了,并且Node f这个位置并没有发生移动的情况下,都会走到这段代码段中。而这种情况又是最多发生的情况。所以,这部分我们要着重的仔细分析一番。

6.1、源码如下

6.2、流程解释

  • 上面代码一共可以分为3部分内容,已经都用红框标注好了,分别是:
  • 第一部分:向链表中插入Node的操作。
  • 第二部分:向红黑树中插入Node的操作。(红黑树本次不涉及)
  • 第三部分:扩容或者转换元素存储类型的操作。
  • 当然,前提是针对f进行synchronize加锁。通过这段代码,我们可以看得出,ConcurrentHashMap是针对具体某个下标的Node进行并发竞争加锁的。极大的避免了由于加锁导致的效率低下的问题。

6.3、第一部分:向链表中插入Node的操作

  • 源码如下

    if (fh >= 0) 
       binCount = 1;
        for (Node<K,V> e = f;; ++binCount) 
            K ek;
            if (e.hash == hash &&
                ((ek = e.key) == key ||
                 (ek != null && key.equals(ek)))) 
                oldVal = e.val;
                if (!onlyIfAbsent)
                    e.val = value;
                break;
            
            Node<K,V> pred = e;
            if ((e = e.next) == null) 
                pred.next = new Node<K,V>(hash, key,
                                          value, null);
                break;
            
        
    
    
  • 源码解释
    (1)、fh 表示Node f的hash值,如果大于等于0,则表示正常的Node节点。
    (2)、binCount=1对应链表中的第2个Node节点。
    (3)、从链表的头节点遍历到末尾节点:如果f节点的hash值与put的key的hash值相同,并且两个key值也是相同,那么如果onlyIfAbsent=false则将新的value值替换旧的value值,否则不替换value值。执行完毕后,break跳出循环; 遍历到末尾节点,依然没有找到key值且hash值相同的Node,则将新Node加入到链表末尾。执行完毕后,break跳出循环。

6.4、第三部分:扩容或者转换元素存储类型的操作

如果Node链表长度大于等于9,则执行treeifyBin方法进行扩容或者转换元素存储操作。

6.4.1 、treeifyBin方法

  • 源码部分

  • 如果table数组长度小于64,则执行扩容操作。

  • 如果数组长度大于等于64,则进行红黑树扩充。

6.4.2、 tryPresize方法

  • 源码如下

    首先,通过tableSizeFor根据size计算出2的n次方所有值中,所有大于size值中最小的那个值。具体的逻辑,参考【6.4.4> tableSizeFor(int c)】

  • 源码逻辑解释
    (1)、首先,正常情况下,sizeCtl表示table数组的阈值,所以肯定是大于0的。while循环里一共有3个判断逻辑:
    (2)第一个判断:table数组没有初始化完毕。这块就是创建table数组,没什么别的复杂逻辑。
    (3)、第二个判断:数组超过最大值,或者扩容发生越界。(MAXIMUM_CAPACITY=1<<30) 针对如上特殊情况,即直接break跳出循环。
    (4)、第三个判断:table还是那个table,这个过程中没有被其他线程重建过。

6.4.2.1、 第一个判断(table数组没有初始化完毕)
  • 源码如下

    if (tab == null || (n = tab.length) == 0) 
        n = (sc > c) ? sc : c;
        /** 将sizeCtl设置为-1 */
        if (U.compareAndSwapInt(this, SIZECTL, sc, -1))  
            try 
                if (table == tab) 
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = nt;
                    sc = n - (n >>> 2); /** 3/4*n */
                
             finally 
                /** 将sizeCtl设置为3/4的table数组长度 */
                sizeCtl = sc;
            
        
    
    
  • 执行过程如下所示
    (1)、首先,将sizeCtl赋值为-1,则表示针对table正在操作中。
    (2)、其次,创建table,且sc赋值为table数组的3/4长度。
    (3)、最后,将sizeCtl赋值为扩容阈值,表示针对table的操作已经执行完毕。

6.4.2.2、第二个判断(数组超过最大值,或者扩容发生越界)
  • 源码如下

6.4.2.3、第三个判断(table还是那个table,这个过程中没有被其他线程重建过

1、源码如下

2、resizeStamp方法的具体作用是返回table数组长度相关信息。源码如下所见

3、resizeStamp方法的解释

  • 其中,Integer.numberOfLeadingZeros(n)具体逻辑,请参考【6.4.5> numberOfLeadingZeros(int i)】
  • 我么可以举例上面方面发计算过程:
    假设n=16,那么二进制就是00010000,那么从左侧最高位开始计算,连续一共有27个0,那么Integer.numberOfLeadingZeros(16)就返回27。
    RESIZE_STAMP_BITS=16,那么1<<(RESIZE_STAMP_BITS - 1)=1<<(16-1)=1<<15
    我们在计算27 | 15,转换为二进制就是:00011011 | 00001000000000000000 = 00001000000000011011
  • 综上所述,resizeStamp返回的结构由三部分组成,就是:
    (1)、【高16位】16个0
    (2)、【第16位】1
    (3)、【第0~15位】"以二进制对table数组长度进行转换,然后计算从最左边算起连续的“0”的总数量"的二进制表现。

4、我们介绍完resizeStamp方法后,往下看if判断,sc表示sizeCtl,如果sc < 0,则说明table数组正在被其他线程操作着(比如:扩容)。

if (sc <以上是关于JDK1.8中ConcurrentHashMap源码解析的主要内容,如果未能解决你的问题,请参考以下文章

JDK1.8中的ConcurrentHashMap

多线程-ConcurrentHashMap(JDK1.8)

Java集合源码剖析——基于JDK1.8中ConcurrentHashMap的实现原理

Java集合源码剖析——基于JDK1.8中ConcurrentHashMap的实现原理

Java集合源码剖析——基于JDK1.8中ConcurrentHashMap的实现原理

JDK1.8 ConcurrentHashMap源码阅读