ConcurrentHashMap

Posted luyuefei

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ConcurrentHashMap相关的知识,希望对你有一定的参考价值。

ConcurrentHashMap是Java中的一个线程安全且高效的HashMap实现

在JDK1.7中ConcurrentHashMap采用了数组+Segment分段锁的方式实现。

1.Segment(分段锁) 初始容量 16

ConcurrentHashMap中的分段锁称为Segment,它即类似于HashMap的结构,即内部拥有一个Entry数组(最小为2),数组中的每个元素又是一个链表,同时又是一个ReentrantLock(Segment继承了ReentrantLock)。

2.内部结构

ConcurrentHashMap使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。如下图是ConcurrentHashMap的内部结构图:

技术图片

从上面的结构我们可以了解到,ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作。

第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部。

该结构的优劣势

坏处 :这一种结构的带来的副作用是Hash的过程要比普通的HashMap要长

好处 :写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其他的Segment,这样,在最理想的情况下,ConcurrentHashMap可以最高同时支持Segment数量大小的写操作(刚好这些写操作都非常平均地分布在所有的Segment上)。

1.8使用Unsafe类的CAS自旋赋值+synchronized同步+LockSupport阻塞等手段实现的高效并发

put方法的逻辑

final V putVal(K key, V value, boolean onlyIfAbsent) {
   if (key == null || value == null) throw new NullPointerException();
   //1. 计算key的hash值
   int hash = spread(key.hashCode());
   int binCount = 0;
   for (Node<K,V>[] tab = table;;) {
       Node<K,V> f; int n, i, fh;
       //2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化
       if (tab == null || (n = tab.length) == 0)
           tab = initTable();
       //3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可
       else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
           if (casTabAt(tab, i, null,
                        new Node<K,V>(hash, key, value, null)))
               break;                   // no lock when adding to empty bin
      }
       //4. 当前正在扩容
       else if ((fh = f.hash) == MOVED)
           tab = helpTransfer(tab, f);
       else {
           V oldVal = null;
           synchronized (f) {
               if (tabAt(tab, i) == f) {
                   //5. 当前为链表,在链表中插入新的键值对
                   if (fh >= 0) {
                       binCount = 1;
                       for (Node<K,V> e = f;; ++binCount) {
                           K ek;
                           if (e.hash == hash &&
                              ((ek = e.key) == key ||
                                (ek != null && key.equals(ek)))) {
                               oldVal = e.val;
                               if (!onlyIfAbsent)
                                   e.val = value;
                               break;
                          }
                           Node<K,V> pred = e;
                           if ((e = e.next) == null) {
                               pred.next = new Node<K,V>(hash, key,
                                                         value, null);
                               break;
                          }
                      }
                  }
                   // 6.当前为红黑树,将新的键值对插入到红黑树中
                   else if (f instanceof TreeBin) {
                       Node<K,V> p;
                       binCount = 2;
                       if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                      value)) != null) {
                           oldVal = p.val;
                           if (!onlyIfAbsent)
                               p.val = value;
                      }
                  }
              }
          }
           // 7.插入完键值对后再根据实际大小看是否需要转换成红黑树
           if (binCount != 0) {
               if (binCount >= TREEIFY_THRESHOLD)
                   treeifyBin(tab, i);
               if (oldVal != null)
                   return oldVal;
               break;
          }
      }
  }
   //8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容
   addCount(1L, binCount);
   return null;
}
?

大致过程:1、判断Node[]数组是否初始化,没有则进行初始化操作 2、通过hash定位数组的索引坐标,是否有Node节点,如果没有则使用CAS进行添加(链表的头节点),添加失败则进入下次循环。 3、检查到内部正在扩容,就帮助它一块扩容。 4、如果f!=null,则使用synchronized锁住f元素(链表/红黑树的头元素)。如果是Node(链表结构)则执行链表的添加操作;如果是TreeNode(树型结构)则执行树添加操作。 5、判断链表长度已经达到临界值8(默认值),当节点超过这个值就需要把链表转换为树结构 6、如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容

技术图片

技术图片

为了保证能够正确初始化,在第1步中会先通过if进行判断,若当前已经有一个线程正在初始化即sizeCtl值变为-1,这个时候其他线程在If判断为true从而调用Thread.yield()让出CPU时间片。正在进行初始化的线程会调用U.compareAndSwapInt方法将sizeCtl改为-1即正在初始化的状态。另外还需要注意的事情是,在第四步中会进一步计算数组中可用的大小即为数组实际大小n乘以加载因子0.75.可以看看这里乘以0.75是怎么算的,0.75为四分之三,这里n - (n >>> 2)是不是刚好是n-(1/4)n=(3/4)n,挺有意思的吧:)。如果选择是无参的构造器的话,这里在new Node数组的时候会使用默认大小为DEFAULT_CAPACITY(16),然后乘以加载因子0.75为12,也就是说数组的可用大小为12。

3.CAS关键操作

tabAt()该方法用来获取table数组中索引为i的Node元素 casTabAt()利用CAS操作设置table数组中索引为i的元素 setTabAt()该方法用来设置table数组中索引为i的元素

总结与思考

其实可以看出JDK1.8版本的ConcurrentHashMap的数据结构已经接近HashMap,相对而言,ConcurrentHashMap只是增加了同步的操作来控制并发,从JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+红黑树,相对而言,总结如下思考

JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(首节点)
JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加了
JDK1.8使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档
JDK1.8为什么使用内置锁synchronized来代替重入锁ReentrantLock,我觉得有以下几点
  因为粒度降低了,在相对而言的低粒度加锁方式,synchronized并不比ReentrantLock差,在粗粒度加锁中ReentrantLock可能通过Condition来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition的优势就没有了
  JVM的开发团队从来都没有放弃synchronized,而且基于JVM的synchronized优化空间更大,使用内嵌的关键字比使用API更加自然
  在大量的数据操作下,对于JVM的内存压力,基于API的ReentrantLock会开销更多的内存,虽然不是瓶颈,但是也是一个选择依据

以上是关于ConcurrentHashMap的主要内容,如果未能解决你的问题,请参考以下文章

探索 ConcurrentHashMap 高并发性的实现机制

探索 ConcurrentHashMap 高并发性的实现机制

ConcurrentHashMap内存溢出问题

JDK1.8中的ConcurrentHashMap

Hashtable与ConcurrentHashMap入门——简单使用

并发编程——ConcurrentHashMap#addCount() 分析