java基础-ConcurrentHashMap

Posted holoyong

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java基础-ConcurrentHashMap相关的知识,希望对你有一定的参考价值。

 jdk7中ConcurrentHashMap的put方法

 1     public V put(K key, V value) {
 2         Segment<K,V> s;
 3         //在并发map中key、value均不能为null
 4         if (value == null)
 5             throw new NullPointerException();
 6         int hash = hash(key);
 7         //j是segments中的index,定位segment,段用于提高并发度,更新时只在段内加锁,不影响其他段,segments的大小是ssize,ssize<=concurrencyLevel,且是2的n次方。
 8         int j = (hash >>> segmentShift) & segmentMask;
 9          //(j << SSHIFT) + SBASE是内存中第j位的segment在segments中的偏移量
10         if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
11              (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
12             //保证第j位segment的存在
13             s = ensureSegment(j);
14         //向第j位的segment存入key-value
15         return s.put(key, hash, value, false);
16     }

 

 segment的put方法,segment可以看做是一个HashMap,但其并没有实现Map,Segment<K,V> extends ReentrantLock implements Serializable,可以看到segment继承了ReentrantLock,其更像是一把锁。

 1 final V put(K key, int hash, V value, boolean onlyIfAbsent) {
 2             //首先尝试获取锁,如果失败则不断扫描hash映射到的桶(以确定是否有相同的key,思想就是即便暂时没有获取到锁,也不要闲着,找些之后要做的事先做着),并尝试获取锁,当尝试获取锁的次数到达阈值,则lock。
 3             HashEntry<K,V> node = tryLock() ? null :
 4                 scanAndLockForPut(key, hash, value);
 5     //之后的逻辑都是在获得锁的前提下进行的
 6             V oldValue;
 7             try {
 8                 //segment其实是一个map结构(但没有实现Map接口),这里的table就类似于HashMap中的table
 9                 HashEntry<K,V>[] tab = table;
10                 //index即桶的索引
11                 int index = (tab.length - 1) & hash;
12                 //取得桶中的首节点
13                 HashEntry<K,V> first = entryAt(tab, index);
14                 //循环逻辑类似HashMap,遇到相同的key就替换,遍历置尾节点仍没有找到相同的key则插入
15                 for (HashEntry<K,V> e = first;;) {
16                     if (e != null) {
17                         K k;
18                         if ((k = e.key) == key ||
19                             (e.hash == hash && key.equals(k))) {
20                             oldValue = e.value;
21                             if (!onlyIfAbsent) {
22                                 e.value = value;
23                                 ++modCount;
24                             }
25                             break;
26                         }
27                         e = e.next;
28                     }
29                     else {
30                         if (node != null)
31                             node.setNext(first);
32                         else
33                             node = new HashEntry<K,V>(hash, key, value, first);
34                         int c = count + 1;
35                         if (c > threshold && tab.length < MAXIMUM_CAPACITY)
36                             rehash(node);
37                         else
38                             setEntryAt(tab, index, node);
39                         ++modCount;
40                         count = c;
41                         oldValue = null;
42                         break;
43                     }
44                 }
45             } finally {
46                 //释放锁
47                 unlock();
48             }
49             return oldValue;
50         }

 

jdk8中ConcurrentHashMap的put方法

1     public V put(K key, V value) {
2         return putVal(key, value, false);
3     }

 

 1     /** Implementation for put and putIfAbsent */
 2     final V putVal(K key, V value, boolean onlyIfAbsent) {
 3         //并发map中key、value均不能为null
 4         if (key == null || value == null) throw new NullPointerException();
 5         //spread和HashMap中的hash函数类似(h ^ (h >>> 16))&0x7fffffff,最后的与是为了消除负号
 6         int hash = spread(key.hashCode());
 7         int binCount = 0;
 8         //与jdk7不同,jdk8不在有segment,直接对每个桶加同步锁
 9         for (Node<K,V>[] tab = table;;) {
10             Node<K,V> f; int n, i, fh;
11             //table是懒加载的
12             if (tab == null || (n = tab.length) == 0)
13                 tab = initTable();
14             //如果桶中没有节点,则尝试使用cas添加
15             else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
16                 if (casTabAt(tab, i, null,
17                              new Node<K,V>(hash, key, value, null)))
18                     //第一个可以跳出循环的地方
19                     break;                   // no lock when adding to empty bin
20             }
21             //hash为MOVED说明该节点正在参与resize
22             else if ((fh = f.hash) == MOVED)
23                 tab = helpTransfer(tab, f);
24             else {//多数情况
25                 V oldVal = null;
26                 synchronized (f) {//对桶的头结点加同步锁,也就是对桶加锁,遍历桶中节点,进行插入或更新操作
27                     if (tabAt(tab, i) == f) {
28                         if (fh >= 0) {
29                             binCount = 1;
30                             for (Node<K,V> e = f;; ++binCount) {
31                                 K ek;
32                                 if (e.hash == hash &&
33                                     ((ek = e.key) == key ||
34                                      (ek != null && key.equals(ek)))) {
35                                     oldVal = e.val;
36                                     if (!onlyIfAbsent)
37                                         e.val = value;
38                                     break;
39                                 }
40                                 Node<K,V> pred = e;
41                                 if ((e = e.next) == null) {
42                                     pred.next = new Node<K,V>(hash, key,
43                                                               value, null);
44                                     break;
45                                 }
46                             }
47                         }
48                         else if (f instanceof TreeBin) {
49                             Node<K,V> p;
50                             binCount = 2;
51                             if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
52                                                            value)) != null) {
53                                 oldVal = p.val;
54                                 if (!onlyIfAbsent)
55                                     p.val = value;
56                             }
57                         }
58                     }
59                 }
60                 if (binCount != 0) {
61                     if (binCount >= TREEIFY_THRESHOLD)
62                         treeifyBin(tab, i);
63                     if (oldVal != null)
64                         return oldVal;
65                     //第二个可以跳出循环的地方
66                     break;
67                 }
68             }
69         }
70         addCount(1L, binCount);
71         return null;
72     }

 

其实看到jdk7的源码时我就在想,为什么要有segment呢?为什么不直接在桶上加锁呢?这是resize的缘故。如果直接在桶上加锁,resize时依旧需要在全局(table)加锁,那此时如果又有桶内节点的更新操作呢?table锁和桶锁并不互斥,这就出现并发问题了。在jdk8中取消了segment,采用直接在桶上加锁的方式,对于resize并发问题也有了相应的解决途径,就是采用复制。resize时会新建一个nextTable,在复制桶A中节点时会先将桶A加锁,解锁前会将原table中的桶首节点置为ForwardingNode(hash为MOVED),将原table中所有节点复制完成后,再table=nextTable。这里的关键就在于ForwardingNode,当线程看到当前桶的首节点为ForwardingNode时,线程不会继续更新操作,转而helpTransfer(帮助resize),resize结束后继续之前的更新操作。

jdk8和jdk7还有一个改变是,jdk7直接使用ReentrantLock,而jdk8转而使用cas+synchronized,cas肯定要比锁机制好,据说synchronized在jdk8中得到了优化,得空再看一下。

以上是关于java基础-ConcurrentHashMap的主要内容,如果未能解决你的问题,请参考以下文章

Java ConcurrentHashMap

Java并发:并发集合ConcurrentHashMap的源码分析

Java经典面试题:HashMap和HashTable以及ConcurrentHashMap分析

ConcurrentHashMap 实现原理

并发编程系列之并发容器:ConcurrentHashMap

java面试问题分类