ConcurrentHashMap源码解析_04 transfer方法源码分析(难点)
Posted 兴趣使然の草帽路飞
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ConcurrentHashMap源码解析_04 transfer方法源码分析(难点)相关的知识,希望对你有一定的参考价值。
- 上一篇文章介绍过
put
方法以及其相关的方法,接下来,本篇就介绍一下transfer
这个方法(比较难),最好能动手结合着源码进行分析,并仔细理解前面几篇文章的内容~ - 注:代码分析的注释中的CASE0、CASE1… ,这些并没有直接关联关系,只是为了给每个
if
逻辑判断加一个标识,方便在其他逻辑判断的地方进行引用而已。
再复习一下并发Map的结构图:
1、transfer方法
transfer
方法的作用是:迁移元素,扩容时table容量变为原来的两倍,并把部分元素迁移到其它桶nextTable中。该方法迁移数据的流程就是在发生扩容时,从散列表原table中,按照桶位下标,依次将每个桶中的元素(结点、链表、树)迁移到新表nextTable中。
另外还有与其相关的方法helpTransfer
:协助扩容(迁移元素),当线程添加元素时发现table正在扩容,且当前元素所在的桶元素已经迁移完成了,则协助迁移其它桶的元素。
下图为并发扩容流程图,在分析源码前熟悉一下流程:
链表迁移示意图:
下面就开始分析代码把:
/**
* 迁移元素,扩容时table容量变为原来的两倍,并把部分元素迁移到其它桶nextTable中:
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// n 表示扩容之前table数组的长度
// stride 表示分配给线程任务的步长
int n = tab.length, stride;
// 这里为了方便分析,根据CUP核数,stride 固定为MIN_TRANSFER_STRIDE 16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
// 控制线程迁移数据的最小步长(桶位的跨度~)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// ------------------------------------------------------------------------------
// CASE0:nextTab == null
// 条件成立:表示当前线程为触发本次扩容的线程,需要做一些扩容准备工作:(在if代码块中)
// 条件不成立:表示当前线程是协助扩容的线程...
if (nextTab == null) { // initiating
try {
// 创建一个比扩容之前容量n大一倍的新table
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// 赋值nextTab对象给 nextTable ,方便协助扩容线程 拿到新表
nextTable = nextTab;
// 记录迁移数据整体位置的一个标记,初始值是原table数组的长度。
// 可以理解为:全局范围内散列表的数据任务处理到哪个桶的位置了
transferIndex = n;
}
// 表示新数组的长度
int nextn = nextTab.length;
// fwd节点,当某个桶位数据迁移处理完毕后,将此桶位设置为fwd节点,其它写线程或读线程看到后,会有不同逻辑。fwd结点指向新表nextTab的引用
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 推进标记(后面讲数据迁移的时候再分析~)
boolean advance = true;
// 完成标记(后面讲数据迁移的时候再分析~)
boolean finishing = false; // to ensure sweep before committing nextTab
// i 表示分配给当前线程的数据迁移任务,任务执行到的桶位下标(任务进度~表示当前线程的任务已经干到哪里了~)
// bound 表示分配给当前线程任务的下界限制。(线程的数据迁移任务先从高位开始迁移,迁移到下界位置结束)
int i = 0, bound = 0;
// 自旋~ 非常长的一个for循环...
for (;;) {
// f 桶位的头结点
// fh 头结点的hash
Node<K,V> f; int fh;
/**
* while循环的任务如下:(循环的条件为推进标记advance为true)
* 1.给当前线程分配任务区间
* 2.维护当前线程任务进度(i 表示当前处理的桶位)
* 3.维护map对象全局范围内的进度
*/
// 整个while循环就是在算i的值,过程太复杂,不用太关心
// i的值会从n-1依次递减,感兴趣的可以打下断点就知道了
// 其中n是旧桶数组的大小,也就是说i从15开始一直减到1这样去迁移元素
while (advance) {
// nextIndex~nextBound分配任务的区间:
// 分配任务的开始下标
// 分配任务的结束下标
int nextIndex, nextBound;
// -----------------------------------------------------------------------
// CASE1:
// 条件一:--i >= bound
// 成立:表示当前线程的任务尚未完成,还有相应的区间的桶位要处理,--i 就让当前线程处理下一个桶位.
// 不成立:表示当前线程任务已完成 或者 未分配
if (--i >= bound || finishing)
// 推进标记设置为flase
advance = false;
// -----------------------------------------------------------------------
// CASE2: (nextIndex = transferIndex) <= 0
// transferIndex:可以理解为,全局范围内散列表的数据任务处理到哪个桶的位置了~
// 前置条件:当前线程任务已完成 或 者未分配,即没有经过CASE1
// 条件成立:表示对象全局范围内的桶位都分配完毕了,没有区间可分配了,设置当前线程的i变量为-1 跳出循环,然后执行退出迁移任务相关的程序
// 条件不成立:表示对象全局范围内的桶位尚未分配完毕,还有区间可分配~
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
// 推进标记设置为flase
advance = false;
}
// -----------------------------------------------------------------------
// CASE3,其实就是移动i的一个过程:CAS更新成功,则i从当前位置-1,再复习一下i的含义:
// i 表示分配给当前线程的数据迁移任务,任务执行到的桶位下标(任务进度~表示当前线程的任务已经干到哪里了~)
// CASE3前置条件(未经过CASE1-2):1、当前线程需要分配任务区间 2.全局范围内还有桶位尚未迁移
// 条件成立:说明给当前线程分配任务成功
// 条件失败:说明分配任务给当前线程失败,应该是和其它线程发生了竞争~
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
/**
处理线程任务完成后,线程退出transfer方法的逻辑:
**/
// -------------------------------------------------------------------------
// CASE4:
// 条件一:i < 0
// 成立:表示当前线程未分配到任务,或已完成了任务
// 条件二、三:一般情况下不会成里~
if (i < 0 || i >= n || i + n >= nextn) {
// 如果一次遍历完成了
// 也就是整个map所有桶中的元素都迁移完成了
// 保存sizeCtl的变量
int sc;
// 扩容结束条件判断
if (finishing) {
// 如果全部桶中数据迁移完成了,则替换旧桶数组
// 并设置下一次扩容门槛为新桶数组容量的0.75倍
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// -------------------------------------------------------------------------
// CASE5:当前线程扩容完成,把并发扩容的线程数-1,该操作基于CAS,修改成功返回true
// 条件成立:说明,更新 sizeCtl低16位 -1 操作成功,当前线程可以正常退出了~
// 如果条件不成立:说明CAS更新操作时,与其他线程发生竞争了~
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 条件成立:说明当前线程不是最后一个退出transfer任务的线程
// eg:
// 1000 0000 0001 1011 0000 0000 0000 0000
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
// 扩容完成两边肯定相等
// 正常退出
return;
// 完成标记finishing置为true,,finishing为true才会走到上面的if扩容结束条件判断
// 推进标记advance置为true
finishing = advance = true;
// i重新赋值为n
// 这样会再重新遍历一次桶数组,看看是不是都迁移完成了
// 也就是第二次遍历都会走到下面的(fh = f.hash) == MOVED这个条件
i = n; // recheck before commit
}
}
/**
线程处理一个桶位数据的迁移工作,处理完毕后,
设置advance为true: 表示继续推荐,然后就会回到for,继续循环
**/
// -------------------------------------------------------------------------
// CASE6:
// 【CASE6~CASE8】前置条件:当前线程任务尚未处理完,正在进行中!
// 条件成立:说明当前桶位未存放数据,只需要将此处设置为fwd节点即可。
else if ((f = tabAt(tab, i)) == null)
// 如果桶中无数据,直接放入FWD,标记该桶已迁移:
// casTabAt通过CAS的方式去向Node数组指定位置i设置节点值,设置成功返回true,否则返回false
// 即:将tab数组下标为i的结点设置为FWD结点
advance = casTabAt(tab, i, null, fwd);
// -------------------------------------------------------------------------
// CASE7: (fh = f.hash) == MOVED 如果桶中第一个元素的hash值为MOVED
// 条件成立:说明头节f它是ForwardingNode节点,
// 则,当前桶位已经迁移过了,当前线程不用再处理了,直接再次更新当前线程任务索引,再次处理下一个桶位 或者 其它操作~
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// -------------------------------------------------------------------------
// CASE8:
// 前置条件:**当前桶位有数据**,而且node节点 不是 fwd节点,说明这些数据需要迁移。
else {
// 锁定该桶并迁移元素:(sync 锁加在当前桶位的头结点)
synchronized (f) {
// 再次判断当前桶第一个元素是否有修改,(可能出现其它线程抢先一步迁移/修改了元素)
// 防止在你加锁头对象之前,当前桶位的头对象被其它写线程修改过,导致你目前加锁对象错误...
if (tabAt(tab, i) == f) {
// 把一个链表拆分成两个链表(规则按照是桶中各元素的hash与桶大小n进行与操作):
// 等于0的放到低位链表(low)中,不等于0的放到高位链表(high)中
// 其中低位链表迁移到新桶中的位置相对旧桶不变
// 高位链表迁移到新桶中位置正好是其在旧桶的位置加n
// 这也正是为什么扩容时容量在变成两倍的原因 (链表迁移原理参考上面的图)
// ln 表示低位链表引用
// hn 表示高位链表引用
Node<K,V> ln, hn;
// ---------------------------------------------------------------
// CASE9:
// 条件成立:表示当前桶位是链表桶位
if (fh >= 0) {
// 第一个元素的hash值大于等于0,说明该桶中元素是以链表形式存储的
// 这里与HashMap迁移算法基本类似,唯一不同的是多了一步寻找lastRun
// 这里的lastRun是提取出链表后面不用处理再特殊处理的子链表
// 比如所有元素的hash值与桶大小n与操作后的值分别为 0 0 4 4 0 0 0
// 则最后后面三个0对应的元素肯定还是在同一个桶中
// 这时lastRun对应的就是倒数第三个节点
// 至于为啥要这样处理,我也没太搞明白
// lastRun机制:
// 可以获取出 当前链表 末尾连续高位不变的 node
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 看看最后这几个元素归属于低位链表还是高位链表
// 条件成立:说明lastRun引用的链表为 低位链表,那么就让 ln 指向 低位链表
if (runBit == 0) {
ln = lastRun;
hn = null;
}
// 否则,说明lastRun引用的链表为 高位链表,就让 hn 指向 高位链表
else {
hn = lastRun;
ln = null;
}
// 遍历链表,把hash&n为0的放在低位链表中,不为0的放在高位链表中
// 循环跳出条件:当前循环结点!=lastRun
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 低位链表的位置不变
setTabAt(nextTab, i, ln);
// 高位链表的位置是:原位置 + n
setTabAt(nextTab, i + n, hn);
// 标记当前桶已迁移
setTabAt(tab, i, fwd);
// advance为true,返回上面进行--i操作
advance = true;
}
// ---------------------------------------------------------------
// CASE10:
// 条件成立:表示当前桶位是 红黑树 代理结点TreeBin
else if (f instanceof TreeBin) {
// 如果第一个元素是树节点,也是一样,分化成两颗树
// 也是根据hash&n为0放在低位树中,不为0放在高位树中
// 转换头结点为 treeBin引用 t
TreeBin<K,V> t = (TreeBin<K,V>)f;
// 低位双向链表 lo 指向低位链表的头 loTail 指向低位链表的尾巴
TreeNode<K,V> lo = null, loTail = null;
// 高位双向链表 lo 指向高位链表的头 loTail 指向高位链表的尾巴
TreeNode<K,V> hi = null, hiTail = null;
// lc 表示低位链表元素数量
// hc 表示高位链表元素数量
int lc = 0, hc = 0;
// 遍历整颗树(TreeBin中的双向链表,从头结点至尾节点),根据hash&n是否为0分化成两颗树:
for (Node<K,V> e = t.first; e != null; e = e.next) {
// h 表示循环处理当前元素的 hash
int h = e.hash;
// 使用当前节点 构建出来的新的TreeNode
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// ---------------------------------------------------
// CASE11:
// 条件成立:表示当前循环节点 属于低位链节点
if ((h & n) == 0) {
// 条件成立:说明当前低位链表还没有数据
if ((p.prev = loTail) == null)
lo = p;
// 说明低位链表已经有数据了,此时当前元素追加到低位链表的末尾就行了
else
loTail.next = p;
// 将低位链表尾指针指向 p 节点
loTail = p;
++lc;
}
// ---------------------------------------------------
// CASE12:
// 条件成立:当前节点属于高位链节点
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果分化的树中元素个数小于等于6,则退化成链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 低位树的位置不变
setTabAt(nextTab, i, ln);
// 高位树的位置是原位置加n
setTabAt(nextTab, i + n, hn);
// 标记该桶已迁移
setTabAt(tab, i, fwd);
// advance为true,返回上面进行--i操作
advance = true;
}
}
}
}
}
}
补充,lastRun机制示意图:
小节:
(1)新桶数组大小是旧桶数组的两倍;
(2)迁移元素先从靠后的桶开始;
(3)迁移完成的桶在里面放置一ForwardingNode类型的元素,标记该桶迁移完成;
(4)迁移时根据hash&n是否等于0把桶中元素分化成两个链表或树;
(5)低位链表(树)存储在原来的位置;
(6)高们链表(树)存储在原来的位置加n的位置;
(7)迁移元素时会锁住当前桶,也是分段锁的思想;
2、helpTransfer方法
helpTransfer
:协助扩容(迁移元素),当线程添加元素时发现table正在扩容,且当前元素所在的桶元素已经迁移完成了,则协助迁移其它桶的元素。当前桶元素迁移完成了才去协助迁移其它桶元素!
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
// nextTab 引用的是 fwd.nextTable == map.nextTable 理论上是这样。
// sc 保存map.sizeCtl
Node<K,V>[] nextTab; int sc;
// CASE0: 如果桶数组不为空,并且当前桶第一个元素为ForwardingNode类型,并且nextTab不为空
// 说明当前桶已经迁移完毕了,才去帮忙迁移其它桶的元素
// 扩容时会把旧桶的第一个元素置为ForwardingNode,并让其nextTab指向新桶数组
// 条件一:tab != null 恒成立 true
// 条件二:(f instanceof ForwardingNode) 恒成立 true
// 条件三:((ForwardingNode<K,V>)f).nextTable) != null 恒成立 true
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 根据前表的长度tab.length去获取扩容唯一标识戳,假设 16 -> 32 扩容:1000 0000 0001 1011
int rs = resizeStamp(tab.length);
// 条件一:nextTab == nextTable
// 成立:表示当前扩容正在进行中
// 不成立:1.nextTable被设置为Null了,扩容完毕后,会被设为Null
// 2.再次出发扩容了...咱们拿到的nextTab 也已经过期了...
// 条件二:table == tab
// 成立:说明 扩容正在进行中,还未完成
// 不成立:说明扩容已经结束了,扩容结束之后,最后退出的线程 会设置 nextTable 为 table
// 条件三:(sc = sizeCtl) < 0
// 成立:说明扩容正在进行中
// 不成立:说明sizeCtl当前是一个大于0的数,此时代表下次扩容的阈值,当前扩容已经结束。
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 条件一:(sc >>> RESIZE_STAMP_SHIFT) != rs
// true -> 说明当前线程获取到的扩容唯一标识戳 非 本批次扩容
// false -> 说明当前线程获取到的扩容唯一标识戳 是 本批次扩容
// 条件二:JDK1.8 中有bug jira已经提出来了 其实想表达的是 = sc == (rs << 16 ) + 1
// true -> 表示扩容完毕,当前线程不需要再参与进来了
// false -> 扩容还在进行中,当前线程可以参与
// 条件三:JDK1.8 中有bug jira已经提出来了 其实想表达的是 = sc == (rs<<16) + MAX_RESIZERS
// true -> 表示当前参与并发扩容的线程达到了最大值 65535 - 1
// false -> 表示当前线程可以参与进来
// 条件四:transferIndex <= 0
// true -> 说明map对象全局范围内的任务已经分配完了,当前线程进去也没活干..
// false -> 还有任务可以分配。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 扩容线程数加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 当前线程帮忙迁移元素
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
这块代码逻辑很难理解,可以结合着ConcurrentHashMap面试题去巩固一下,重新吸收~
以上是关于ConcurrentHashMap源码解析_04 transfer方法源码分析(难点)的主要内容,如果未能解决你的问题,请参考以下文章
ConcurrentHashMap源码解析_04 transfer方法源码分析(难点)
ConcurrentHashMap源码解析_04 transfer方法源码分析(难点)
ConcurrentHashMap源码解析_03 put方法源码分析
ConcurrentHashMap源码解析_03 put方法源码分析