深入分析ConcurrentHashMap的源码设计

Posted 琦彦

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入分析ConcurrentHashMap的源码设计相关的知识,希望对你有一定的参考价值。

深入分析ConcurrentHashMap的源码设计

ConcurrentHashMap提供线程安全性和可伸缩性

可伸缩性指的是一个应用程序在工作负载和可用处理资源增加时其吞吐量的表现情况。

一个可伸缩的程序能够通过使用更多的处理器、内存或者I/O带宽来相应地处理更大的工作负载。

锁住某个共享的资源以获得独占式的访问这种做法会形成可伸缩性瓶颈――它使其他线程不能访问那个资源,即使有空闲的处理器可以调用那些线程也无济于事**。为了取得可伸缩性,我们必须消除或者减少我们对独占式资源锁的依赖。**

ConcurrentHashMap

​ ConcurrentHashMap是Java5中支持高并发、高吞吐量的线程安全HashMap实现,它由Segment数组结构和HashEntry数组结构组成。

ConcurrentHashMap源码原理分析

0 HashMap简介

  • HashMap是一个线程不安全的类,不能在多线程下使用
  • JDK1.7结构:数组+链表(采用拉链法)
  • JDK1.8结构:数组+链表/红黑树(链表长度要大于阈值8)

1 JDK1.7的ConcurrentHashMap的实现

  • JDK7中,ConcurrentHashMap最外层是多个segment,每个segment的底层数据结构与HashMap类似,任然是数组+链表组成的拉链法
  • 每个Segment独立上ReentrantLock锁,每个Segment之间互不影响,提高了并发效率(Segment继承自ReentrantLock)
  • ConcurrentHashMap默认有16个segment,所以最多支持16个线程并发写(操作在不同的segment上时)。默认值在初始化的时候可以指定,但是一旦初始化过后,就不可以扩容。但是每个Segment内部是可以扩容的

哈希桶的size:

​ 因为size用位于运算来计算(ssize <<=1),所以Segment的大小取值都是以2的N次方,无关concurrencyLevel的取值,当然concurrencyLevel最大只能用16位的二进制来表示,即65536,换句话说,Segment的大小最多65536个,没有指定concurrencyLevel元素初始化,Segment的大小ssize默认为 DEFAULT_CONCURRENCY_LEVEL =16**;**

多个线程一起put时候,currentHashMap如何操作:

​ 对于ConcurrentHashMap的数据插入,这里要进行两次Hash去定位数据的存储位置,Segment实现了ReentrantLock,也就带有锁的功能,当执行put操作时,会进行第一次key的hash来定位Segment的位置,如果该Segment还没有初始化,即通过CAS操作进行赋值,然后进行第二次hash操作,找到相应的HashEntry的位置,这里会利用继承过来的锁的特性,在将数据插入指定的HashEntry位置时(链表的尾端),会通过继承ReentrantLock的tryLock()方法尝试去获取锁,如果获取成功就直接插入相应的位置,如果已经有线程获取该Segment的锁,那当前线程会以自旋的方式去继续的调用tryLock()方法去获取锁,超过指定次数就挂起,等待唤醒。

计算size方式:

​ 1、使用不加锁的模式去尝试多次计算ConcurrentHashMap的size,最多三次,比较前后两次计算的结果,结果一致就认为当前没有元素加入,计算的结果是准确的

​ 2、如果步骤一失败,他就会给每个Segment加上锁,然后计算ConcurrentHashMap的size返回(美团面试官的问题,多个线程下如何确定size)

2 JDK1.8的ConcurrentHashMap的源码分析

  • 根本没有借鉴JDK1.7,而是重写了一遍。。。
  • JDK1.8中的ConcurrentHashMap结构和1.8中的HashMap结构是相似的,也是数组+链表/红黑树(阈值也是8不过还要满足table.length>=MIN_TREEIFY_CAPACITY 这个值是64)

JDK1.8的改进

​ **改进一:**取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存数据,采用table数组元素作为锁,从而实现了对每一行数据进行加锁,进一步减少并发冲突的概率。

​ **改进二:**将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。

​ JDK1.8的实现已经摒弃了Segment的概念,而是直接用Node数组+链表+红黑树的数据结构来实现,并发控制使用Synchronized和CAS来操作,整个看起来就像是优化过且线程安全的HashMap,虽然在JDK1.8中还能看到Segment的数据结构,但是已经简化了属性,只是为了兼容旧版本

总而言之,Java8中主要做了如下优化:

  1. 将Segment抛弃掉了,直接采用Node(继承自Map.Entry)作为table元素。
  2. 修改时,不再采用ReentrantLock加锁,直接用内置synchronized加锁,Java8的内置锁比之前版本优化了很多,相较ReentrantLock,性能不并差。
  3. size方法优化,增加CounterCell内部类,用于并行计算每个bucket的元素数量。

JDK1.8中,出现了较大的改动。没有使用段锁,改成了Node数组 + 链表 + 红黑树的方式。

其中有个重要的变量:sizeCtl

  • 负数表示正在进行初始化或者扩容,-1表示正在初始化,-N表示有N - 1个线程正在扩容
  • 正数0,表示还没有被初始化。其他正数表示下一次扩容的大小。

put()方法

假设table已经初始化完成,put操作采用CAS+synchronized实现并发插入或更新操作,具体实现如下。

public V put(K key, V value) 
    return putVal(key, value, false);


/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) 
    //不允许key、value为空
    if (key == null || value == null) throw new NullPointerException();
    //返回 (h ^ (h >>> 16)) & HASH_BITS;
    int hash = spread(key.hashCode());
    int binCount = 0;
    //循环,直到插入成功
    for (Node[] tab = table;;) 
        Node f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            //table为空,初始化table
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) 
            //索引处无值
            if (casTabAt(tab, i, null,
                         new Node(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        
        else if ((fh = f.hash) == MOVED)// MOVED=-1;
            //检测到正在扩容,则帮助其扩容
            tab = helpTransfer(tab, f);
        else 
            V oldVal = null;
            //上锁(hash值相同的链表的头节点)
            synchronized (f) 
                if (tabAt(tab, i) == f) 
                    if (fh >= 0) 
                        //遍历链表节点
                        binCount = 1;
                        for (Node e = f;; ++binCount) 
                            K ek;
                            // hash和key相同,则修改value
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) 
                                oldVal = e.val;
                                //仅putIfAbsent()方法中onlyIfAbsent为true
                                if (!onlyIfAbsent)
                                //putIfAbsent()包含key则返回get,否则put并返回
                                    e.val = value;
                                break;
                            
                            Node pred = e;
                            //已遍历到链表尾部,直接插入
                            if ((e = e.next) == null) 
                                pred.next = new Node(hash, key,
                                                          value, null);
                                break;
                            
                        
                    
                    else if (f instanceof TreeBin) // 树节点
                        Node p;
                        binCount = 2;
                        if ((p = ((TreeBin)f).putTreeVal(hash, key,
                                                       value)) != null) 
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        
                    
                
            
            if (binCount != 0) 
                //判断是否要将链表转换为红黑树,临界值和HashMap一样也是8
                if (binCount >= TREEIFY_THRESHOLD)
                //若length<64,直接tryPresize,两倍table.length;不转树
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            
        
    
    addCount(1L, binCount);
    return null;

  1. hash算法
static final int spread(int h) 
    return (h ^ (h >>> 16)) & HASH_BITS;


  1. table中定位索引位置,n是table的大小
int index = (n - 1) & hash
  1. 获取table中对应索引的元素f。

采用Unsafe.getObjectVolatile来获取。在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的副本,虽然table是volatile修饰的,但不能保证线程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。

  1. 如果f为null,说明table中这个位置第一次插入元素,利用Unsafe.compareAndSwapObject方法插入Node节点。
  • 如果CAS成功,说明Node节点已经插入,break跳出,随后addCount(1L, binCount)方法会检查当前容量是否需要进行扩容。
  • 如果CAS失败,说明有其它线程提前插入了节点,自旋重新尝试在这个位置插入节点。
  1. 如果f的hash值为-1,说明当前f是ForwardingNode节点,意味有其它线程正在扩容,则一起进行扩容操作。

  2. 其余情况把新的Node节点按链表或红黑树的方式插入到合适的位置,这个过程采用同步内置锁实现并发,代码如上。

在节点f上进行同步,节点插入之前,再次利用tabAt(tab, i) == f判断,防止被其它线程修改。

  1. 如果f.hash >= 0,说明f是链表结构的头结点,遍历链表,如果找到对应的node节点,则修改value,否则在链表尾部加入节点。
  2. 如果f是TreeBin类型节点,说明f是红黑树根节点,则在树结构上遍历元素,更新或增加节点。
  3. 如果链表中节点数binCount >= TREEIFY_THRESHOLD(默认是8),则把链表转化为红黑树结构。

链表转红黑树: treeifyBin()

treeifyBin 不一定就会进行红黑树转换,也可能是仅仅做数组扩容。我们还是看源码吧。

private final void treeifyBin(Node[] tab, int index) 
    Node b; int n, sc;
    if (tab != null) 
        // MIN_TREEIFY_CAPACITY 为 64
        // 所以,如果数组长度小于 64 的时候,其实也就是 32 或者 16 或者更小的时候,会进行数组扩容
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            // 后面我们再详细分析这个方法
            tryPresize(n << 1);
        // b 是头结点
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) 
            // 加锁
            synchronized (b) 
                if (tabAt(tab, index) == b) 
                    // 下面就是遍历链表,建立一颗红黑树
                    TreeNode hd = null, tl = null;
                    for (Node e = b; e != null; e = e.next) 
                        TreeNode p =
                            new TreeNode(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    
                    // 将红黑树设置到数组相应位置中
                    setTabAt(tab, index, new TreeBin(hd));
                
            
        
    

扩容:tryPresize()

如果说 Java8 ConcurrentHashMap 的源码不简单,那么说的就是扩容操作和迁移操作。

这里的扩容也是做翻倍扩容,扩容后数组容量为原来的 2 倍。

// 首先要说明的是,方法参数 size 传进来的时候就已经翻了倍了
private final void tryPresize(int size) 
    // c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) 
        Node<K,V>[] tab = table; int n;
        // 这个 if 分支和之前说的初始化数组的代码基本上是一样的
        if (tab == null || (n = tab.length) == 0) 
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) 
                try 
                    if (table == tab) 
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2); // 0.75 * n
                    
                 finally 
                    sizeCtl = sc;
                
            
        
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) 
            int rs = resizeStamp(n);
            if (sc < 0) 
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // 2. 用 CAS 将 sizeCtl 加 1,然后执行 transfer 方法
                // 此时 nextTab 不为 null
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            
            // 1. 将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2)
            // 调用 transfer 方法,此时 nextTab 参数为 null
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        
    

这个方法的核心在于 sizeCtl 值的操作,首先将其设置为一个负数,然后执行 transfer(tab, null),再下一个循环将 sizeCtl 加 1,并执行 transfer(tab, nt),之后可能是继续 sizeCtl 加 1,并执行 transfer(tab, nt)。

至于transfer()方法的源码这里我就不分析了,它的大概功能就是将原来的 tab 数组的元素迁移到新的 nextTab 数组中。

get()方法

get方法不用加锁。利用CAS操作,可以达到无锁的访问。

public V get(Object key) 
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) //tabAt(i),获取索引i处Node
        // 判断头结点是否就是我们需要的节点
        if ((eh = e.hash) == h) 
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        
        // 如果头结点的 hash<0,说明正在扩容,或者该位置是红黑树
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        //遍历链表
        while ((e = e.next) != null) 
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        
    
    return null;

Node<K,V> find(int h, Object k) 
    Node<K,V> e = this;
    if (k != null) 
        do 
            K ek;
            if (e.hash == h &&
                ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
         while ((e = e.next) != null);
    
    return null;

get() 执行过程:

  1. 计算 hash 值

  2. 根据 hash 值找到数组对应位置: (n – 1) & h

  3. 根据该位置处结点性质进行相应查找

  • 如果该位置为 null,那么直接返回 null 就可以了
  • 如果该位置处的节点刚好就是我们需要的,返回该节点的值即可
  • 如果该位置节点的 hash 值小于 0,说明正在扩容,或者是红黑树
  • 如果以上 3 条都不满足,那就是链表,进行遍历比对即可

对比JDK1.7与1.8

  • 首先是数据结构上:

    • 1.7是segment数组,+HashEntry(类似HashMap的结构)
    • 1.8是数据+链表/红黑树与HashMap类似
  • 并发上:

    • 1.7是使用ReentrantLock锁住每个Segment
    • 1.8是使用CAS+synchronized
  • 为什么超过8要使用红黑树

    • 首先链表的结构存储要比红黑树存储节省空间,而链表在查询上又没有红黑树块
    • 这个时候就需要一个边界,源码作者做了一个**泊松分布运算,**在链表达到8时的概率已经非常小了。而链表长度为8时,查找费时也不大。概率只有千万分之几

    总而言之:

    1. 1.7的ReentrantLock+Segment+HashEntry,1.8中synchronized+CAS+HashEntry+红黑树,JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(链表首节点)。
    2. 1.7中put和 get 两次Hash到达指定的HashEntry,第一次hash到达Segment,第二次到达Segment里面的Entry,然后在遍历entry链表。1.8取消了segment,只需一次hash。
    3. ​ 1.7中计算size 先不加锁计算3次,如果不对再给每个segment加锁计算一次,在JDK1.8版本中,对于size的计算,在put的扩容和addCount()方法就已经计算好了,直接给你。
    4. ​ HashEntry最小容量为2,1.7中segment初始容量为16,1.8中Node节点转TreeNode的阈值为8;

线程安全问题

ConcurrenthashMap并发下单独操作的确是安全的,但是组合操作就未必了。所以如果在多线程情况下,有多步操作ConcurrenthashMap的时候需要额外留心

  • 如:如果要修改一个值:可以使用boolean replace(key, oldValue, newValue)来修改,而不是先get然后put, 这个方法类似于CAS的思想
  • 此外还有putIfAbsent(key, value) ,先判断有没有这个值,如果没有就put,有就取出来给你

参考链接:

https://juejin.cn/post/6844904069287378952

https://juejin.cn/post/6844903869714006029

以上是关于深入分析ConcurrentHashMap的源码设计的主要内容,如果未能解决你的问题,请参考以下文章

源码之ConcurrentHashMap

深入并发包 ConcurrentHashMap 源码解析

深入理解JAVA集合系列二:ConcurrentHashMap源码解读

深入理解JAVA集合系列二:ConcurrentHashMap源码解读

JDK核心源码深入剖析(synchronized和ConcurrentHashMap)

深入分析ConcurrentHashMap