Java集合源码剖析——基于JDK1.8中ConcurrentHashMap的实现原理
Posted 宋子浩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java集合源码剖析——基于JDK1.8中ConcurrentHashMap的实现原理相关的知识,希望对你有一定的参考价值。
文章目录:
1.2 Collections.synchronizedMap
1.解决HashMap线程不安全的几种方法
1.1 Hashtable
第一种方法就是使用Hashtable,这是个线程安全的集合类,而它在针对 put、get等一系列操作时,都是对整个方法进行上锁。
public synchronized V put(K key, V value)
// Make sure the value is not null
if (value == null)
throw new NullPointerException();
// Makes sure the key is not already in the hashtable.
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
@SuppressWarnings("unchecked")
Entry<K,V> entry = (Entry<K,V>)tab[index];
for(; entry != null ; entry = entry.next)
if ((entry.hash == hash) && entry.key.equals(key))
V old = entry.value;
entry.value = value;
return old;
addEntry(hash, key, value, index);
return null;
public synchronized V get(Object key)
Entry<?,?> tab[] = table;
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
for (Entry<?,?> e = tab[index] ; e != null ; e = e.next)
if ((e.hash == hash) && e.key.equals(key))
return (V)e.value;
return null;
可以看到,不管是往 map 里边添加元素还是获取元素,都会用 synchronized 关键字加锁。那么如果有多个线程同时操作这个map集合,肯定会存在资源竞争,同时也只能有一个线程可以获取到锁,操作资源。所以,Hashtable 的缺点显而易见,它不管是 get 还是 put 操作,都是锁住了整个 哈希表,效率十分低下,因此并不适合高并发场景。
1.2 Collections.synchronizedMap
这个集合工具类中提供了一系列 synchronizedXXX 的静态方法,用于将那些线程不安全的集合转为线程安全的,不过不太推荐使用,原因来看看:↓↓↓
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)
return new SynchronizedMap<>(m);
/**
* @serial include
*/
private static class SynchronizedMap<K,V>
implements Map<K,V>, Serializable
private static final long serialVersionUID = 1978198479659022715L;
private final Map<K,V> m; // Backing Map
final Object mutex; // Object on which to synchronize
SynchronizedMap(Map<K,V> m)
this.m = Objects.requireNonNull(m);
mutex = this;
SynchronizedMap(Map<K,V> m, Object mutex)
this.m = m;
this.mutex = mutex;
public int size()
synchronized (mutex) return m.size();
public boolean isEmpty()
synchronized (mutex) return m.isEmpty();
public boolean containsKey(Object key)
synchronized (mutex) return m.containsKey(key);
public boolean containsValue(Object value)
synchronized (mutex) return m.containsValue(value);
public V get(Object key)
synchronized (mutex) return m.get(key);
public V put(K key, V value)
synchronized (mutex) return m.put(key, value);
public V remove(Object key)
synchronized (mutex) return m.remove(key);
public void putAll(Map<? extends K, ? extends V> map)
synchronized (mutex) m.putAll(map);
public void clear()
synchronized (mutex) m.clear();
在调用 Collections.synchronizedMap 之后,它实际上是为我们 new 了一个 SynchronizedMap 类,追进这个类的源码(截取了部分代码),发现它和Hashtable差不多,也是锁住了整个哈希表,那就不用多说了,在高并发场景下效率肯定是个大问题。
1.3 ConcurrentHashMap(JDK1.7)
主角登场啦,针对上面两种解决方法的局限性,它们都是把整个哈希表锁住再进行后续的操作。那么为了进一步提升效率,我们能不能把整张哈希表分成 N 个部分,并使元素尽量均匀的分布到每个部分中,分别给每个部分加锁,使它们互相之间并不影响,这种方式岂不是更好 。这就是在 JDK1.7 中 ConcurrentHashMap 采用的方案,被叫做锁分段技术,每个部分就是一个 Segment(段)。
在 JDK1.7中,本质上还是采用链表+数组的形式存储键值对的。但是,为了提高并发,把原来的整个 table 划分为 n 个 Segment 。所以,从整体来看,它是一个由 Segment 组成的数组。然后,每个 Segment 里边是由 HashEntry 组成的数组,每个 HashEntry之间又可以形成链表。我们可以把每个 Segment 看成是一个小的 HashMap,其内部结构和 HashMap 是一模一样的。
当对某个 Segment 加锁时,如图中 Segment2,并不会影响到其他 Segment 的读写。每个 Segment 内部自己操作自己的数据。这样一来,我们要做的就是尽可能的让元素均匀的分布在不同的 Segment中。最理想的状态是,所有执行的线程操作的元素都是不同的 Segment,这样就可以降低锁的竞争。
2.JDK1.8中的ConcurrentHashMap
上面针对1.7的ConcurrentHashMap就简单的说这么多了,我主要是考虑目前基本用的都是JDK1.8,所以这里主要来说一下JDK1.8中ConcurrentHashMap的实现原理。
JDK 1.8 的 CHM(ConcurrentHashMap) 实现,完全重构了 1.7 。不再有 Segment 的概念,只是为了兼容 1.7 才申明了一下,并没有用到。因此,不再使用分段锁,而是给数组中的每一个头节点(为了方便,以后都叫桶)都加锁,锁的粒度降低了。采用的是 Synchronized + CAS ,把锁的粒度进一步降低,而放弃了 Segment 分段。
2.1 put方法
注释写的很详细了,唉,花了好长时间才理解。。。还是自己太菜了
根据 key 计算出 hashcode 对应的桶下标。
判断是否需要进行初始化。(只会有一个线程初始化哈希表成功,其他失败的线程会自旋)
根据hash算法找到当前 key 定位出的 Node,如果为空表示当前位置可以写入数据,利用 CAS 尝试写入,失败则自旋保证成功。
如果当前位置的
hashcode == MOVED == -1,
则需要进行扩容。如果都不满足,则利用 synchronized 锁写入数据。
如果数量大于
TREEIFY_THRESHOLD
则要执行树化方法,在treeifyBin
中会首先判断当前数组长度≥64时才会将链表转换为红黑树。
public V put(K key, V value)
return putVal(key, value, false);
final V putVal(K key, V value, boolean onlyIfAbsent)
//可以看到,在并发情况下,key 和 value 都不能为null。有一个为null直接空指针
if (key == null || value == null) throw new NullPointerException();
//这里就是计算key的hash值(具体的好像还挺复杂,我这里没有深入研究)
int hash = spread(key.hashCode());
//用来计算当前链表上kv键值对的个数
int binCount = 0;
for (Node<K,V>[] tab = table;;)
Node<K,V> f; int n, i, fh;
//如果哈希表为空,说明还未进行初始化
if (tab == null || (n = tab.length) == 0)
//initTable()方法用来初始化哈希表,后面我来说这个方法
tab = initTable();
//根据key的hash值通过哈希算法计算得到对应的桶下标,并且判断是否为空
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null)
//走进来则说明当前桶为空,进而通过CAS原子操作将新节点插入到此位置,保证了只有一个线程可以CAS成功
//判断哈希表下标为i的节点是否为null,如果为null,则更新为new Node<K,V>(hash, key, value, null)
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
//如果计算得到对应的桶下标不为空,则判断该节点的hash值是否为MOVED(值为-1)
else if ((fh = f.hash) == MOVED)
//如果节点的hash值为-1,则说明当前桶下的数组正在扩容,需要当前线程帮忙迁移数据
tab = helpTransfer(tab, f);
//排除上面三种情况,则会走到这里
else
//临时变量,用来保存key对应的旧值
V oldVal = null;
//加上同步锁,保证线程安全,这里是对当前桶中的第一个节点对象加锁
synchronized (f)
//这里先再次检查一下,确保当前桶的第一个节点没有变化
//tabAt(tab, i)方法表明获取tab这个哈希表中下标为i的第一个节点对象
if (tabAt(tab, i) == f)
//如果key的hash值大于等于0,则说明是链表结构
if (fh >= 0)
binCount = 1;
//从头节点开始遍历,每遍历一次binCount计数加1
for (Node<K,V> e = f;; ++binCount)
K ek;
//找到了和当前key相同的节点(hash值相同 && equals为true)
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek))))
//先将key对应的旧值保存到变量oldVal中
oldVal = e.val;
if (!onlyIfAbsent)
//用新的value值替换key对应的旧值
e.val = value;
break; //已经找到,可以退出循环
Node<K,V> pred = e;
//如果遍历到了链表的尾节点,还没有找到与要插入的key相同的节点
if ((e = e.next) == null)
//则将要插入的节点放在链表的最后(尾插)
pred.next = new Node<K,V>(hash, key,
value, null);
break; //退出循环
//不是链表了话,那就是树节点了。这里提一下,TreeBin只是头结点对TreeNode的再封装
else if (f instanceof TreeBin)
Node<K,V> p;
binCount = 2;
//这里进行红黑树的相关操作
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null)
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
//注意下,这个判断是在同步锁的外部,因为treeifyBin方法内部也有同步锁,所以并不影响
if (binCount != 0)
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
//put完成之后,哈希表的元素要加1,并且可能触发扩容,这个比较复杂,我没有深入研究
addCount(1L, binCount);
return null;
2.2 initTable方法
从源码中可以发现
ConcurrentHashMap
的初始化是通过自旋和 CAS 操作完成的。里面需要注意的是变量sizeCtl
,它的值决定着当前的初始化状态
//此时哈希表为空,要进行初始化操作
private final Node<K,V>[] initTable()
Node<K,V>[] tab; int sc;
//循环判断哈希表是否为空,直到初始化成功为止
while ((tab = table) == null || tab.length == 0)
//sizeCtl这个值有很多情况,默认值为0,
//当为 -1 时,说明有其它线程正在对哈希表进行初始化操作
//当哈希表初始化成功后,又会把它设置为扩容阈值
//当为一个小于 -1 的负数,用来表示当前有几个线程正在帮助扩容
if ((sc = sizeCtl) < 0)
//若sc小于0,其实在这里就是-1,因为此时哈希表是空的,不会发生扩容,sc只能为正数或者-1
//因此,当前线程放弃CPU时间片,只是自旋
Thread.yield(); // lost initialization race; just spin
//通过CAS把 sc 的值设置为-1,表明当前线程正在进行哈希表的初始化
//当其中一个线程将sc更新为-1之后,其它失败的线程就会自旋
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1))
try
//重新检查一下哈希表是否为空
if ((tab = table) == null || tab.length == 0)
//如果sc大于0,则为sc,否则返回默认容量16
//当调用有参构造创建 Map 时,sc的值是大于0的
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//创建哈希表,长度大小为n
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//n - 1/4 n,也就是0.75n,表示扩容阈值
sc = n - (n >>> 2);
finally
//最后将sizeCtl更新为扩容阈值
sizeCtl = sc;
//如果当前线程初始化哈希表成功,则跳出循环
//其他自旋的线程因为判断哈希表不为空,也会停止自旋
break;
return tab;
2.3 get方法
- 根据 hash 值计算位置。
- 查找到指定位置,如果头节点就是要找的,直接返回它的 value.
- 如果头节点 hash 值小于 0 ,说明正在扩容或者是红黑树,find查找。
- 如果是链表,遍历查找。
public V get(Object key)
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//这里就是计算key的hash值
int h = spread(key.hashCode());
//如果哈希表不为空,并且根据key的hash值计算得到的桶下标对应的节点也不为空
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null)
//如果当前桶的头节点直接命中,匹配成功
if ((eh = e.hash) == h)
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
//直接返回key对应的value
return e.val;
//头节点的hash值小于0,说明正在扩容或者是红黑树,find查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//否则说明是正常的链表,遍历查找即可
while ((e = e.next) != null)
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
return null;
3.总结
以上分享借鉴了很多大佬的文章博客,最后加上自己的理解。
Java7 中
ConcurrentHashMap
使用的分段锁,也就是每一个 Segment 上同时只有一个线程可以操作,每一个Segment
都是一个类似HashMap
数组的结构,它可以扩容,它的冲突会转化为链表。但是Segment
的个数一但初始化就不能改变。Java8 中的
ConcurrentHashMap
使用的Synchronized
锁加 CAS 的机制。底层的数据结构也由 Java7 中的Segment
数组 +HashEntry
数组 + 链表 进化成了 Node 数组 + 链表 / 红黑树,Node 是类似于一个 HashEntry 的结构。当某个桶下挂载的节点数超过一定大小时会转化成红黑树,节点数小于一定数量时又会恢复成链表。
以上是关于Java集合源码剖析——基于JDK1.8中ConcurrentHashMap的实现原理的主要内容,如果未能解决你的问题,请参考以下文章
Java集合源码剖析——基于JDK1.8中ConcurrentHashMap的实现原理
Java集合源码剖析——基于JDK1.8中HashMap的实现原理
Java集合源码剖析——基于JDK1.8中LinkedList的实现原理
Java集合源码剖析——基于JDK1.8中LinkedList的实现原理