2.Java集合-ConcurrentHashMap实现原理及源码分析
Posted 白日梦想家12138
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2.Java集合-ConcurrentHashMap实现原理及源码分析相关的知识,希望对你有一定的参考价值。
一、为何用ConcurrentHashMap
在并发编程中使用HashMap可能会导致死循环,而使用线程安全的HashTable效率又低下。
线程不安全的HashMap
在多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap
效率低下的HashTable
Hashtable使用synchronized来保证线程的安全,但是在线程竞争激烈的情况下Hashtable的效率非常低下。当一个线程访问Hashtable的同步方法,其他方法访问Hashtable的同步方法时,会进入阻塞或者轮询状态。如果线程1使用put进行元素添加,线程2不但不能用put方法添加于元素同是也无法用get方法来获取元素,所以竞争越激烈效率越低。
ConcurrentHashMap的锁分段技术
Hashtable容器在竞争激烈的并发环境效率低下的原因是所有访问Hashtable的线程都必须竞争同一把锁,假如容器有多把锁,每一把锁用于锁住容器中一部分数据,那么多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问率,这就是ConcurrentHashMap的锁分段技术。将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一段数据的时候,其他段的数据也能被其他线程访问。
二、结构解析
ConcurrentHashMap和Hashtable主要区别就是围绕着锁的粒度以及如何锁,可以简单理解成把一个大的Hashtable分解成多个,形成了锁分离
感觉一个segment 就相当于一个 Hashtable
二、应用场景
当有一个大数组时需要在多个线程共享时就可以考虑是否把它给分层多个节点了,避免大锁。并可以考虑通过hash算法进行一些模块定位
其实不止用于线程,当设计数据表的事务时(事务事务某种意义上也是同步机制的体现),可以把一个表看成一个需要同步的数组,如果操作的表数据太多就可以考虑事务分离了(这也是为什么要避免大表的出现),比如把数据进行字段拆分、水平分表等
三、源码解读
从上图可以看出,ConcurrentHashMap内部分为很多个Segment,每一个Segment拥有一把锁,然后每个Segment(继承ReentrantLock)下面包含很多个HashEntry列表数据。对于一个key,需要经过三次(为什么要hash三次?后面解释)hash操作,才能最终定位这个元素的位置,这三次hash分别为:
1.对于一个key,先进行一次hash操作,得到hash值h1,也即h1 = hash1(key)
2.将得到的h1的高几位进行第二次hash,得到hash值h2,也即h2 = hash2(h1高几位),通过h2能够确定该元素放在哪个Segment
3.将得到的h1进行第三次hash,得到hash值h3,也即h3=hash3(h1),确定h3能够确定该元素放置在哪个HashEntry
注:在使用key定位Segment之前进行的那次hash操作,即第一次hash, 这次hash的主要目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义
ConcurrentHashMap核心方法特点阐述
ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。如果使用传统的技术,如HashMap中的实现,如果允许可以在hash链的中间添加或删除元素,读操作不加锁将得到不一致的数据。ConcurrentHashMap的实现技术是保证 HashEntry几乎是不可变的,为了确保读操作能够看到最新的值,将value设置成volatile;;下面是HashEntry的结构
1. static final class HashEntry<K,V> { 2. final K key; 3. final int hash; 4. volatile V value; 5. final HashEntry<K,V> next; 6. }
可以看到除了value 不是final的,其他值都是final的,这意味着不能从hash链的中间或尾部添加或删除节点,因为这需要修改 next 的引用值,所有的节点的修改只能从头部开始
对于put操作,可以一律添加到Hash链的头部但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。这在讲解删除操作时还会详述。为了确保读操作能够看到最新的值,将value设置成volatile,这避免了加锁。
ConcurrentHashMap的数据成员及其作用阐述:
1 public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> 2 implements ConcurrentMap<K, V>, Serializable { 3 /** 4 * Mask value for indexing into segments. The upper bits of a 5 * key\'s hash code are used to choose the segment. 6 */ 7 final int segmentMask; 8 9 /** 10 * Shift value for indexing within segments. 11 */ 12 final int segmentShift; 13 14 /** 15 * The segments, each of which is a specialized hash table 16 */ 17 final Segment<K,V>[] segments; 18 }
所有的成员都是final的,其中segmentMask和segmentShift主要是为了定位段
1. final Segment<K,V> segmentFor(int hash) { 2. return segments[(hash >>> segmentShift) & segmentMask]; 3. }
每个Segment相当于一个小的 Hashtable,它的数据成员如下:
1. static final class Segment<K,V> extends ReentrantLock implements Serializable { 2. private static final long serialVersionUID = 2249069246763182397L; 3. /** 4. * The number of elements in this segment\'s region. 5. */ 6. transient volatile int count; 7. 8. /** 9. * Number of updates that alter the size of the table. This is 10. * used during bulk-read methods to make sure they see a 11. * consistent snapshot: If modCounts change during a traversal 12. * of segments computing size or checking containsValue, then 13. * we might have an inconsistent view of state so (usually) 14. * must retry. 15. */ 16. transient int modCount; 17. 18. /** 19. * The table is rehashed when its size exceeds this threshold. 20. * (The value of this field is always <tt>(int)(capacity * 21. * loadFactor)</tt>.) 22. */ 23. transient int threshold; 24. 25. /** 26. * The per-segment table. 27. */ 28. transient volatile HashEntry<K,V>[] table; 29. 30. /** 31. * The load factor for the hash table. Even though this value 32. * is same for all segments, it is replicated to avoid needing 33. * links to outer object. 34. * @serial 35. */ 36. final float loadFactor; 37. }
count用来统计该段数据的个数,它是volatile,它用来协调修改和读取操作,以保证读取操作能够读取到几乎最新的修改。协调方式是这样的:每次修改操作做了结构上的改变,如增加/删除节点(修改节点的值不算结构上的改变),都要写count值,每次读取操作之前都要读取count的值。这利用了Java5中对volatile语义的增强,对同一个volatile变量的写和读存在happens-before关系。modCount统计段结构改变的次数,主要是为了检测对多个段是否发生改变,在讲述跨段操作时还会阐述。threashold用来表示需要进行rehash的界限值。table数组存储段中节点,每个数组元素是个hash链,用HashEntry表示。table也是volatile,这使得读取到最新的table值而不需要同步。loadFactor表示负载因子。
先来看下删除操作remove(key)
1 1. public V remove(Object key) { 2 2. hash = hash(key.hashCode()); 3 3. return segmentFor(hash).remove(key, hash, null); 4 4. } 5 整个操作是先定位到段,然后委托给段的remove操作。当多个删除操作并发进行时,只要它们所在的段不相同,它们就可以同时进行。下面是Segment的remove方法实现: 6 1. V remove(Object key, int hash, Object value) { 7 2. lock(); 8 3. try { 9 4. int c = count - 1; 10 5. HashEntry<K,V>[] tab = table; 11 6. int index = hash & (tab.length - 1); 12 7. HashEntry<K,V> first = tab[index]; 13 8. HashEntry<K,V> e = first; 14 9. while (e != null && (e.hash != hash || !key.equals(e.key))) 15 10. e = e.next; 16 11. 17 12. V oldValue = null; 18 13. if (e != null) { 19 14. V v = e.value; 20 15. if (value == null || value.equals(v)) { 21 16. oldValue = v; 22 17. // All entries following removed node can stay 23 18. // in list, but all preceding ones need to be 24 19. // cloned. 25 20. ++modCount; 26 21. HashEntry<K,V> newFirst = e.next; 27 22. *for (HashEntry<K,V> p = first; p != e; p = p.next) 28 23. *newFirst = new HashEntry<K,V>(p.key, p.hash, 29 24. newFirst, p.value); 30 25. tab[index] = newFirst; 31 26. count = c; // write-volatile 32 27. } 33 28. } 34 29. return oldValue; 35 30. } finally { 36 31. unlock(); 37 32. } 38 33. }
整个操作是在持有段锁的情况下执行的,空白行之前的行主要是定位到要删除的节点e
接下来,如果不存在这个节点就直接返回null,否则就将e前面的节点复制一遍,尾节点指向e的下一个节点。e后面的节点不需要复制,他们可以重用
示例图:删除e3这个节点
整个remove实现并不复杂,但是需要注意以下几点。第一,当要删除的节点存在时,删除的最后一步操作要将count的值 减一。这必须是最后一步操作,否则读取操作可能看不到之前对段所做的结构性修改。第二,remove执行的开始就将table赋给一个局部变量tab,这是因为table是 volatile变量,读写volatile变量的开销很大。编译器也不能对volatile变量的读写做任何优化,直接多次访问非volatile实例变量没有多大影响,编译器会做相应优化。
接下来看put操作,同样地,put操作也是委托给段的put方法。下面是段的put方法:
1. V put(K key, int hash, V value, boolean onlyIfAbsent) { 2. lock(); 3. try { 4. int c = count; 5. if (c++ > threshold) // ensure capacity 6. rehash(); 7. HashEntry<K,V>[] tab = table; 8. int index = hash & (tab.length - 1); 9. HashEntry<K,V> first = tab[index]; 10. HashEntry<K,V> e = first; 11. while (e != null && (e.hash != hash || !key.equals(e.key))) 12. e = e.next; 13. 14. V oldValue; 15. if (e != null) { 16. oldValue = e.value; 17. if (!onlyIfAbsent) 18. e.value = value; 19. } 20. else { 21. oldValue = null; 22. ++modCount; 23. tab[index] = new HashEntry<K,V>(key, hash, first, value); 24. count = c; // write-volatile 25. } 26. return oldValue; 27. } finally { 28. unlock(); 29. } 30. }
该方法也是在持有段锁(锁定整个segment)的情况下执行的,这当然是为了并发的安全,修改数据是不能并发进行的,必须得有个判断是否超限的语句以确保容量不足时能够rehash。
接着找是否存在同样一个key的节点,如果存在就直接替换这个节点的值。否则创建一个新的结点并添加到hash链的头部,这时一定要修改modCount和count的值,同样修改count的值一定要放到最后一步
修改操作还有putAll和replace。putAll就是多次调用put方法,没什么好说的。replace甚至不用做结构上的更改,实现要比put和delete要简单得多。replace甚至不用做结构上的更改,实现要比put和delete要简单得多
获取操作
首先看下get操作,同样ConcurrentHashMap的get操作是直接委托给Segment的get方法,直接看Segment的get方法:
1. V get(Object key, int hash) { 2. if (count != 0) { // read-volatile 当前桶的数据个数是否为0 3. HashEntry<K,V> e = getFirst(hash); 得到头节点 4. while (e != null) { 5. if (e.hash == hash && key.equals(e.key)) { 6. V v = e.value; 7. if (v != null) 8. return v; 9. return readValueUnderLock(e); // recheck 10. } 11. e = e.next; 12. } 13. } 14. return null; 15. }
get操作不需要锁。第一步是访问count变量,这是一个volatile变量,由于所有的修改操作在进行结构修改时都会在最后一步写count变量,通过这种机制保证get操作能够得到几乎最新的结构更新。对于非结构更新,也就是结点值的改变,由于HashEntry的value变量是volatile的,也能保证读取到最新的值
接下来就是根据hash和key对hash链进行遍历找到要获取的结点,如果没有找到,直接返回null。hash链进行遍历不需要加锁的原因在于链指针next是final的。但是头指针却不是final的,这是通过getFirst(hash)方法返回,也就是存在 table数组中的值。这使得getFirst(hash)可能返回过时的头结点,例如,当执行get方法时,刚执行完getFirst(hash)之后,另一个线程执行了删除操作并更新头结点,这就导致get方法中返回的头结点不是最新的。这是可以允许,通过对count变量的协调机制,get能读取到几乎最新的数据,虽然可能不是最新的。要得到最新的数据,只有采用完全的同步。
最后,如果找到了所求的结点,判断它的值如果非空就直接返回,否则在有锁的状态下再读一次。这似乎有些费解,理论上结点的值不可能为空,这是因为 put的时候就进行了判断,如果为空就要抛NullPointerException。空值的唯一源头就是HashEntry中的默认值,因为 HashEntry中的value不是final的,非同步读取有可能读取到空值。仔细看下put操作的语句:tab[index] = new HashEntry<K,V>(key, hash, first, value),在这条语句中,HashEntry构造函数中对value的赋值以及对tab[index]的赋值可能被重新排序,这就可能导致结点的值为空。这里当v为空时,可能是一个线程正在改变节点,而之前的get操作都未进行锁定,根据bernstein条件,读后写或写后读都会引起数据的不一致,所以这里要对这个e重新上锁再读一遍,以保证得到的是正确值。
1. V readValueUnderLock(HashEntry<K,V> e) { 2. lock(); 3. try { 4. return e.value; 5. } finally { 6. unlock(); 7. } 8. }
另一个操作是containsKey,这个实现就要简单得多了,因为它不需要读取值:
1. boolean containsKey(Object key, int hash) { 2. if (count != 0) { // read-volatile 3. HashEntry<K,V> e = getFirst(hash); 4. while (e != null) { 5. if (e.hash == hash && key.equals(e.key)) 6. return true; 7. e = e.next; 8. } 9. } 10. return false; 11. }
get与containsKey两个方法几乎完全一致:他们都没有使用锁对链表遍历判断是否存在key相同的节点以及获得该节点的value。但由于遍历过程中其他线程可能对链表结构做了调整,因此get和containsKey返回的可能是过时的数据,这一点是ConcurrentHashMap在弱一致性上的体现。如果要求强一致性,那么必须使用Collections.synchronizedMap()方法。
四、总结
JDK6、7中的ConcurrentHashMap主要使用Segment来 减小锁粒度,把HashMap分割成若干个segment,在put的时候需要锁住Segment,get时候不加锁,使用volatile来保证内存可见性,当要统计全局时(比如size),首先会尝试多次计算modCount来确定,这几次尝试中,是否有其他线程进行了修改操作,如果没有,直接返回size,如果有,则需要依次锁住所有的Segment来计算
jdk7中ConcurrentHashMap中,当长度过长碰撞会很频繁,链表的增删改查操作都会消耗很长时间,影响性能,所以jdk8中完全重写了ConcurrentHashMap,代码量从原来的1000多行变成了6000多行,实现上也和原来的分段式存储有很大区别
主要设计上的变化有以下几点:
1.它摒弃了锁分段的方式,而是启用了一种全新的方式实现,利用CAS算法
2.它沿用了与它同时期的HashMap版本的思想,底层由 "数组 + 链表 + 红黑树"的方式实现
3.为了做到并发,增加了很多辅助的类,例如:TreeBin,Traverser等对象内部类
参考:http://www.cnblogs.com/ITtangtang/p/3948786.html
以上是关于2.Java集合-ConcurrentHashMap实现原理及源码分析的主要内容,如果未能解决你的问题,请参考以下文章
201671010129 2016—2017—2 《Java程序设计》学习Java的集合的小结