这一文道尽JUC的ConcurrentHashMap
Posted keep-go-on
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了这一文道尽JUC的ConcurrentHashMap相关的知识,希望对你有一定的参考价值。
相关文章 : 《聊一聊线程互斥与同步的那些事(以实例解释synchronized与ReentrantLock》
ConcurrentHashMap
ConcurrentHashMap
本篇文章是对之前的源码注释做的一些修正,在以前的注释中片面的写了ConcurrentHashMap使用分段锁,实现了更高的并发度,但是在
JDK8
中已经不再使用分段锁实现ConcurrentHashMap,因此特发此文纠正过错。
JDK7中
ConcurrentHashMap
使用分段锁提高了ConcurrentHashMap
的并发度。
JDK8中使用
数组
+链表
+红黑树
实现ConcurrentHashMap
,与HashMap如出一辙,而锁使用了乐观锁CAS
和synchronized
关键字实现。
对象锁与分段锁
为了说明分段锁与对象锁的原理,我们以多线程修改资源过程进行举例,三个线程同时申请不同的资源 :
线程1
申请修改资源1
的数据,同时线程2
申请修改资源3
的数据,线程3
申请修改资源5
的数据。
对象锁
在对象锁中执行的过程:
- 线程1获取到对象锁,对资源1进行修改。
- 线程2等待对象锁释放。
- 线程3等待对象锁释放。
- 线程1修改完成,并释放对象锁。
- 线程2获取对象锁,对资源3进行修改。
- 线程3等待对象锁释放。
- 线程2对资源3修改完成,并释放对象锁。
- 线程3获取对象锁,对资源5进行修改。
- 线程3修改完成,释放对象锁。
真实情况下线程2,线程3 执行顺序并不一定是线程2先执行,实例仅展示资源争抢的原理。
分段锁
在分段锁中执行的过程:
- 线程1获取到段锁1,并对资源1进行修改。
- 线程2等待段锁1。
- 线程3获取段锁2,对资源5进行修改。
- 线程1修改完成,释放段锁1。
- 线程3修改完成,释放段锁2。
- 线程2获取段锁,对资源3进行修改。
以上流程可以看出,线程3并没有对线程1和线程2进行资源争抢,也就是它与另外两个线程不存在冲突。这样就提高了修改速度,提高了并发度。
JDK7使用分段锁实现ConcurrentHashMap
ConcurrentHashMap在对象中保存了一个Segment数组,将整个Hash表划分为多个分段;而每个Segment元素,即每个分段则类似于一个Hashtable;这样,在执行put操作时首先根据hash算法定位到元素属于哪个Segment,然后对该Segment加锁即可。因此,ConcurrentHashMap在多线程并发编程中可是实现多线程put操作。Segment也就是上面描述的段锁
。
由于JDK7已经大势已去,这里不再深究其原理。
JDK8中的ConcurrentHashMap
为什么放弃使用分段锁 ? 为什么使用红黑树 ?
JDK8中放弃了JDK7中的分段锁,而是使用链表+红黑树实现ConcurrentHashMap
,在安全上使用了CAS
+synchronized
。其原因在于JDK7的实现查询效率不高,而红黑树可以实现(O(logn)
)的查询效率。
初始化
先上构造函数
// 这构造函数里
public ConcurrentHashMap()
// 初始化大小的构造方法
public ConcurrentHashMap(int initialCapacity)
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
问,创建MAP时传
16
,Map大小就是16
吗 ?
Map的大小是 通过提供初始容量,计算了 sizeCtl,sizeCtl = 【 (1.5 * initialCapacity + 1),然后向上取最近的 2 的 n 次方】。如 initialCapacity 为 10,那么得到 sizeCtl 为 16,如果 initialCapacity 为 11,得到 sizeCtl 为 32。 sizeCtl 这个属性使用的场景很多,这与HashMap一致。
例如:
如果传入的是 16 那么Map的大小是 32 。如果传入 10,map大小刚好是 16 。
初始化方法中的并发问题是通过对 sizeCtl 进行一个 CAS 操作来控制的。
什么是红黑树?
红黑树(自平衡二叉查找树)是平衡二叉树的一种实现。集合类中的Map、关联数组具有较高的查询效率,它们的底层实现就是红黑树。
链表转红黑树
链表转红黑树的方法为 treeifyBin
,直接从源代码下手解释:
private final void treeifyBin(Node<K,V>[] tab, int index)
Node<K,V> b; int n, sc;
if (tab != null)
// MIN_TREEIFY_CAPACITY 大小为 64
// 所以,如果数组长度小于 64 的时候,其实也就是 32 或者 16 或者更小的时候,会进行数组扩容
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// 后面我们再详细分析这个方法
tryPresize(n << 1);
// b 是头结点
else if ((b = tabAt(tab, index)) != null && b.hash >= 0)
// 加锁
synchronized (b)
if (tabAt(tab, index) == b)
// 下面就是遍历链表,建立一颗红黑树
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next)
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
// 将红黑树设置到数组相应位置中
setTabAt(tab, index, new TreeBin<K,V>(hd));
- 两个很重要的对象
Node
和TreeNode
,node
为链表节点,TreeNode为红黑树节点。 - 该方法并不一定是在做链表转红黑树,也可能是在调用扩容。
扩容(tryPresize)
ConcurrentHashMap
扩容与HashMap扩容一样,每次扩容是原来的2倍。源码如下 :
// 当方法参数 size 传进来的时候就已经翻了倍了
private final void tryPresize(int size)
// c等于 size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方 (上面初始化中说明过)。
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0)
Node<K,V>[] tab = table; int n;
// 这个 if 分支和之前说的初始化数组的代码基本上是一样的,在这里,我们可以不用管这块代码
if (tab == null || (n = tab.length) == 0)
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1))
try
if (table == tab)
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2); // 0.75 * n
finally
sizeCtl = sc;
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table)
// 我没看懂 rs 的真正含义是什么,不过也关系不大
int rs = resizeStamp(n);
if (sc < 0)
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 2. 用 CAS 将 sizeCtl 加 1,然后执行 transfer 方法
// 此时 nextTab 不为 null
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
// 1. 将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2)
// 我是没看懂这个值真正的意义是什么? 不过可以计算出来的是,结果是一个比较大的负数
// 调用 transfer 方法,此时 nextTab 参数为 null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
在扩容方法中多次调用了transfer
(数据迁移)方法。
数据迁移
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)
int n = tab.length, stride;
// stride 在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16
// stride 可以理解为”步长“,有 n 个位置是需要进行迁移的,
// 将这 n 个任务分为多个任务包,每个任务包有 stride 个任务
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 如果 nextTab 为 null,先进行一次初始化
// 前面我们说了,外围会保证第一个发起迁移的线程调用此方法时,参数 nextTab 为 null
// 之后参与迁移的线程调用此方法时,nextTab 不会为 null
if (nextTab == null)
try
// 容量翻倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
catch (Throwable ex) // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
// nextTable 是 ConcurrentHashMap 中的属性
nextTable = nextTab;
// transferIndex 也是 ConcurrentHashMap 的属性,用于控制迁移的位置
transferIndex = n;
int nextn = nextTab.length;
// ForwardingNode 翻译过来就是正在被迁移的 Node
// 这个构造方法会生成一个Node,key、value 和 next 都为 null,关键是 hash 为 MOVED
// 后面我们会看到,原数组中位置 i 处的节点完成迁移工作后,
// 就会将位置 i 处设置为这个 ForwardingNode,用来告诉其他线程该位置已经处理过了
// 所以它其实相当于是一个标志。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
/*
* 下面这个 for 循环,最难理解的在前面,而要看懂它们,应该先看懂后面的,然后再倒回来看
*
*/
// i 是位置索引,bound 是边界,注意是从后往前
for (int i = 0, bound = 0;;)
Node<K,V> f; int fh;
// 下面这个 while 真的是不好理解
// advance 为 true 表示可以进行下一个位置的迁移了
// 简单理解结局: i 指向了 transferIndex,bound 指向了 transferIndex-stride
while (advance)
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
// 将 transferIndex 值赋给 nextIndex
// 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了
else if ((nextIndex = transferIndex) <= 0)
i = -1;
advance = false;
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0)))
// 看括号中的代码,nextBound 是这次迁移任务的边界,注意,是从后往前
bound = nextBound;
i = nextIndex - 1;
advance = false;
if (i < 0 || i >= n || i + n >= nextn)
int sc;
if (finishing)
// 所有的迁移操作已经完成
nextTable = null;
// 将新的 nextTab 赋值给 table 属性,完成迁移
table = nextTab;
// 重新计算 sizeCtl: n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍
sizeCtl = (n << 1) - (n >>> 1);
return;
// 之前我们说过,sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2
// 然后,每有一个线程参与迁移就会将 sizeCtl 加 1,
// 这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1))
// 任务结束,方法退出
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 到这里,说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
// 也就是说,所有的迁移任务都做完了,也就会进入到上面的 if(finishing) 分支了
finishing = advance = true;
i = n; // recheck before commit
// 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 该位置处是一个 ForwardingNode,代表该位置已经迁移过了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else
// 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作
synchronized (f)
if (tabAt(tab, i) == f)
Node<K,V> ln, hn;
// 头结点的 hash 大于 0,说明是链表的 Node 节点
if (fh >= 0)
// 下面这一块和 Java7 中的 ConcurrentHashMap 迁移是差不多的,
// 需要将链表一分为二,
// 找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
// lastRun 之前的节点需要进行克隆,然后分到两个链表中
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next)
int b = p.hash & n;
if (b != runBit)
runBit = b;
lastRun = p;
if (runBit == 0)
ln = lastRun;
hn = null;
else
hn = lastRun;
ln = null;
for (Node<K,V> p = f; p != lastRun; p = p.next)
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
// 其中的一个链表放在新数组的位置 i
setTabAt(nextTab, i, ln);
// 另一个链表放在新数组的位置 i+n
setTabAt(nextTab, i + n, hn);
// 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,
// 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了
setTabAt(tab, i, fwd);
// advance 设置为 true,代表该位置已经迁移完毕
advance = true;
else if (f instanceof TreeBin)
// 红黑树的迁移
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next)
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0)
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
else
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
// 如果一分为二后,节点数少于 8,那么将红黑树转换回链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin以上是关于这一文道尽JUC的ConcurrentHashMap的主要内容,如果未能解决你的问题,请参考以下文章