ConcurrentHashMap

Posted 沧海一滴

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ConcurrentHashMap相关的知识,希望对你有一定的参考价值。

ConcurrentHashMap实现原理

众所周知,哈希表是中非常高效,复杂度为O(1)的数据结构,在Java开发中,我们最常见到最频繁使用的就是HashMapHashTable,但是在线程竞争激烈的并发场景中使用都不够合理。

HashMap:先说HashMapHashMap是线程不安全的,在并发环境下,可能会形成环状链表(扩容时可能造成死循环,具体原因自行百度google或查看源码分析),导致get操作时,cpu空转,所以,在并发环境中使用HashMap是非常危险的。

HashTable HashTableHashMap的实现原理几乎一样,差别无非是
1.HashTable不允许keyvaluenull
2.HashTable是线程安全的。但是HashTable线程安全的策略实现代价却太大了,简单粗暴,get/put所有相关操作都是synchronized的,这相当于给整个哈希表加了一把大锁,多线程访问时候,只要有一个线程访问或操作该对象,那其他线程只能阻塞,相当于将所有的操作串行化,在竞争激烈的并发场景中性能就会非常差。


HashTable性能差主要是由于所有操作需要竞争同一把锁,而如果容器中有多把锁,每一把锁锁一段数据,这样在多线程访问时不同段的数据时,就不会存在锁竞争了,这样便可以有效地提高并发效率。这就是ConcurrentHashMap所采用的"分段锁"思想。

ConcurrentHashMap源码分析  

ConcurrentHashMap采用了非常精妙的"分段锁"策略,ConcurrentHashMap的主干是个Segment数组。

final Segment<K,V>[] segments;

Segment继承了ReentrantLock,所以它就是一种可重入锁(ReentrantLock)。在ConcurrentHashMap,一个Segment就是一个子哈希表,Segment里维护了一个HashEntry数组,并发环境下,对于不同Segment的数据进行操作是不用考虑锁竞争的。(就按默认的ConcurrentLeve16来讲,理论上就允许16个线程并发执行,有木有很酷)

 

所以,对于同一个Segment的操作才需考虑线程同步,不同的Segment则无需考虑。

Segment类似于HashMap,一个Segment维护着一个HashEntry数组

 

transient volatile HashEntry<K,V>[] table;

HashEntry是目前我们提到的最小的逻辑处理单元了。一个ConcurrentHashMap维护一个Segment数组,一个Segment维护一个HashEntry数组。

static final class HashEntry<K,V> {

        final int hash;

        final K key;

        volatile V value;

        volatile HashEntry<K,V> next;

        //其他省略

}    

我们说Segment类似哈希表,那么一些属性就跟我们之前提到的HashMap差不离,比如负载因子loadFactor,比如阈值threshold等等,看下Segment的构造方法


Segment(float lf, int threshold, HashEntry<K,V>[] tab) {

            this.loadFactor = lf;//负载因子

            this.threshold = threshold;//阈值

            this.table = tab;//主干数组即HashEntry数组

        }

我们来看下ConcurrentHashMap的构造方法

public ConcurrentHashMap(int initialCapacity,

                          float loadFactor, int concurrencyLevel) {

     if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)

         throw new IllegalArgumentException();

     //MAX_SEGMENTS 1<<16=65536,也就是最大并发数为65536

     if (concurrencyLevel > MAX_SEGMENTS)

         concurrencyLevel = MAX_SEGMENTS;

     //2sshif次方等于ssize,例:ssize=16,sshift=4;ssize=32,sshif=5

    int sshift = 0;

    //ssize segments数组长度,根据concurrentLevel计算得出

    int ssize = 1;

    while (ssize < concurrencyLevel) {

        ++sshift;

        ssize <<= 1;

    }

    //segmentShiftsegmentMask这两个变量在定位segment时会用到,后面会详细讲

    this.segmentShift = 32 - sshift;

    this.segmentMask = ssize - 1;

    if (initialCapacity > MAXIMUM_CAPACITY)

        initialCapacity = MAXIMUM_CAPACITY;

    //计算cap的大小,即SegmentHashEntry的数组长度,cap也一定为2n次方.

    int c = initialCapacity / ssize;

    if (c * ssize < initialCapacity)

        ++c;

    int cap = MIN_SEGMENT_TABLE_CAPACITY;

    while (cap < c)

        cap <<= 1;

    //创建segments数组并初始化第一个Segment,其余的Segment延迟初始化

    Segment<K,V> s0 =

        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),

                         (HashEntry<K,V>[])new HashEntry[cap]);

    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];

    UNSAFE.putOrderedObject(ss, SBASE, s0);

    this.segments = ss;

}

初始化方法有三个参数,如果用户不指定则会使用默认值,initialCapacity16loadFactor0.75(负载因子,扩容时需要参考),concurrentLevel16

 

从上面的代码可以看出来,Segment数组的大小ssize是由concurrentLevel来决定的,但是却不一定等于concurrentLevelssize一定是大于或等于concurrentLevel的最小的2的次幂。比如:默认情况下concurrentLevel16,则ssize16;若concurrentLevel14ssize16;若concurrentLevel17,则ssize32。为什么Segment的数组大小一定是2的次幂?其实主要是便于通过按位与的散列算法来定位Segmentindex。至于更详细的原因,有兴趣的话可以参考我的另一篇文章《HashMap实现原理及源码分析》,其中对于数组长度为什么一定要是2的次幂有较为详细的分析。

接下来,我们来看看put方法

 public V put(K key, V value) {

        Segment<K,V> s;

        //concurrentHashMap不允许key/value为空

        if (value == null)

            throw new NullPointerException();

        //hash函数对keyhashCode重新散列,避免差劲的不合理的hashcode,保证散列均匀

        int hash = hash(key);

        //返回的hash值无符号右移segmentShift位与段掩码进行位运算,定位segment

        int j = (hash >>> segmentShift) & segmentMask;

        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck

             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment

            s = ensureSegment(j);

        return s.put(key, hash, value, false);

    }

从源码看出,put的主要逻辑也就两步:1.定位segment并确保定位的Segment已初始化 2.调用Segmentput方法。

关于segmentShiftsegmentMask

  segmentShiftsegmentMask这两个全局变量的主要作用是用来定位Segmentint j =(hash >>> segmentShift) & segmentMask

  segmentMask:段掩码,假如segments数组长度为16,则段掩码为16-1=15segments长度为32,段掩码为32-1=31。这样得到的所有bit位都为1,可以更好地保证散列的均匀性

  segmentShift2sshift次方等于ssizesegmentShift=32-sshift。若segments长度为16segmentShift=32-4=28;segments长度为32segmentShift=32-5=27。而计算得出的hash值最大为32位,无符号右移segmentShift,则意味着只保留高几位(其余位是没用的),然后与段掩码segmentMask位运算来定位Segment


get/put方法

get方法

 public V get(Object key) {

        Segment<K,V> s;

        HashEntry<K,V>[] tab;

        int h = hash(key);

        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;

        //先定位Segment,再定位HashEntry

        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&

            (tab = s.table) != null) {

            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile

                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);

                 e != null; e = e.next) {

                K k;

                if ((k = e.key) == key || (e.hash == h && key.equals(k)))

                    return e.value;

            }

        }

        return null;

    }

get方法无需加锁,由于其中涉及到的共享变量都使用volatile修饰,volatile可以保证内存可见性,所以不会读取到过期数据。

 

来看下concurrentHashMap代理到Segment上的put方法,Segment中的put方法是要加锁的。只不过是锁粒度细了而已。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {

            HashEntry<K,V> node = tryLock() ? null :

                scanAndLockForPut(key, hash, value);//tryLock不成功时会遍历定位到的HashEnry位置的链表(遍历主要是为了使CPU缓存链表),若找不到,则创建HashEntrytryLock一定次数后(MAX_SCAN_RETRIES变量决定),则lock。若遍历过程中,由于其他线程的操作导致链表头结点变化,则需要重新遍历。

            V oldValue;

            try {

                HashEntry<K,V>[] tab = table;

                int index = (tab.length - 1) & hash;//定位HashEntry,可以看到,这个hash值在定位Segment时和在Segment中定位HashEntry都会用到,只不过定位Segment时只用到高几位。

                HashEntry<K,V> first = entryAt(tab, index);

                for (HashEntry<K,V> e = first;;) {

                    if (e != null) {

                        K k;

                        if ((k = e.key) == key ||

                            (e.hash == hash && key.equals(k))) {

                            oldValue = e.value;

                            if (!onlyIfAbsent) {

                                e.value = value;

                                ++modCount;

                            }

                            break;

                        }

                        e = e.next;

                    }

                    else {

                        if (node != null)

                            node.setNext(first);

                        else

                            node = new HashEntry<K,V>(hash, key, value, first);

                        int c = count + 1;

//c超出阈值threshold,需要扩容并rehash。扩容后的容量是当前容量的2倍。这样可以最大程度避免之前散列好的entry重新散列,具体在另一篇文章中有详细分析,不赘述。扩容并rehash的这个过程是比较消耗资源的。

                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)

                            rehash(node);

                        else

                            setEntryAt(tab, index, node);

                        ++modCount;

                        count = c;

                        oldValue = null;

                        break;

                    }

                }

            } finally {

                unlock();

            }

            return oldValue;

        }

总结

ConcurrentHashMap作为一种线程安全且高效的哈希表的解决方案,尤其其中的"分段锁"的方案,相比HashTable的全表锁在性能上的提升非常之大。本文对ConcurrentHashMap的实现原理进行了详细分析,并解读了部分源码,希望能帮助到有需要的童鞋。

https://www.cnblogs.com/chengxiao/p/6842045.html

 

并发编程实践中,ConcurrentHashMap是一个经常被使用的数据结构,相比于Hashtable以及Collections.synchronizedMap()ConcurrentHashMap在线程安全的基础上提供了更好的写并发能力,但同时降低了对读一致性的要求(这点好像CAP理论啊 O(_)O)。ConcurrentHashMap的设计与实现非常精巧,大量的利用了volatilefinalCASlock-free技术来减少锁竞争对于性能的影响,无论对于Java并发编程的学习还是Java内存模型的理解,ConcurrentHashMap的设计以及源码都值得非常仔细的阅读与揣摩。

这篇日志记录了自己对ConcurrentHashMap的一些总结,由于JDK678中实现都不同,需要分开阐述在不同版本中的ConcurrentHashMap

之前已经在ConcurrentHashMap原理分析中解释了ConcurrentHashMap的原理,主要是从代码的角度来阐述是源码是如何写的,本文仍然从源码出发,挑选个人觉得重要的点(会用红色标注)再次进行回顾,以及阐述ConcurrentHashMap的一些注意点。

 

1. JDK6JDK7中的实现

1.1 设计思路

ConcurrentHashMap采用了分段锁的设计,只有在同一个分段内才存在竞态关系,不同的分段锁之间没有锁竞争。相比于对整个Map加锁的设计,分段锁大大的提高了高并发环境下的处理能力。但同时,由于不是对整个Map加锁,导致一些需要扫描整个Map的方法(如size(), containsValue())需要使用特殊的实现,另外一些方法(如clear())甚至放弃了对一致性的要求(ConcurrentHashMap是弱一致性的,具体请查看ConcurrentHashMap能完全替代HashTable吗?)。

ConcurrentHashMap中的分段锁称为Segment,它即类似于HashMapJDK7JDK8HashMap的实现)的结构,即内部拥有一个Entry数组,数组中的每个元素又是一个链表;同时又是一个ReentrantLockSegment继承了ReentrantLock)。ConcurrentHashMap中的HashEntry相对于HashMap中的Entry有一定的差异性:HashEntry中的value以及next都被volatile修饰,这样在多线程读写过程中能够保持它们的可见性,代码如下:

static final class HashEntry<K,V> {

        final int hash;

        final K key;

        volatile V value;

        volatile HashEntry<K,V> next;

1.2 并发度(Concurrency Level

并发度可以理解为程序运行时能够同时更新ConccurentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的分段锁个数,即Segment[]的数组长度。ConcurrentHashMap默认的并发度为16,但用户也可以在构造函数中设置并发度。当用户设置并发度时,ConcurrentHashMap会使用大于等于该值的最小2幂指数作为实际并发度(假如用户设置并发度为17,实际并发度则为32)。运行时通过将key的高n位(n = 32 segmentShift)和并发度减1segmentMask)做位与运算定位到所在的SegmentsegmentShiftsegmentMask都是在构造过程中根据concurrency level被相应的计算出来。

如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。(文档的说法是根据你并发的线程数量决定,太多会导性能降低)

 

1.3 创建分段锁

JDK6不同,JDK7中除了第一个Segment之外,剩余的Segments采用的是延迟初始化的机制:每次put之前都需要检查key对应的Segment是否为null,如果是则调用ensureSegment()以确保对应的Segment被创建。

ensureSegment可能在并发环境下被调用,但与想象中不同,ensureSegment并未使用锁来控制竞争,而是使用了Unsafe对象的getObjectVolatile()提供的原子读语义结合CAS来确保Segment创建的原子性。代码段如下:

if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))

                == null) { // recheck

                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);

                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))

                       == null) {

                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))

                        break;

                }

}

1.4 put/putIfAbsent/putAll

JDK6一样,ConcurrentHashMapput方法被代理到了对应的Segment(定位Segment的原理之前已经描述过)中。与JDK6不同的是,JDK7版本的ConcurrentHashMap在获得Segment锁的过程中,做了一定的优化 - 在真正申请锁之前,put方法会通过tryLock()方法尝试获得锁,在尝试获得锁的过程中会对对应hashcode的链表进行遍历,如果遍历完毕仍然找不到与key相同的HashEntry节点,则为后续的put操作提前创建一个HashEntry。当tryLock一定次数后仍无法获得锁,则通过lock申请锁。

需要注意的是,由于在并发环境下,其他线程的putrehash或者remove操作可能会导致链表头结点的变化,因此在过程中需要进行检查,如果头结点发生变化则重新对表进行遍历。而如果其他线程引起了链表中的某个节点被删除,即使该变化因为是非原子写操作(删除节点后链接后续节点调用的是Unsafe.putOrderedObject(),该方法不提供原子写语义)可能导致当前线程无法观察到,但因为不影响遍历的正确性所以忽略不计。

之所以在获取锁的过程中对整个链表进行遍历,主要目的是希望遍历的链表被CPU cache所缓存,为后续实际put过程中的链表遍历操作提升性能。

在获得锁之后,Segment对链表进行遍历,如果某个HashEntry节点具有相同的key,则更新该HashEntryvalue值,否则新建一个HashEntry节点,将它设置为链表的新head节点并将原头节点设为新head的下一个节点。新建过程中如果节点总数(含新建的HashEntry)超过threshold,则调用rehash()方法对Segment进行扩容,最后将新建HashEntry写入到数组中。

put方法中,链接新节点的下一个节点(HashEntry.setNext())以及将链表写入到数组中(setEntryAt())都是通过UnsafeputOrderedObject()方法来实现,这里并未使用具有原子写语义的putObjectVolatile()的原因是:JMM会保证获得锁到释放锁之间所有对象的状态更新都会在锁被释放之后更新到主存,从而保证这些变更对其他线程是可见的。

 

1.5 rehash

相对于HashMapresizeConcurrentHashMaprehash原理类似,但是Doug Learehash做了一定的优化,避免让所有的节点都进行复制操作:由于扩容是基于2的幂指来操作,假设扩容前某HashEntry对应到Segment中数组的indexi,数组的容量为capacity,那么扩容后该HashEntry对应到新数组中的index只可能为i或者i+capacity,因此大多数HashEntry节点在扩容前后index可以保持不变。基于此,rehash方法中会定位第一个后续所有节点在扩容后index都保持不变的节点,然后将这个节点之前的所有节点重排即可。这部分代码如下:

private void rehash(HashEntry<K,V> node) {

           HashEntry<K,V>[] oldTable = table;

           int oldCapacity = oldTable.length;

           int newCapacity = oldCapacity << 1;

           threshold = (int)(newCapacity * loadFactor);

           HashEntry<K,V>[] newTable =

               (HashEntry<K,V>[]) new HashEntry[newCapacity];

           int sizeMask = newCapacity - 1;

           for (int i = 0; i < oldCapacity ; i++) {

               HashEntry<K,V> e = oldTable[i];

               if (e != null) {

                   HashEntry<K,V> next = e.next;

                   int idx = e.hash & sizeMask;

                   if (next == null)   //  Single node on list

                       newTable[idx] = e;

                   else { // Reuse consecutive sequence at same slot

                       HashEntry<K,V> lastRun = e;

                       int lastIdx = idx;

                       for (HashEntry<K,V> last = next;

                            last != null;

                            last = last.next) {

                           int k = last.hash & sizeMask;

                           if (k != lastIdx) {

                               lastIdx = k;

                               lastRun = last;

                           }

                       }

                       newTable[lastIdx] = lastRun;

                       // Clone remaining nodes

                       for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {

                           V v = p.value;

                           int h = p.hash;

                           int k = h & sizeMask;

                           HashEntry<K,V> n = newTable[k];

                           newTable[k] = new HashEntry<K,V>(h, p.key, v, n);

                       }

                   }

               }

           }

           int nodeIndex = node.hash & sizeMask; // add the new node

           node.setNext(newTable[nodeIndex]);

           newTable[nodeIndex] = node;

           table = newTable;

       }

1.6 remove

put类似,remove在真正获得锁之前,也会对链表进行遍历以提高缓存命中率。

 

1.7 getcontainsKey

getcontainsKey两个方法几乎完全一致:他们都没有使用锁,而是通过Unsafe对象的getObjectVolatile()方法提供的原子读语义,来获得Segment以及对应的链表,然后对链表遍历判断是否存在key相同的节点以及获得该节点的value。但由于遍历过程中其他线程可能对链表结构做了调整,因此getcontainsKey返回的可能是过时的数据,这一点是ConcurrentHashMap在弱一致性上的体现。如果要求强一致性,那

以上是关于ConcurrentHashMap的主要内容,如果未能解决你的问题,请参考以下文章