深入浅出ConcurrentHashMap内部实现

Posted 彼岸教育Beacon

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入浅出ConcurrentHashMap内部实现相关的知识,希望对你有一定的参考价值。

ConcurrentHashMap可以说是目前使用最多的并发数据结构之一,作为如此核心的基本组件,不仅仅要满足我们功能的需求,更要满足性能的需求。而实现一个高性能的线程安全的HashMap也绝非易事。

ConcurrentHashMap作为JDK8的内部实现,一个成功的典范,有着诸多可以让我们学习和致敬的地方。

我全局在项目中搜索这个类的时候,发现大量项目代码和源码都用到了,为什么他会这么吃香呢?到底是道德的....呸。

下面我们就来扒一扒,ConcurrentHashMap的内部实现,来体会一下它的精妙之处吧!

ConcurrentHashMap的内部数据结构

在JDK8中, ConcurrentHashMap的内部实现发生了天翻地覆的变化。这里依据JDK8,来介绍一下ConcurrentHashMap的内部实现。

从静态数据结构上说,ConcurrentHashMap包含以下内容:

int sizeCtl

这是一个多功能的字段,可以用来记录参与Map扩展的线程数量,也用来记录新的table的扩容阈值

CounterCell[] counterCells

用来记录元素的个数,这是一个数组,使用数组来记录,是因为避免多线程竞争时,可能产生的冲突。使用了数组,那么多个线程同时修改数量时,极有可能实际操作数组中不同的单元,从而减少竞争。

Node<K,V>[] table

实际存放Map内容的地方,一个map实际上就是一个Node数组,每个Node里包含了key和value的信息。

Node<K,V>[] nextTable

当table需要扩充时,会把新的数据填充到nextTable中,也就是说nextTable是扩充后的Map。

以上就是ConcurrentHashMap的核心元素,其中最值得注意的便是Node,Node并非想象中如此简单,下面的图展示了Node的类族结构:

可以看到,在Map中的Node并非简单的Node对象,实际上,它有可能是Node对象,也有可能是一个Treebin或者ForwardingNode。

那什么时候是Node,什么时候是TreeBin,什么时候又是一个ForwardingNode呢?

其实在绝大部分场景中,使用的依然是Node,从Node数据结构中,不难看出,Node其实是一个链表,也就是说,一个正常的Map可能是长这样的:

上图中,绿色部分表示Node数组,里面的元素是Node,也就是链表的头部,当两个元素在数据中的位置发生冲突时,就将它们通过链表的形式,放在一个槽位中。

当数组槽位对应的是一个链表时,在一个链表中查找key只能使用简单的遍历,这在数据不多时,还是可以接受的,当冲突数据比较多少,这种简单的遍历就有点慢了。

因此,在具体实现中,当链表的长度大于等于8时,会将链表树状化,也就是变成一颗红黑树。如下图所示,其中一个槽位就变成了一颗树,这就是TreeBin(在TreeBin中使用TreeNode构造整科树)。

当数组容量快满时,即超过75%的容量时,数组还需要进行扩容,在扩容过程中,如果老的数组已经完成了复制,那么就会将老数组中的元素使用ForwardingNode对象替代,表示当前槽位的数据已经处理了,不需要再处理了,这样,当有多个线程同时参与扩容时,就不会冲突。


插播下,最近,很多朋友来咨询我,行业不景气,工作不顺利,又担心自己的35岁职业危机,要不要读一个在职计算机硕士。我的建议是,根据自己的情况决定,如果真的需要提升学历,那么在当前考研通过率低且备考难的情况下,可以考虑读个申请制入学、不需要备考的美国在线计算机硕士。毕业后获得和线下留学完全相同的全日制计算机硕士学位,学历这块敲门砖就有了,不用再担心学历低的问题。

下面是我给大家找的CSDN和美国伊利诺伊理工大学在线计算机硕士合作的报名资格自测链接,感兴趣的同学可以自己测测看,是否符合学校报名条件。

CSDNhttps://marketing.csdn.net/questions/Q2203171447515520151

put()方法的实现

现在来看一下作为一个HashMap最为重要的方法put():

  • public V put(K key, V value)

它负责将给定的key和value对存入HashMap,它的工作主要有以下几个步骤:

  1. 如果没有初始化数组,则尝试初始化数组

  2. 如果当前正在扩容,则参与帮助扩容(调用helpTransfer()方法)

  3. 将给定的key,value 放入对应的槽位

  4. 统计元素总数

  5. 触发扩容操作

根据以上主要4个步骤,来依次详细说明一下:

如果没有初始化数组,则尝试初始化数组

初始化数据会生成一个Node数组:

Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];

默认情况下,n为16。同时设置sizeCtl为·n - (n >>> 2); 这意味着sizeCtl为n的75%,表示Map的size,也就是说ConcurrentHashMap的负载因子是0.75。(为了避免冲突,Map的容量是数组的75%,超过这个阈值,就会扩容)

如果当前正在扩容,则参与帮助扩容

else if ((fh = f.hash) == MOVED)
    tab = helpTransfer(tab, f);

如果一个节点的hash是MOVE,则表示这是一个ForwardingNode,也就是当前正在扩容中,为了尽快完成扩容,当前线程就会参与到扩容的工作中,而不是等待扩容操作完成,如此紧密细致的操作,恰恰是ConcurrentHashMap高性能的原因。

而代码中的f.hash==MOVE语义上等同于f instanceof ForwardingNode,但是使用整数相等的判断的效率要远远高于instanceof,所以,这里也是一处对性能的极限优化。

将给定的key,value 放入对应的槽位

在大部分情况下,应该会走到这一步,也就是将key和value放入数组中。在这个操作中会使用大概如下操作:

Node<K,V> f;
synchronized (f) 
     if(所在槽位是一个链表)
         插入链表
     else if(所在槽位是红黑树)
         插入树
     if(链表长度大于8[TREEIFY_THRESHOLD])
         将链表树状化

可以看到,这使用了synchronized关键字,锁住了Node对象。由于在绝大部分情况下,不同线程大概率会操作不同的Node,因此这里的竞争应该不会太大。

并且随着数组规模越来越大,竞争的概率会越来越小,因此ConcurrentHashMap有了极好的并行性。

统计元素总数

为了有一个高性能的size()方法,ConcurrentHashMap使用了单独的方法来统计元素总数,元素数量统计在CounterCell数组中:

CounterCell[] counterCells;
@sun.misc.Contended static final class CounterCell 
    volatile long value;
    CounterCell(long x)  value = x; 

CounterCell使用伪共享优化,具有很高的读写性能。counterCells中所有的成员的value相加,就是整个Map的大小。这里使用数组,也是为了防止冲突。

如果简单使用一个变量,那么多线程累加一个计数器时,难免要有竞争,现在分散到一个数组中,这种竞争就小了很多,对并发就更加友好了。

累加的主要逻辑如下:

if (as == null || (m = as.length - 1) < 0 ||
    //不同线程映射到不同的数组元素,防止冲突
    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
    //使用CAS直接增加对应的数据
    !(uncontended =
      U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)))
    //如果有竞争,在这里会重试,如果竞争严重还会将CounterCell[]数组扩容,以减少竞争

触发扩容操作

最后,ConcurrentHashMap还会检查是否需要扩容,它会检查当前Map的大小是否超过了阈值,如果超过了,还会进行扩容。

ConcurrentHashMap的扩容过程非常巧妙,它并没有完全打乱当前已有的元素位置,而是在数组扩容2倍后,将一半的元素移动到新的空间中。

所有的元素根据高位是否为1分为low节点和high节点:

//n是数组长度,数组长度是2的幂次方,因此一定是100 1000 10000 100000这种二进制数字
//这里将low节点串一起, high节点串一起
if ((ph & n) == 0)
    ln = new Node<K,V>(ph, pk, pv, ln);
else
    hn = new Node<K,V>(ph, pk, pv, hn);

接着,重新放置这些元素的位置:

//low节点留在当前位置
setTabAt(nextTab, i, ln);
//high节点放到扩容后的新位置,新位置距离老位置n
setTabAt(nextTab, i + n, hn);
//扩容完成,用ForwardingNode填充
setTabAt(tab, i, fwd);

下图显示了 从8扩充到16时的可能得一种扩容情况,注意,新的位置总是在老位置的后面n个槽位(n为原数组大小)

这样做的好处是,每个元素的位置不需要重新计算,进行查找时,由于总是会对n-1(一定是一个类似于1111 11111 111111这样的二进制数)按位与,因此,high类的节点自然就会出现在+n的位置上。

get()方法的实现

与put()方法相比,get()方法就比较简单了。步骤如下:

  1. 根据hash值 得到对应的槽位 (n - 1) & h

  2. 如果当前槽位第一个元素key就和请求的一样,直接返回

  3. 否则调用Node的find()方法查找

    1. 对于ForwardingNode  使用的是 ForwardingNode.find()

    2. 对于红黑树 使用的是TreeBin.find()

  4. 对于链表型的槽位,依次顺序查找对应的key

写在最后

ConcurrentHashMap可以说是并发设计的典范,在JDK8中,ConcurrentHashMap可以说是再一次脱胎换骨,全新的架构和实现带来了飞一般的体验(JDK7中的ConcurrentHashMap还是采用比较骨板的segment实现的),细细品读,还是有不少的收获。

他和HashMap的区别,优劣势对比,这也是常考的考点,所以大家不管是为了了解、工作还是面试,都应该好好的熟悉一下。


前面给大家提到的美国伊利诺伊理工大学,面向中国学生开放有人工智能硕士、计算机科学硕士、软件工程硕士、电气与计算机工程硕士,下面是人工智能硕士的课表。

毕竟系统化的学习前沿计算机领域的技术知识,可以帮助我们更快的成长。

​​​​​​​

有想提升学历的朋友,可以自己点击下方CSDN和学校合作链接测测看,是否符合院校报名条件。​​​​​​​​​​​​​​CSDNhttps://marketing.csdn.net/questions/Q2203171447515520151

深入浅出 ~ ConCurrentHashMap底层原理透析

ConcurrentHashMap&lt;K,V&gt; 继承了AbstractMap&lt;K,V&gt;,实现了ConcurrentMap&lt;K,V&gt;Serializable
说明:
AbstractMap&lt;K,V&gt;实现了基本的Map操作
ConcurrentMap&lt;K,V&gt;规范了对k-v的并发操作的方法

一、介绍

  1. ConCurrentHashMap是如何做到线程安全的?

    通俗来讲,ConcurrentHashMap&lt;K,V&gt;已经迭代了几个版本,我们先从JDK7说起,起初为了实现了对HashMap真正意义上的并发,在ConcurrentHashMap&lt;K,V&gt;引入了一个静态内部类Segment(段),而且ConcurrentHashMap&lt;K,V&gt;也聚合了一个Segment的成员变量数组来维护,每一个Segment数组的下标元素相当于一个HashMap,也就是一个HashEntry数组 + 每位下标元素构成一个HashEntry链的头结点(依旧保存值),每次存在线程安全的操作都会去使用该数组的其中一个下标,并锁住该下标(其他线程无法访问),而不同[下标之间的操作是不会相互影响的,也就不存在冲突的情况。
    Segment继承了ReentrantLock可重入锁),用法其实用到加锁和解锁,保证每一个线程使用前加锁,使用完成后释放锁。
    ConcurrentHashMapSegmentEntries的示意图

    更专业来讲,ConcurrentHashMap使用了分段锁Segment来解决线程安全。

    2. JDK7 的 get 和 put 以及扩容

    • get
      get 操作没有任何加锁,所以在ConCurrent是非常高效的。
      static final class HashEntry<K,V> 
      final int hash;
      final K key;
      volatile V value;
      volatile HashEntry<K,V> next;
      

      就算获取key之前value被改变了,由于volatile修饰了value变量,所以对内容是及时更新的,新增删除操作亦是如此。

      public V get(Object key) 
      Segment<K,V> s; // manually integrate access methods to reduce overhead
      HashEntry<K,V>[] tab;
      int h = hash(key);
      long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
      // Concurrent 初始化时会初始 Segment数组和 第 0 位下标即Segment[0](包括 HashEntry数组 的初始化)
      // 如果 segment 对应的 HashEntry 数组不为空,则往下遍历
      if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
      (tab = s.table) != null) 
      // 遍历 HashEntry 数组 哈希命中的下标元素(链表)
      for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
           (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
           e != null; e = e.next) 
          K k;
          // 如果找到了 key就返回对应的value,否则往下继续遍历链表
          // 这里前面匹配的是 基本数据类型,
          //如果是 复杂对象,则使用的是先 hash筛选桶下标,相等再去 比较 “值”
          if ((k = e.key) == key || (e.hash == h && key.equals(k)))
              return e.value;
      
      
      return null;
      
    • put
      ConcurrentHashMappost操作是不允许设置valuenull
      Concurrent初始化时会初始Segment数组和 第0位下标即Segment[0](其中包括HashEntry数组的初始化),所以这里会出现命中到另外15个未初始化的Segment锁段,即当前Segmentnull的情况
      public V put(K key, V value) 
      Segment<K,V> s;
      // 不允许对 value 设置为 null
      if (value == null)
      throw new NullPointerException();
      int hash = hash(key);
      int j = (hash >>> segmentShift) & segmentMask;
      // 没有找到,即说明未初始化
      if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
       (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
      // 初始化当前 Segment 对应的 HashEntry 数组
      s = ensureSegment(j);
      return s.put(key, hash, value, false);
      

      put细节

      final V put(K key, int hash, V value, boolean onlyIfAbsent) 
      // 尝试加锁
      HashEntry<K,V> node = tryLock() ? null :
      // 如果不成功, 进入 scanAndLockForPut 流程
      // 如果是多核 cpu 最多 tryLock 64 次, 进入 lock 流程
      // 在尝试期间, 还可以顺便看该节点在链表中有没有, 如果没有顺便创建出来
      scanAndLockForPut(key, hash, value);
      // 执行到这里 segment 已经被成功加锁, 可以安全执行
      V oldValue;
      try 
      HashEntry<K,V>[] tab = table;
      int index = (tab.length - 1) & hash;
      HashEntry<K,V> first = entryAt(tab, index);
      for (HashEntry<K,V> e = first;;) 
          if (e != null) 
              // 更新
              K k;
              if ((k = e.key) == key ||
                  (e.hash == hash && key.equals(k))) 
                  oldValue = e.value;
                  if (!onlyIfAbsent) 
                      e.value = value;
                      ++modCount;
                  
                  break;
              
              e = e.next;
          
          else 
              // 新增
              // 1) 之前等待锁时, node 已经被创建, next 指向链表头
              if (node != null)
                  node.setNext(first);
              else
                  // 2) 创建新 node
                  node = new HashEntry<K,V>(hash, key, value, first);
              int c = count + 1;
              // 3) 扩容
              // 必须 是 达到阈值 并且 HashEntry 数组还没被扩容,也就是长度依旧是旧值
              if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                  rehash(node);
              else
                  // tab已更新,即扩容已经完毕,将 node 作为链表头
                  setEntryAt(tab, index, node);
              ++modCount;
              count = c;
              oldValue = null;
              break;
          
      
       finally 
      unlock();
      
      return oldValue;
      
    • 扩容
      private void rehash(HashEntry<K,V> node) 
      // 每个 Segment 对应的一个 HashEntry<K,V>数组
      HashEntry<K,V>[] oldTable = table;
      // 扩容前 HashEntry<K,V>数组的长度
      int oldCapacity = oldTable.length;
      // 扩容后 HashEntry<K,V>数组的长度 = 扩容前 HashEntry<K,V>数组的长度 * 2
      int newCapacity = oldCapacity << 1;
      // 扩容阈值的元素数量,HashEntry<K,V>数组 中元素的个数到 某个数量(低于最大数)时的数量
      threshold = (int)(newCapacity * loadFactor);
      // 初始化一个新的 HashEntry 数组,每个下标表示的 HashEntry 都为 null,后面会用到这个 null 做头插法 put 元素
      HashEntry<K,V>[] newTable =
      (HashEntry<K,V>[]) new HashEntry[newCapacity];
      // hash 命中 & 操作要用的掩码(计算hash结果的长度)
      int sizeMask = newCapacity - 1;
      // 遍历 扩容前 HashEntry<K,V>数组
      for (int i = 0; i < oldCapacity ; i++) 
      HashEntry<K,V> e = oldTable[i];
      if (e != null) 
          HashEntry<K,V> next = e.next;
          int idx = e.hash & sizeMask;
          // 当前 HashEntry<K,V>数组 下标只有一个元素,则直接赋值给 新数组对应下标
          // 这里对应的新的下标跟 原下标已经不一样了,因为是按照 扩容后 的长度来计算得到的
          if (next == null)   //  Single node on list
              newTable[idx] = e;
          // 如果链表长度 大于等于两个元素以上
          else  // Reuse consecutive sequence at same slot
              HashEntry<K,V> lastRun = e;
              int lastIdx = idx;
              // 重用 链表最后面 新命中值 相同的
              for (HashEntry<K,V> last = next;
                   last != null;
                   last = last.next) 
                  int k = last.hash & sizeMask;
                  if (k != lastIdx) 
                      lastIdx = k;
                      lastRun = last;
                  
              
              // 将重用的链表头结点 转移到 新的 数组中
              newTable[lastIdx] = lastRun;
              // Clone remaining nodes
              // 链表剩下的 部分,挨个按照头插法 命中到 新的数组中 
              for (HashEntry<K,V> p = e; p != lastRun; p = p.next) 
                  V v = p.value;
                  int h = p.hash;
                  int k = h & sizeMask;
                  HashEntry<K,V> n = newTable[k];
                  newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
              
          
      
      
      // 最后再 把要 put 的值 头插法 加入到 新的数组中
      int nodeIndex = node.hash & sizeMask; // add the new node
      node.setNext(newTable[nodeIndex]);
      newTable[nodeIndex] = node;
      // 这里通过 暂时 数组操作完毕再 赋值,能够避免操作中途 被其他 线程再次重复操作
      // 在这之前 其他 线程无法进入
      // 一赋值 其他线程 就算判断 达到阈值,但newTable 的大小改变
      // tab.length < MAXIMUM_CAPACITY 也会让 其他线程直接去 新的数组 put 操作
      table = newTable;
      

      JDK8抛弃了Segment分段锁,转而使用CAS+synchronized来保证并发安全性。
      并且,不再单纯使用链表,JDK8HashEntry数组中的链表长度大于8的时候会去转化为红黑树结构。
      在这里,链表长度大于8时,才会去进一步判断阈值是否达到。
      JDK7扩容的区别在上一篇 深入浅出~HashMap的底层原理透析 详细讲过了,其实与ConCurrentHashMap大同小异,也就是多做了CAS的操作、链表转化红黑树和等待的put会帮忙扩容。

      二、附语

谢谢大家,我会继续努力,只为力争创作高质量的文章,分享给各位有需要的读者。
你们的阅读和评论是对作者最大的支持!
我的技术专栏:https://github.com/fyupeng

以上是关于深入浅出ConcurrentHashMap内部实现的主要内容,如果未能解决你的问题,请参考以下文章

数据结构 - ConcurrentHashMap 一步步深入

深入浅出 ~ ConCurrentHashMap底层原理透析

并发容器ConcurrentHashMap 深入解析(JDK1.6)

深度剖析ConcurrentHashMap源码

深度剖析ConcurrentHashMap源码

java并发包研究之-ConcurrentHashMap