Java并发编程:并发容器之ConcurrentHashMap

Posted 阿玛尼迪迪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程:并发容器之ConcurrentHashMap相关的知识,希望对你有一定的参考价值。

  JDK5中添加了新的concurrent包,相对同步容器而言,并发容器通过一些机制改进了并发性能。因为同步容器将所有对容器状态的访问都串行化了,这样保证了线程的安全性,所以这种方法的代价就是严重降低了并发性,当多个线程竞争容器时,吞吐量严重降低。因此Java5.0开始针对多线程并发访问设计,提供了并发性能较好的并发容器,引入了java.util.concurrent包。与Vector和Hashtable、Collections.synchronizedXxx()同步容器等相比,util.concurrent中引入的并发容器主要解决了两个问题:

  1)根据具体场景进行设计,尽量避免synchronized,提高并发性。

  2)定义了一些并发安全的复合操作,并且保证并发环境下的迭代操作不会出错。

  util.concurrent中容器在迭代时,可以不封装在synchronized中,可以保证不抛异常,但是未必每次看到的都是“最新的、当前的”数据。

  下面是对并发容器的简单介绍:

  ConcurrentHashMap代替同步的Map(Conllections.synchronizedMap(new HashMap<>())),众所周知,HashMap是根据散列值分段存储的,同步Map在同步的时候锁住了所有的段,而ConcurrentHashMap加锁的时候根据散列值锁住了散列值锁对应的那段,因此提高了并发性能

  ConcurrentHashMap也增加了对常用复合操作的支持,比如“若没有则添加”:putIfAbsent(),替换:replace()。这2个操作都是原子操作。

  CopyOnWriteArrayList和CopyOnWriteArraySet分别代替List和Set,主要是在遍历操作为主的情况下来代替同步的List和同步的Set,这也就是上面所述的思路:迭代过程要保证不出错,除了加锁,另外一种方法就是“克隆”容器对象。

  ConcurrentLinkedQueue是一个先进先出的队列,它是非阻塞队列。

  ConcurrentSkipListMap可以在高效并发中替代SoredMap(例如用Collections.synchronizedMap包装的TreeMap)。

  ConcurrentSkipListSet可以在高效并发中替代SoredSet(例如用Collections.synchronizedSet包装的TreeSet)。

  大家都知道HashMap是非线程安全的,Hashtable是线程安全的,但是由于Hashtable是采用synchronized进行同步,相当于所有线程进行读写时都去竞争一把锁,导致效率非常低下。

  ConcurrentHashMap可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持得尽量的小,不用对整个ConcurrentHashMap加锁

  

ConcurrentHashMap的内部结构

  ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment的结构,一个Segment其实就是一个类哈希表的结构,Segment内部维护了一个链表数组,我们用下面这一幅图来看下ConcurrentHashMap的内部结构:

  从上面的结构可以了解到,ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部,因此这一种结构带来的副作用是Hash的过程要比普通的HashMap要长,但是带来的好处是写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其它的Segment,这样,在最理想的情况下,ConcurrentHashMap可以最高同时支持Segment数量大小的写操作(刚好这些写操作都非常平均地分布在所有的Segment上),所以,通过这一种结构,ConcurrentHashMap的并发能力可以大大的提高

 

Segment

  我们再来具体了解一下Segment的数据结构:

1 static final class Segment<K,V> extends ReentrantLock implements Serializable {
2     transient volatile int count;
3     transient int modCount;
4     transient int threshold;
5     transient volatile HashEntry<K,V>[] table;
6     final float loadFactor;
7 }

  详细解释一下segment里面的成员变量的意义:

  • count:Segment中元素的数量,它是volatile,用来协调修改和读取操作,以保证读取操作能够读取到几乎最新的修改。协调方式是这样的,每次修改操作做了结构上的改变,如增加/删除节点(修改节点的值不算结构上的改变),都要写count值,每次读取操作开始都要读取count值。这利用了Java 5中对volatile语义的增强,对同一个volatile变量的写和读存在happens-before关系(对一个volatile域的写,happens-before于任意后续对这个volatile域的读,即写操作的执行结果,对读操作可见)
  • modCount:统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变,在讲述跨段操作时还会讲述。
  • threshold:用来表示需要进行rehash的界限值,超过该阈值,则对Segment中数组的大小进行扩容。
  • table:table也是volatile,这使得能够读取到最新的table值而不需要同步。
  • loadFactor:表示负载因子,用于确定threshold。

HashEntry

  Segment中的元素是以HashEntry的形式存放在链表数组中的,看一下HashEntry的结构:

1 static final class HashEntry<K,V> {
2     final K key;
3     final int hash;
4     volatile V value;
5     final HashEntry<K,V> next;
6 }

  可以看到HashEntry的一个特点,除了value以外,其它的几个变量都是final的,这意味着不能从hash链的中间或尾部添加或删除节点,因为这需要修改next引用值,所有的节点的修改只能从头部开始。对于put操作,可以一律添加到HashEntry链的头部(next为final型,它的唯一一次赋值可以发生在构造方法中,即可以使用new HashEntry(...),在构造方法HashEntry(...)中放入key,hash,value,并将next赋值为原HashEntry链的头部。PS:太巧妙了TAT)。但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除的节点的前面所有节点整个复制一遍(使用new HashEntry(...)在构造方法中将传入待复制的节点的key,hash,value,然后将next指向新节点),最后一个节点指向要删除节点的下一个节点。将value设置成volatile,这避免了加锁。

 

ConcurrentHashMap的初始化

  下面结合源代码来具体分析一下ConcurrentHashMap的实现,先看下初始化方法:

 1 public ConcurrentHashMap(int initialCapacity,
 2                          float loadFactor, int concurrencyLevel) {
 3     if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
 4         throw new IllegalArgumentException();
 5   
 6     if (concurrencyLevel > MAX_SEGMENTS)
 7         concurrencyLevel = MAX_SEGMENTS;
 8   
 9     // Find power-of-two sizes best matching arguments
10     int sshift = 0;
11     int ssize = 1;
12     // 保证ssize一定为2的指数个
13     // 如concurrencyLevel为11,12,13,14,15,16时,ssize都为16
14     while (ssize < concurrencyLevel) {  
15         ++sshift;   // 记录ssize左移的次数
16         ssize <<= 1;
17     }
18     // 这两个全局变量在定位segment时的哈希算法里需要使用
19     segmentShift = 32 - sshift; // 之所以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的
20     segmentMask = ssize - 1;    // 为哈希运算的掩码,等于ssize,保证其二进制位都是1
21     this.segments = Segment.newArray(ssize);
22   
23     if (initialCapacity > MAXIMUM_CAPACITY)
24         initialCapacity = MAXIMUM_CAPACITY;
25     int c = initialCapacity / ssize;    // c为ssize的倍数
26     if (c * ssize < initialCapacity)
27         ++c;
28     int cap = 1;
29     while (cap < c) // 如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1,就是2的N次方
30         cap <<= 1;
31   
32     for (int i = 0; i < this.segments.length; ++i)
33         this.segments[i] = new Segment<K,V>(cap, loadFactor); // segment的容量threshold = (int)cap*loadFactor

  ConcurrentHashMap的初始化一共三个参数,一个initialCapacity,表示初始的容量,一个loadFactor,表示负载参数,最后一个是concurrentLevel,代表ConcurrentHashMap内部的Segment的数量,concurrentLevel一经指定,不可改变,后续如果ConcurrentHashMap的元素数量增加导致ConcurrentHashMap需要扩容,ConcurrentHashMap不会增加Segment的数量,而只会增加Segment中链表数组的容量大小,这样的好处是扩容过程不需要对整个ConcurrentHashMap做rehash,而只需要对Segment里面的元素做一次rehash就可以了

  整个ConcurrentHashMap的初始化方法还是非常简单的,先是根据concurrentLevel来new出Segment,这里Segment的数量是不大于concurrentLevel的最大的2的指数(while(ssize < concurrentLevel) {ssize <<= 1}),就是说Segment的数量永远是2的指数个,这样的好处是方便采用移位操作来进行hash,加快hash的过程。接下来就是根据initialCapacity确定Segment的容量大小,每一个Segment的容量大小也是2的指数,同样是为了加快hash的过程。

  这边需要特别注意两个变量:segmentShift和segmentMask,这两个变量在后面将会起很大的作用,假设构造函数确定了segment的数量是2的n次方,那么segmentShift就等于32减去n,而segmentMask就等于2的n次方减一。

 

ConcurrentHashMap的get操作

  前面提到过ConcurrentHashMap的get操作是不用加锁的,我们这里看一下其实现:

1 public V get(Object key) {
2     int hash = hash(key.hashCode());
3     return segmentFor(hash).get(key, hash);
4 }

  看第三行,segmentFor这个函数用来确定操作应该在哪一个segment中进行,几乎对ConcurrentHashMap的所有操作都需要用到这个函数,我们来看下这个函数的实现:

1 final Segment<K,V> segmentFor(int hash) {
2     return segments[(hash >>> segmentShift) & segmentMask];
3 }

  这个函数用了位操作来确定Segment,根据传入的hash值向右无符号右移segmentShift位,然后和segmentMask进行与操作,综合我们之前说的segmentShift和segmentMask的值,就可以得出如下结论:假设Segment的数量是2的n次方,根据元素的hash值的高n位就可以确定元素到底在哪一个Segment中

  在确定了需要在哪一个segment中进行操作后,接下来的事情就是调用对应的segment的get方法:

 1 V get(Object key, int hash) {
 2     if (count != 0) { // read-volatile
 3         HashEntry<K,V> e = getFirst(hash);
 4         while (e != null) {
 5             if (e.hash == hash && key.equals(e.key)) {
 6                 V v = e.value;
 7                 if (v != null)
 8                     return v;
 9                 return readValueUnderLock(e); // recheck
10             }
11             e = e.next;
12         }
13     }
14     return null;
15 }

  先看第二行代码,这里对count进行了一次判断,其中count表示该Segment中包含的元素的数量,我们可以来看一下count的定义:

transient volatile int count;

  可以看到count是volatile的,实际上这里面利用了volatile的语义:

  “对volatile字段的写入操作happens-before于每一个后续的同一个字段的读操作。”

  因为实际上put、remove等操作也会更新count的值,所以当竞争发生的时候,volatile的语义可以保证写操作在读操作之前,可就保证了写操作对后续的读操作都是可见的。

  通过这种机制来保证get操作能够得到几乎最新的结构更新。对于非结构更新(也就是结点值的改变),由于HashEntry的value变量是volatile的,也能保证读取到“最新”的值。接下来就是对hash链进行遍历找到要获取的节点,如果没有找到,直接返回null。对hash链进行遍历不需要加锁的原因在于链指针next是final的,但是头指针却不是final的,头指针是通过getFirst(hash)方法返回的

1 HashEntry<K,V> getFirst(int hash) {
2     HashEntry<K,V>[] tab = table;
3     return tab[hash & (tab.length - 1)];
4 }

  也就是存在table数组中的值。这使得getFirst(hash)可能返回过时的头节点。例如,当执行get方法时,刚执行完getFirst(hash)之后,另一个线程执行了删除操作并更新头结点,这就导致get方法中返回的头结点不是最新的。这是可以允许的,通过对count变量的协调机制,get能读取到几乎最新的数据,虽然可能不是最新的。要得到最新的数据,只有采用完全的同步

  最后,如果找到了所求的节点,判断它的值如果非空就直接返回,否则在有锁的状态下再读一次。这似乎有些费解,理论上节点的值不可能为空,这是因为put的时候就进行了判断,如果为空就抛出NullPointerException。空值的唯一源头就是HashEntry中的默认值,因为HashEntry中的value不是final的,非同步读取有可能读取到空值。仔细看下put操作的语句:tab[index] = new HashEntry<K,V>(key,hash,first,value),在这条语句中,HashEntry构造函数中对value的赋值以及对tab[index]的赋值可能被重新排序,这就可能导致结点的值为空(HashEntry对象构造好了,但对value的赋值还未完成,此时取到其默认值空)。这种情况应当很罕见,一旦发生这种情况,ConcurrentHashMap采取的方式是在持有锁的情况下再读一遍,这能够保证读到最新的值,并且一定不会为空值。

1 V readValueUnderLock(HashEntry<K,V> e) {
2     lock();
3     try {
4         return e.value;
5          } finally {
6                 unlock();
7             }
8 }

  

ConcurrentHashMap的put操作

  看完了get操作,再看下put操作,put操作的前面也是确定Segment的过程,这里不再赘述,直接看关键的segment的put方法:

 1 V put(K key, int hash, V value, boolean onlyIfAbsent) {
 2     lock();
 3     try {
 4         int c = count;
 5         if (c++ > threshold) // ensure capacity
 6             rehash();
 7         HashEntry<K,V>[] tab = table;
 8         int index = hash & (tab.length - 1);
 9         HashEntry<K,V> first = tab[index];
10         HashEntry<K,V> e = first;
11         while (e != null && (e.hash != hash || !key.equals(e.key)))
12             e = e.next;
13   
14         V oldValue;
15         if (e != null) {
16             oldValue = e.value;
17             if (!onlyIfAbsent)
18                 e.value = value;
19         }
20         else {
21             oldValue = null;
22             ++modCount;
23             tab[index] = new HashEntry<K,V>(key, hash, first, value);
24             count = c; // write-volatile
25         }
26         return oldValue;
27     } finally {
28         unlock();
29     }
30 }

  该方法是在持有段锁的情况下执行的,在第五行,如果Segment中元素的数量超过了阈值(由构造函数中的loadFactor算出)就需要对segment进行扩容,并且要进行rehash,关于rehash的过程大家可以自己去了解,这里不详细讲了。

  第8行和第9行的操作就是getFirst的过程,确定链表头部的位置。

  第11行这里的这个while循环是在链表中寻找和要put的元素相同key的元素,如果找到,就直接更新key的value,如果没有找到,则进入21行这里,生成一个新的HashEntry并且把它加到整个segment的头部,然后再更新count值。

 

ConcurrentHashMap的remove操作

  remove操作的前面一部分和前面的get、put操作一样,都是定位segment的过程,然后再调用segment的remove方法:

 1 V remove(Object key, int hash, Object value) {
 2     lock();
 3     try {
 4         int c = count - 1;
 5         HashEntry<K,V>[] tab = table;
 6         int index = hash & (tab.length - 1);
 7         HashEntry<K,V> first = tab[index];
 8         HashEntry<K,V> e = first;
 9         while (e != null && (e.hash != hash || !key.equals(e.key)))
10             e = e.next;
11   
12         V oldValue = null;
13         if (e != null) {
14             V v = e.value;
15             if (value == null || value.equals(v)) {
16                 oldValue = v;
17                 // All entries following removed node can stay
18                 // in list, but all preceding ones need to be
19                 // cloned.
20                 ++modCount;
21                 HashEntry<K,V> newFirst = e.next;
22                 for (HashEntry<K,V> p = first; p != e; p = p.next)
23                     newFirst = new HashEntry<K,V>(p.key, p.hash,
24                                                   newFirst, p.value);
25                 tab[index] = newFirst;
26                 count = c; // write-volatile
27             }
28         }
29         return oldValue;
30     } finally {
31         unlock();
32     }
33 }

  整个操作是在持有段锁的情况下执行的,空白行之前的行主要是定位到要删除的节点e。接下来,如果不存在这个节点就直接返回null,否则就要将e前面的节点复制一遍,尾节点指向e的下一个节点。e后面的节点不需要复制,它们可以重用。(之前说过HashEntry中的next是final的,一经赋值以后就不可修改,所以只能通过复制来达成目的)如下图所示:

 

 

  

   注意,复制的节点中,值为2的节点在前面,值为1的节点在后面,也就是刚好和原来节点顺序相反

  整个remobe实现并不复杂,但是需要注意如下几点:第一,当要删除的节点存在时,删除的最后一步操作要将count的值减一。这必须是最后一步操作,否则读取操作可能看不到之前对段所做的结构性修改。第二,remove执行的开始就将table赋值给一个局部变量tab,这是因为table是volatile变量,读写volatile变量的开销很大。编译器也不能对volatile变量的读写做任何优化,直接多次访问非volatile实例变量没有多大影响,编译器会做相应优化。

 

跨段操作

  有些操作需要涉及到多个段,比如说size(),containsValue()。先来看下size()方法:

 1 public int size() {
 2     final Segment<K,V>[] segments = this.segments;
 3     long sum = 0;
 4     long check = 0;
 5     int[] mc = new int[segments.length];
 6     // Try a few times to get accurate count. On failure due to
 7     // continuous async changes in table, resort to locking.
 8     for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
 9         check = 0;
10         sum = 0;
11         int mcsum = 0;
12         for (int i = 0; i < segments.length; ++i) {
13             sum += segments[i].count;
14             mcsum += mc[i] = segments[i].modCount;
15         }
16         if (mcsum != 0) {
17             for (int i = 0; i < segments.length; ++i) {
18                 check += segments[i].count;
19                 if (mc[i] != segments[i].modCount) {
20                     check = -1; // force retry
21                     break;
22                 }
23             }
24         }
25         if (check == sum)
26             break;
27     }
28     if (check != sum) { // Resort to locking all segments
29         sum = 0;
30         for (int i = 0; i < segments.length; ++i)
31             segments[i].lock();
32         for (int i = 0; i < segments.length; ++i)
33             sum += segments[i].count;
34         for (int i = 0; i < segments.length; ++i)
35             segments[i].unlock();
36     }
37     if (sum > Integer.MAX_VALUE)
38         return Integer.MAX_VALUE;
39     else
40         return (int)sum;
41 }

  size方法主要思路是先在没有锁的情况下对所有段大小求和,如果不能成功(这是因为遍历过程中可能有其它线程正在对已经遍历过的段进行结构性更新),最多执行RETRIES_BEFORE_LOCK次,如果还不成功就在持有所有段锁的情况下再对所有段大小求和。在没有锁的情况下主要是利用Segment中的modCount进行检测,在遍历过程中保存每个Segment的modCount,遍历完成之后再检测每个Segment的modCount有没有改变,如果有改变表示有其它线程正在对Segment进行结构性并发更新,需要重新计算。

  size()的实现还有一点需要注意,必须要先segments[i].count,才能segments[i].modCount,这是因为segment[i].count是对volatile变量的访问,接下来segments[i].modCount才能得到几乎最新的值(前面我已经说了为什么只是“几乎”了)。这点在containsValue方法中得到了淋漓尽致的展现:

 1 public boolean containsValue(Object value) {
 2     if (value == null)
 3         throw new NullPointerException();
 4 
 5     // See explanation of modCount use above
 6 
 7     final Segment<K,V>[] segments = this.segments;
 8     int[] mc = new int[segments.length];
 9 
10     // Try a few times without locking
11     for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
12         int sum = 0;
13         int mcsum = 0;
14         for (int i = 0; i < segments.length; ++i) {
15             int c = segments[i].count;
16             mcsum += mc[i] = segments[i].modCount;
17             if (segments[i].containsValue(value))
18                 return true;
19         }
20         boolean cleanSweep = true;
21         if (mcsum != 0) {
22             for (int i = 0; i < segments.length; ++i) {
23                 int c = segments[i].count;
24                 if (mc[i] != segments[i].modCount) {
25                     cleanSweep = false;
26                     break;
27                 }
28             }
29         }
30         if (cleanSweep)
31             return false;
32     }
33     // Resort to locking all segments
34     for (int i = 0; i < segments.length; ++i)
35         segments[i].lock();
36     boolean found = false;
37     try {
38         for (int i = 0; i < segments.length; ++i) {
39             if (segments[i].containsValue(value)) {
40                 found = true;
41                 break;
42             }
43         }
44     } finally {
45         for (int i = 0; i < segments.length; ++i)
46             segments[i].unlock();
47     }
48     return found;
49 }

  注意内层的第一个for循环,里面有语句int c = segments[i].count;但是c却从来没有被使用过,即时如此,编译器也不能做优化将这条语句去掉,因为存在对volatile变量count的读取,这条语句存在的唯一目的就是保证segments[i].modCount读取到几乎最新的值。

 

解释“必须要先segments[i].count,才能segments[i].modCount,这是因为segment[i].count是对volatile变量的访问,接下来segments[i].modCount才能得到几乎最新的值”


  写volatile变量和它之前的读写操作是不能重排序reorder的,读volatile变量和它之后的读写操作也是不能reorder的。

  在此程序中,表现为修改modCount发生在修改count之前(查看源代码会发现在写count之前必定有写modCount),由于count是volatile变量,修改modCount不能和写count的操作reorder,读取count和它之后的操作,比如读取modCount,不能reorder。有了这两个“不能reorder”才能保证读取了count之后,能读到线程在写count之前写入的modCount值,这个modCount值是几乎最新的。如果在读modCount之前不读count,读modCount甚至可能会reorder到写modCount之前。如果写modCount放在写count之后,则写modCount可能会被reorder到读modCount之后。即读写顺序需要相互配合,才能保证读取到的modCount几乎是最新的


  最后简单地介绍下迭代方法,如keySet(),values(),entrySet()方法,这些方法都返回相应的迭代器,所有迭代器都继承于HashIterator类里实现的主要的方法。其结构是:

1 abstract class HashIterator {
2     int nextSegmentIndex;
3     int nextTableIndex;
4     HashEntry<K,V>[] currentTable;
5     HashEntry<K, V> nextEntry;
6     HashEntry<K, V> lastReturned;
7 }

  nextSegmentIndex是段的索引,nextTableIndex是nextSegmentIndex对应段中hash链的索引,currentTable是nextSegmentIndex对应段的table。调用next方法时主要是调用了advance()方法:

 1 final void advance() {
 2     if (nextEntry != null && (nextEntry = nextEntry.next) != null)
 3         return;
 4 
 5     while (nextTableIndex >= 0) {
 6         if ( (nextEntry = currentTable[nextTableIndex--]) != null)
 7             return;
 8     }
 9 
10     while (nextSegmentIndex >= 0) {
11         Segment<K,V> seg = segments[nextSegmentIndex--];
12         if (seg.count != 0) {
13             currentTable = seg.table;
14             for (int j = currentTable.length - 1; j >= 0; --j) {
15                 if ( (nextEntry = currentTable[j]) != null) {
16                     nextTableIndex = j - 1;
17                     return;
18                 }
19             }
20         }
21     }
22 }

  不想再多介绍了,唯一需要注意的是跳到下一个段时,一定要先读取下一个段的count变量。

  这种迭代方式的主要效果是不会抛出ConcurrentModificationException。一旦获取到下一个段的table,也就意味着这个段的头结点在迭代过程中就确定了,在迭代过程中就不能反映对这个段节点并发的删除和添加,对于节点的更新是能够反映的,因为节点的值是一个volatile变量。

 

参考文章:《Java并发编程:并发容器之ConcurrentHashMap(转载)

Java并发编程:并发容器之CopyOnWriteArrayList(转载)

Java并发编程:并发容器之ConcurrentHashMap

Java并发编程:并发容器之CopyOnWriteArrayList

并发编程系列之并发容器:ConcurrentHashMap

java并发编程之工具类