十二ConcurrentHashMap的实现原理解析

Posted 学习JAVA的海贼王

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了十二ConcurrentHashMap的实现原理解析相关的知识,希望对你有一定的参考价值。

一、ConcurrentHashMap在多线程中的作用

  在并发编程中,如果使用HashMap那是线程不安全的会导致死循环,而线程安全的HashTable效率又特别低下,他把自己是所有操作都加了锁,虽然线程安全,但是获取不到锁的线程只能陷入阻塞,大大影响了性能。

  ConcurrentHashMap的优点是因为他采用了锁分段技术,将数据分成一段一段,通过给每一段来上锁,这样当两个线程过来,访问的是不同段的数据,就不会出现类似HashTable争抢锁而陷入阻塞的问题,因为两个线程获得的是不同的锁,大大提高了效率。

二、ConcurrentHashMap的结构

1、类图:

2、结构图:

3、结构总结:

  从上可知,ConcurrentHashMap通过维护内部的Segment数组,而每个Segment就是一个ReentrantLock并且存放了多个HashEntry键值对,这样就实现了锁分段技术。

三、ConcurrentHashMap构造分析:

1、构造方法,我们直接分析参数最全的构造方法,因为无论哪个构造方法最后都会调用这个

 //initialCapacity ConcurrentHashMap的容量 //loadFactor 每个Segment中的HashTable数量的负载因子 //concurrencyLevel 根据他来确定Segment数组的长度 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); //用来确定Segment数组的长度,不能超过MAX_SEGMENTS,最大为16 if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS;
int sshift = 0; int ssize = 1; //concurrencyLevel用来算出segment的长度,为了到时定位准确,那么 //segment的长度ssize的值为2的n次方,所以如果size的值为大于concurrenyLevel //的最小的那个数,例如concurrencyLevel为12,13,14,那么segment的长度为16 //也就是有16个锁 while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } //在Segment的再次散列时使用 this.segmentShift = 32 - sshift; // 在Segment的再次散列时使用 this.segmentMask = ssize - 1; //ConcurrentHashMap的初始化容量 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; //每个segment存放HashEntry的个数,默认为2 int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; //原子操作,构造出这个类型的Segment数组。 UNSAFE.putOrderedObject(ss, SBASE, s0); //最后构造出ConcurrentHashMap中的Segment数组 this.segments = ss; } //Segment构造方法 Segment(float lf, int threshold, HashEntry<K,V>[] tab) { //负载因子 this.loadFactor = lf; //每个HashEntry实际存储的键值对个数 this.threshold = threshold; //每个HashEntry数组 this.table = tab; }

2、默认情况下:ConcurrentHashMap的Segment数组长度为16,每个Segment中存放的HashEntry为2,负载因子为0.75。

四、ConcurrentHashMap的操作

1、put操作的代码分析:

public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; //上述方法定位到了,要使用哪个Segment if ((s = (Segment<K,V>)UNSAFE.getObject  (segments, (j << SSHIFT) + SBASE)) == null)  // 当要使用的Segment为空时 s = ensureSegment(j); //难道获取的Segment进行添加 return s.put(key, hash, value, false); } /****如果获取到的Segment的位置,在数组中不存在这个方法对ConcurrentHashMap /***中的Segments进行扩容/ @SuppressWarnings("unchecked") private Segment<K,V> ensureSegment(int k) { //获取现在已经有的Segments数组 final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; //检查有没有存在,因为可能有别的线程先创建了 if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { //复制已经存在Segments数组中每个Segments的一些属性 Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; //在次检查,还是怕在这个过程中有线程已经创建 if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { //CAS保证安全的将创建的Segment放到Segments中去 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } //返回创建的Segment return seg; } /*********将值放入到Segment中的HashEntry中***********/ final V put(K key, int hash, V value, boolean onlyIfAbsent) { //在这个位置开始对当前Segment上锁,如果获取锁成功则=null //否则不停的去获取锁 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { //如果直接获取锁 HashEntry<K,V>[] tab = table; //根据&运算获取该存到HashEntry数组中的哪个HashEntry中 int index = (tab.length - 1) & hash; //响应位置的HashEntry HashEntry<K,V> first = entryAt(tab, index); //调用死循环一个节点节点的去对比 for (HashEntry<K,V> e = first;;) { if (e != null) { K k; //如果key值和value值已经hash值都相同 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; //! 仅设置缺省值=不设置缺省值 if (!onlyIfAbsent) { //直接设置值 e.value = value; ++modCount; } //打破循环 break; } //继续到下一个结点 e = e.next; } else { //进入到这段代码的条件是,确实有这个HashEntry但是HashEntry里面一个 //元素也没有 //如果node不等于null,则将node添加到HashEntry中 //这个地方设计的很巧妙,就是当这个线程尝试获取锁失败之后,他 //他并不是什么也不干,而是先构造出来这个node,然后再去尝试获取锁, //等自己获得成功获取锁之后,就不用在此构建这个node了 if (node != null) node.setNext(first); else //构造一个node node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; //HashEntry的长度已经超过了现有的长度,hashEntry的长度小于要求的 //最大长度,进行扩容,每次扩容的长度是现在长度的2倍 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else //如果没有超过,则直接将当前node放入到HashEntry中 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; } //经典的计算hash算法 private int hash(Object k) { int h = hashSeed; if ((0 != h) && (k instanceof String)) { return sun.misc.Hashing.stringHash32((String) k); } h ^= k.hashCode(); h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); }

2、put操作的语言描述

上面排版比较乱,如果看不懂可以看语言描述

第一步:判断插入的值是否为空,如果为空则直接抛错,如果不为空进入第二步

第二步:通过Wang/Jenkins的hash算法,传入一个key,根据key来获得一个hash值

第三步:将这个hash值进行再次散列,获取到将要存入Segment数组中的哪个Segment数组中

第四步:如果获取到的Segment对象为空,表示数组中没有这个Segment,那么新增一个Segment,否则直接进入第五步

第五步:以为已经进入到Segment了,所以需要上锁,如果获取锁成功,那么直接进行第六步。否则死循环不断的尝试获取锁,并且在尝试过程中先去创建(这里叫预创建一个node),直到获取锁后,跳出循环,返回自己创建的node,进入第六步

第六步:在继续用&运算,算出将要存到HashEntry数组中的哪个HashEntry元素,将这个HashEntry元素取出,则循环判断添加的K,进入第七步

第七步-1:HashEntry不为空的情况下,看看是不是在HashEntry中已经存在,如果已经存在,根据是否设置缺省值这个字段确定是否替换掉旧值,返回旧值,进入第九步,如果不存在,进入到第七步-2。

第七步-2:如果HashEntry为空,则先判断自己是否创建了node(如果直接获取到了是不会预创建这个node的),如果node不为空的话,则直接将当前node存放在已经存在的HashEntry前面。如果为空构造一个node。进入到第八步

第八步:判断HashEntry是否需要扩容,如果需要扩容则将当前HashEntry扩为两倍,并重新进行散列计算。然后将老数据以及新数据重新放到数组中。如果不需要扩容,则直接将当前node放到相应位置的HashEntry中并更新,当前这个HashEntry数组中的对应位置的HashEntry元素

第九步:释放Segment的锁。

public V get(Object key) { Segment<K,V> s;  HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; //根据获取到的散列值,看看是否存在这个Segment if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { //存在的话,在继续散列HashEntry数组,获取数组中对应的HashEntry for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; //判断键为key的值,然后返回对应的value if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } //不存在的话返回null return null;    }

4、get操作的语言描述:

第一步:通过Wang/Jenkins的hash算法,传入一个key,根据key来获得一个hash值

第二步:将这个hash值进行再次散列,获取到将要存入Segment数组中的哪个Segment数组中

第三步:如果数组中这个位置没有Segment,则直接返回空,否则进入到第四步

第四步:因为是读,所以并没有上锁,根据Segment的散列值到HashEntry数组中进行在此散列,获取数组中对应位置的HashEntry

第五步:循环遍历这个HashEntry最后返回键值对K等于传入K的value值

5、size操作的代码:

public int size() {
final Segment<K,V>[] segments = this.segments; int size; boolean overflow; //每次算的sum值 long sum; //最后一次的size值 long last = 0L; int retries = -1; try { //死循环判断,每次对retries(尝试次数)+1 for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } //当他们两个相等时,则返回size //这种情况只发生对ConcurrentHashMap进行2次以内包括 //2次的无锁循环 //如果2次无锁循环的值都相等,则直接返回,否则进行有锁循环 if (sum == last) break; last = sum; } } finally { //当尝试次数已经超过允许的不上锁次数时,说明已经上锁了 //需要循环释放。 if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }

6、size操作的语言描述

第一步:获取当前ConcurrentHashMap中的Segment数组,定义一个循环次数计数器,一个每次的值,和一个最终值

第二步:尝试进行两次不上锁的循环获取size,如果相等则返回这个size,如果不相等,则进行加锁循环获取size。

第三歩:如果加锁了,则需要循环释放每个Segment的锁。

总结:获取size的方式就是先不加锁循环两次,比较两次的值是否相等,如果相等则直接返回,如果不相等,那么加锁重新循环一次。


以上是关于十二ConcurrentHashMap的实现原理解析的主要内容,如果未能解决你的问题,请参考以下文章

一文就懂ConcurrentHashMap实现原理

一文就懂ConcurrentHashMap实现原理

HashMap和ConcurrentHashMap实现原理及源码分析

Java ConcurrentHashMap工作原理及实现

ConcurrentHashmap原理

ConcurrentHashmap原理