java多线程进阶ConcurrentHashMap
Posted 烟锁迷城
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java多线程进阶ConcurrentHashMap相关的知识,希望对你有一定的参考价值。
目录
1.2.1、computeIfAbsent:如果不存在则初始化
1.2.2、computeIfPresent:如果存在则修改
1、简单介绍
最常用的map就是HashMap,这是一个典型的线程不安全的Map,甚至在1.7的时候由于尾插的原因,多线程可能会导致环形指向导致死锁。
线程安全的Map有HashTable,但是HashTable的线程安全解决方案非常简单粗暴,是将整个散列数组加上synchronize来让访问线程安全,这样的性能毫无疑问是很差的。
于是ConturrentHashMap应运而生,这是一个高性能线程安全的Map。
1.2、常见方法
1.2.1、computeIfAbsent:如果不存在则初始化
computeIfAbsent方法的作用是如果key值不存在,则初始化value,它分为两种情况
- 如果key存在,不作任何处理。
- 如果key不存在,执行后面的函数表达式,并将结果作为value放入。
- value的数值也会对执行结果造成影响,如果value不为null,则存储成功。如果value为null,则返回null,因为ConcurrentHashMap的value不能为null
map.computeIfAbsent("",k->
return 1;
);
1.2.2、computeIfPresent:如果存在则修改
computeIfPresent方法的作用是如果key存在,则修改对应的value,它分为两种情况
- 如果key不存在,则将返回null。
- 如果key存在,则将修改key和对应value的数值,value根据函数表达式结果决定。
- value的数值也会对执行结果造成影响,如果value不为null,则存储成功。如果value为null,则将当前的key-value删除,相当于执行了一次remove(key),如果函数式异常,则不会影响到原key-value
map.computeIfPresent("",(k,v)->
return 1;
);
1.2.3、compute:无论是否存在,都将数值赋予
compute方法相当于computeIfPresent与computeIfAbsent的结合体,它分为两种情况
- 如果key不存在,则将初始化对应的key-value
- 如果key存在,则修改key-value的对应数值
map.computeIfPresent("",(k,v)->
return (v == null) ? 1 : v+1;
);
1.2.4、merge:合并
merge方法是合并,对key相同的value数值进行合并,此方法包含三个参数,key,value,函数式接口。其作用如下
- 如果key不存在,将value作为初始化的数值
- 如果key存在,执行函数式接口。BiFunction<? super V, ? super V, ? extends V>具有两个数值,第一个是key的oldValue,第二个是传入的value,使用者有四种逻辑可以进行执行来应对不同的表达方式
- 如果写为(oldValue,newValue)->newValue,表示把当前key的value修改为newValue
- 如果写为(oldValue,newValue)->oldValue,表示保留oldValue,不做修改
- 如果写为(oldValue,newValue)->oldValue+newValue,表示把当前key的value修改为oldValue+newValue,即合并新老两个数值
- 如果写为(oldValue,newValue)->null,表示把当前key移除
Map<Integer,Integer> map = new ConcurrentHashMap<>();
Stream.of(1, 2, 3, 2, 5, 6, 5, 1, 3).forEach(v ->
map.merge(v, 2, Integer::sum);
);
2、数据结构
ConcurrentHashMap的数据结构在jdk1.7和jdk1.8之间的改动比较大,其基础数据结构几乎和同版本的HashMap保持一致。
在1.7版本中,ConcurrentHashMap使用数组+链表的结构,加锁采用segment分段策略,对数组进行分段,然后加锁。
在1..8版本中,ConcurrentHashMap使用数组+链表/红黑树的结构,加锁的粒度细化至数组中每一个元素位。
2.1、put方法
ConcurrentHashMap的构造函数是空置的,也就是说,在使用时ConcurrentHashMap才会开始进行初始化。
可以看到,put执行的方法是putVal,这个方法的执行核心是for (Node<K,V>[] tab = table;;),这是一个自旋的循环,因为ConcurrentHashMap是一个在多线程条件下执行的方法,所以采用自旋来执行直到成功为止。
Node<K,V>[] tab = table,这句表示ConcurrentHashMap核心是一个Node<K,V>[]数组,即
transient volatile Node<K,V>[] table;
这是一个无法被序列化的数组,其实ConcurrentHashMap和HashMap的数据结构在很多方面是类似的,HashMap的数组是
transient Node<K,V>[] table;
可以看到两者的唯一区别就是volatile修饰符,由此可以看出来,ConcurrentHashMap更适合多线程环境。
由于putVal方法很长,接下来将会分段解析源码
public V put(K key, V value)
return putVal(key, value, false);
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent)
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;)
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null)
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else
V oldVal = null;
synchronized (f)
if (tabAt(tab, i) == f)
if (fh >= 0)
binCount = 1;
for (Node<K,V> e = f;; ++binCount)
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek))))
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
Node<K,V> pred = e;
if ((e = e.next) == null)
pred.next = new Node<K,V>(hash, key,
value, null);
break;
else if (f instanceof TreeBin)
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null)
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
if (binCount != 0)
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
addCount(1L, binCount);
return null;
2.1.1、初始化
ConcurrentHashMap的第一段内容是初始化,如果tab为null或长度为0,执行初始化方法initTable()
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
initTable初始化方法,内部具有一个自旋循环,循环条件是tab为空或长度为0,即未初始化。
将sc赋予sizeCtl的数值,并判断是否小于0,sizeCtl是一个状态数值,后面会详细分析它的数值变化与ConcurrentHashMap状态之间的关系。
如果不符合条件,则执行CAS,这是一个典型的乐观锁,目的是如果sc与SIZECTL的数值相等,sc就会被改为-1。此处使用乐观锁而不用lock,因为只要有一个线程成功初始化即可,无需阻塞。
如果更改成功,且还未完成初始化过程,则将n赋予DEFAULT_CAPACITY(因为sc一定为-1),即默认长度16,并且创建一个长度为n的数组,table与tab都被赋予nt的初始化数组结构,并且计算sc的数值,这个数值的结果是长度n-(n>>>2),其实就是n-n/4,向右位移两次既是缩小4倍,可以猜测这是扩容因子的大小。
最后,执行sizeCtl=sc,即扩容因子大小被赋予sizeCtl
private final Node<K,V>[] initTable()
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0)
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1))
try
if ((tab = table) == null || tab.length == 0)
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
finally
sizeCtl = sc;
break;
return tab;
最后总结sizeCtl的数值状态。
2.1.2、创建新节点
在初始化成功之后,继续执行循环,接下来就是下一步,
(f = tabAt(tab, i = (n - 1) & hash)) == null是判断条件
其中(n - 1) & hash长度与hash值进行与操作是和HashMap一致的数组下标位计算方法,可以猜测,tabAt是获取到tab数组指定下标位的数组元素方法
casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))目前我们不知道casTabAt具体实现方式,但是可以看到其中一个元素是创建新的元素节点,可以推测,这里代码的目的是在线程安全的情况下将新节点加入到数组中,并且使用了CAS来确保。
完成插入,break,移出循环。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null)
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
tabAt方法是一个优化方法,通过内存的偏移量来获取到tab对应下标位的数值
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
casTabAt是一个CAS方法,将新数据替换掉数组对应节点的旧数组
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v)
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
Node的构造,可以看到其各个元素的具体内容,不再赘述。
Node(int hash, K key, V val, Node<K,V> next)
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
2.1.3、辅助扩容
f.hash,获取到当前的hash数值,判断是否等于MOVED,这个数值是-1
static final int MOVED = -1
一般来说,hash值不会为-1,但是这个值是被预设的,目的是执行helpTransfer方法,即辅助转移,一般来说,只有在进行扩容的时候才会进行数组元素的移动
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
辅助扩容,此处代码非常类似下文的tryPresize扩容算法,因此在下面继续分析。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f)
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null)
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0)
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nextTab);
break;
return nextTab;
return table;
2.1.4、插入算法
根据上文结论,f其实就是当前key代表的数组元素位,synchronized (f)实现了精准加锁,仅对此位置上的数据进行加锁处理。
tabAt(tab, i) == f,根据偏移量获取数值,此前已经分析过不再赘述
在此之后的代码可以大致分为三段
第一段,插入链表
第二段,插入红黑树
第三段,转换判定
else
V oldVal = null;
synchronized (f)
if (tabAt(tab, i) == f)
if (fh >= 0)
binCount = 1;
for (Node<K,V> e = f;; ++binCount)
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek))))
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
Node<K,V> pred = e;
if ((e = e.next) == null)
pred.next = new Node<K,V>(hash, key,
value, null);
break;
else if (f instanceof TreeBin)
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null)
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
if (binCount != 0)
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
执行方法,计算hash,进行数值放置。
if (fh >= 0)
binCount = 1;
for (Node<K,V> e = f;; ++binCount)
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek))))
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
Node<K,V> pred = e;
if ((e = e.next) == null)
pred.next = new Node<K,V>(hash, key,
value, null);
break;
节点是否继承TreeBin,如果是,则进行红黑树的搭建。
else if (f instanceof TreeBin)
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null)
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
bincount计算累积的放置聊表节点数量,进行扩容计算和红黑树链表转换算法。
if (binCount != 0)
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
2.1.5、treeifyBin:辅助扩容与红黑树转换
treeifyBin是一个特殊的函数,它的主要功能有两个,辅助扩容和红黑树转换
辅助扩容的条件是数组长度是否小于MIN_TREEIFY_CAPACITY,即64。如果小于64,则启动扩容流程。
如果长度已经大于64,继续判断如果获取的数组元素节点不为空且数值hash大于0,对节点数值加锁。
如果其他线程没有对b这个元素数值修改,那么就开始进行红黑树节点的转换
private final void treeifyBin(Node<K,V>[] tab, int index)
Node<K,V> b; int n, sc;
if (tab != null)
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0)
synchronized (b)
if (tabAt(tab, index) == b)
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next)
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
setTabAt(tab, index, new TreeBin<K,V>(hd));
扩容函数tryPresize。
第一个判断,(size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1)
size是已经有符号左移一位,即扩容一倍。MAXIMUM_CAPACITY是最大容量,
private static final int MAXIMUM_CAPACITY = 1 << 30;
size会与MAXIMUM_CAPACITY的一半作比较,如果是大于,则直接扩容至最大。
如果不是,则进行一种法则运算。tableSizeFor会让计算出来的长度转化为最接近的一个2的幂次方结果,因为HashMap的算法需要数组长度必须是2的幂次方长度。
(sc = sizeCtl) >= 0这个判断是是否为初始化过程,判断依据是sizeCtl,因为tryPresize不仅仅会在这里调用,putAll也会调用此方法。
private final void tryPresize(int size)
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0)
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0)
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1))
try
if (table == tab)
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
finally
sizeCtl = sc;
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table)
int rs = resizeStamp(n);
if (sc < 0)
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
如果数组的确没有完成初始化,就进行初始化操作。
if (tab == null || (n = tab.length) == 0)
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1))
try
if (table == tab)
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
finally
sizeCtl = sc;
如果长度超出限制或小于0,则跳出循环。
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
这段是很重要的代码,因为ConcurrentHashMap是多线程的容器,可以使用多线程辅助扩容。
ConcurrentHashMap多线程辅助扩容的原理并非是多个线程进行扩容,而是扩容完成之后,每一个参与进来的线程都会进行一部分数组长度的迁移,即将旧的数组元素迁移至新的数组中。
resizeStamp方法是第一个方法,计算出一个标识符。
第一次扩容sc会大于0,所以执行的是else的代码。
U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2),可以看到rs << RESIZE_STAMP_SHIFT是左移16位,得到低16位为0,高16位为计算标识rs的数,与SC做一次替换,然后执行transfer。
此时RS的高16位为扩容标识符,是需要保持唯一的,低16位为当前扩容的线程数量,此时全是0,证明此时参与的线程数为0。
随后rs+2,低16位获取到2,线程数增加。
如果此时进来一个线程,sc必然小于0,因为原本的16位一定是1,被移动到高位之后,首位必定为1,整个整数是负数
if的判定条件很复杂,大概分为五种
(sc >>> RESIZE_STAMP_SHIFT) != rs,sc右移16位,不等于标识位,代表扩容已经结束
sc == rs + 1,唯一标识+1等于sc,代表扩容已经结束
sc == rs + MAX_RESIZERS,sc等于rs加上最大数值,代表扩容已经结束
(nt = nextTable) == null,下一指向为空,代表扩容移动结束
transferIndex <= 0,移动索引,小于0就代表扩容移动结束
因为扩容已经结束,所以跳出循环。
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)),表示当前执行扩容的线程数量+1
接下来就是扩容方法transfer。
else if (tab == table)
int rs = resizeStamp(n);
if (sc < 0)
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
resizeStamp是计算线程标识的方法。
Integer.numberOfLeadingZeros(n)是为了获取长度的二进制数的第一个1前面有多少个0。因为n一定是2的整幂次,所以只有一个1。
1 << (RESIZE_STAMP_BITS - 1)是1左移15位,即2进制数第16位为1。
计算结果就是将前者的第16位换为1。
static final int resizeStamp(int n)
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
2.1.6、transfer:多线程扩容移动
transfer方法主要的功能有三个,创建新的数组,对老的数据进行一次迁移,多线程的扩容。
多线程辅助扩容的思路是为每一个进入的线程分配需要进行迁移的范围,
扩容开始时,需要记录当前线程数量,完成数据迁移后,需要再次计算线程数量,确保迁移完成,这个数量被记录在sizeCtl之中,即之前提到过的状态。
sizeStamp是扩容戳,高16位是唯一标识,低16位是线程计数数量。
第一次进入的线程+2,之后每次线程进入将会+1。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) // initiating
try
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
catch (Throwable ex) // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
nextTable = nextTab;
transferIndex = n;
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;)
Node<K,V> f; int fh;
while (advance)
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0)
i = -1;
advance = false;
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0)))
bound = nextBound;
i = nextIndex - 1;
advance = false;
if (i < 0 || i >= n || i + n >= nextn)
int sc;
if (finishing)
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1))
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else
synchronized (f)
if (tabAt(tab, i) == f)
Node<K,V> ln, hn;
if (fh >= 0)
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next)
int b = p.hash & n;
if (b != runBit)
runBit = b;
lastRun = p;
if (runBit == 0)
ln = lastRun;
hn = null;
else
hn = lastRun;
ln = null;
for (Node<K,V> p = f; p != lastRun; p = p.next)
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
else if (f instanceof TreeBin)
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next)
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0)
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
else
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
第一个判定,用于计算多线程情况下,每一个线程的数据移动数量,结论是如果小于最小分配数值,就按最小数值分配。
最小是stride = MIN_TRANSFER_STRIDE,即16位。
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
nextTab是新数组,第一个判定的目的是查看是否已经创建了新数组。
如果没有,就进行创建。
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1],n左移一位,即双倍扩容。
如果失败,sizeCtl将会被赋予Integer的最大数值。
新数组被赋予,transferIndex的数值为n,以旧数组的长度为准,因为需要移动的长度应该是以旧数组为准
if (nextTab == null) // initiating
try
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
catch (Throwable ex) // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
nextTable = nextTab;
transferIndex = n;
nextTab是新数组,获取到长度至nextn
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab),移动标识,被移动过的旧节点将会被替换为fwd作为标志。
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
for循环,依旧是死循环,跳出循环的方法应该在方法体内部,但是无需关心。需要关心的是计算迁移范围与索引。
if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ?
nextIndex - stride : 0)))
bound = nextBound;
i = nextIndex - 1;
advance = false;
nextIndex > stride ? nextIndex - stride : 0,nextIndex是索引数值,nextIndex = transferIndex,初始数值是旧数组长度,如果大于计算范围stride,nextBound为nextIndex - stride,否则为0
假设旧数组长度为32,计算范围是16,那么nextBound为16,i 为transferIndex-1,即31,第一次的范围就是31~16,第二次的范围就是0~15
boolean advance = true;
boolean finishing = false;
for (int i = 0, bound = 0;;)
Node<K,V> f; int fh;
while (advance)
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0)
i = -1;
advance = false;
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0)))
bound = nextBound;
i = nextIndex - 1;
advance = false;
根据上面的结果,i为31,下面是终止扩容的判断,其一是扩容标志是否已经完成,其二是线程是否已经完全退出。
if (i < 0 || i >= n || i + n >= nextn)
int sc;
if (finishing)
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1))
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
如果当前节点的数值为null,无需迁移,直接将节点换为fwd节点。
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
检测当前节点是否已经移动过,这个判断属于fwd的节点性质,可以看一下fwd的源码。
如果移动过,就要重新计算范围,advance变为true。
else if ((fh = f.hash) == MOVED)
advance = true;
fwd节点自身状态就是MOVED。
static final class ForwardingNode<K,V> extends Node<K,V>
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab)
super(MOVED, null, null, null);
this.nextTable = tab;
如果节点不为空,针对红黑树节点和链表节点进行不同处理。
synchronized (f) 一个线程操作节点时,需要加锁,其他线程不能操作。
if (tabAt(tab, i) == f),如果节点确定是f,才能进行移动。
if (fh >= 0),fh是f节点的hash数值,绝大多数情况下不会小于0
在扩容过程中,重新进行hash计算是很常见的一种情况,因为扩容的目的其实就是为了缩短链表或者红黑树的长度,因为数组扩大,对应的位置变多,链表法链表的长度才能下降,所以一定会进行一次重Hash。因为Hash计算的数组位与整个数组的长度是相关的,一旦数组长度变化,hash得到的数组位也会有所不同。
实际上,经过新的计算,有些数据的位置会变化,有些数据不会变化,不会变化的链表节点被放入低位链表,会移动的放入高位链表。
int runBit = fh & n,得到一个数值runBit,这个数值的作用是判断该节点是否需要进行移动。
for (Node<K,V> p = f.next; p != null; p = p.next),循环取出节点内的链表数据
int b = p.hash & n,计算当前节点的Hash计算数值,
if (b != runBit)
runBit = b;
lastRun = p;
如果这两个数值不等,就要做一个赋值,将当前节点赋予lastRun,并且把runBit替换为b,这样做的目的是遍历整个链表,假如一个链表从某个节点开始,到结束一直都不需要进行节点移动或都需要移动,那么这段链表最好不要被破坏,被保留下来后可以直接放入低/高位链表之中。
if (runBit == 0)
ln = lastRun;
hn = null;
else
hn = lastRun;
ln = null;
如果runBit为0,就把得到的节点放入低位链表,保证不移动,反之放入高位链表之中。
synchronized (f)
if (tabAt(tab, i) == f)
Node<K,V> ln, hn;
if (fh >= 0)
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next)
int b = p.hash & n;
if (b != runBit)
runBit = b;
lastRun = p;
if (runBit == 0)
ln = lastRun;
hn = null;
else
hn = lastRun;
ln = null;
接下来将整个链表再次遍历,以得到的lastRun不变节点为终止。
依旧判断,计算hash&n,得到是否移动的判断结论,分别放到高低位上。
for (Node<K,V> p = f; p != lastRun; p = p.next)
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
ConcurrentHashMap的位移节点运算和HashMap的1.8计算方式是一样的,低位链表不变自不必说,高位链表的移动位置就是原位置+原数组长度。
同时将原数组的节点置为fwd,表示已经移动。
advance重新计算线程移动范围。
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
接下来是红黑树的移动。
如果不符合链表节点,那就判断红黑树节点。
h&n的计算方式和上文一致。
红黑树不会像链表一样,可以将整段的不移动结构移植到新的节点位置,因此只能挨个遍历,红黑树同样分为高低位,同样计算是否移动,此处不再赘述。
else if (f instanceof TreeBin)
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next)
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0)
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
else
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
2.2、size方法
size方法在其他的集合之中都是一种非常简单的累计方法,但是在ConcurrentHashMap之中不是如此,因为可以多线程调用put,且put还会针对数组位加锁,所以在竞争激烈的情况下,这个数值计算将会非常复杂。
size方法调用了sumCount方法。
public int size()
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
在sumCount方法中,有两个特殊的量,counterCells,baseCount。
private transient volatile CounterCell[] counterCells
private transient volatile long baseCount
现在我们并不知道这两个数值是如何进行计算的,因为计算累加数量的方法是addCount(),这个方法在putVal的最后一行,添加完毕后,进行数量计算。
final long sumCount()
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null)
for (int i = 0; i < as.length; ++i)
if ((a = as[i]) != null)
sum += a.value;
return sum;
@sun.misc.Contended注解,在Java 8中,提供了@sun.misc.Contended注解来避免伪共享,原理是在使用此注解的对象或字段的前后各增加128字节大小的padding,使用2倍于大多数硬件缓存行的大小来避免相邻扇区预取导致的伪共享冲突。
因为long只有8字节,但是一个缓存行却有64字节,肯定需要解决伪共享问题。
@sun.misc.Contended static final class CounterCell
volatile long value;
CounterCell(long x) value = x;
实际上,ConcurrentHashMap有特殊的计数。
- 在竞争不激烈的情况下,使用sumCount进行计算。
- 在竞争激烈的情况下,使用CounterCell数组进行计算。
计数方法也使用了CAS加锁计算,所谓是否激烈,就是是不是有多个CAS在进行计算,如果失败的次数少,竞争就不激烈,使用sumCount计算。如果失败次数多,竞争就激烈,使用CounterCell数组进行计算。
CounterCell数组的使用思想是,如果一个元素节点使用CAS失败次数多,那么就把CAS分配到多个节点上进行计算,计算完成后将数组内所有的结果累加,并与sumCount相加,得到最终结果,这样就可以把一个节点上的压力分配到多个节点上。
addCount的代码可以分为两部分,一个判断负责进行计数统计,一个是判断check是否大于0,此处的check是之前计数结果binCount,负责进行扩容
private final void addCount(long x, int check)
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x))
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)))
fullAddCount(x, uncontended);
return;
if (check <= 1)
return;
s = sumCount();
if (check >= 0)
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY)
int rs = resizeStamp(n);
if (sc < 0)
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
counterCells不为空,或一次CAS操作是否成功。
就像之前说过的,是否启用counterCells的条件是线程竞争的激烈程度,而这个CAS的作用是将baseCount的数值替换为baseCount+x,即计数增加,如果失败,那肯定代表竞争激烈,进入到counterCells计数,如果成功那就更好,没有竞争,直接成功。
接下来的判断比较复杂,分为四种情况
as == null,数组未完成初始化
(m = as.length - 1) < 0,数组未完成初始化
(a = as[ThreadLocalRandom.getProbe() & m]) == null,ThreadLocalRandom.getProbe()线程安全地获取到一个随机数,与长度取模之后,得到一个数值,进行数值判空
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) ,获得上一个判断中的数值,累加之后修改CELLVALUE的数值。
如果最后的CAS都没成功,证明竞争非常激烈,所以要执行fullAddCount(x, uncontended)
check <= 1,无需扩容
sumCount(),累加计算
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x))
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)))
fullAddCount(x, uncontended);
return;
if (check <= 1)
return;
s = sumCount();
fullAddCount方法
for(;;)是自旋循环,整个代码可以大致分为两部分,一个是完成数组初始化的,一个是未完成的。
private final void fullAddCount(long x, boolean wasUncontended)
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0)
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
boolean collide = false; // True if last slot nonempty
for (;;)
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0)
if ((a = as[(n - 1) & h]) == null)
if (cellsBusy == 0) // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1))
boolean created = false;
try // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null)
rs[j] = r;
created = true;
finally
cellsBusy = 0;
if (created)
break;
continue; // Slot is now non-empty
collide = false;
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1))
try
if (counterCells == as) // Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
finally
cellsBusy = 0;
collide = false;
continue; // Retry with expanded table
h = ThreadLocalRandom.advanceProbe(h);
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1))
boolean init = false;
try // Initialize table
if (counterCells == as)
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
finally
cellsBusy = 0;
if (init)
break;
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
如果完成初始化,就走这个分支。
如果需要的数组位为空,则需要对这个数组位进行一次插入。
如果可以抢占并抢占成功,对指定数组位进行初始化,然后释放锁,跳出循环
if ((as = counterCells) != null && (n = as.length) > 0)
if ((a = as[(n - 1) & h]) == null)
if (cellsBusy == 0) // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1))
boolean created = false;
try // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null)
rs[j] = r;
created = true;
finally
cellsBusy = 0;
if (created)
break;
continue; // Slot is now non-empty
collide = false;
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
此处为数组扩容方法,当抢占锁成功时,记性一次扩容,扩容的范围是双倍扩容,并且将旧数组中的数据存到新数组之中。
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1))
try
if (counterCells == as) // Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
finally
cellsBusy = 0;
collide = false;
continue; // Retry with expanded table
如果数组位空,就判断加锁标志cellBusy是否被抢占,如果没有被抢占,就尝试进行抢占。
抢占成功后,初始化长度为2的数组,并且将x(累加计数)保存到某个位置,然后赋值给全局变量counterCells,最后释放锁cellsBusy = 0。
如果成功初始化,就离开循环。if (init) break
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1))
boolean init = false;
try // Initialize table
if (counterCells == as)
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
finally
cellsBusy = 0;
if (init)
break;
如果都没有成功,再对baseCount进行一次CAS累加计算,成功就跳出循环。
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
完成数组内数据计数的累加,在与sum累加。
final long sumCount()
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null)
for (int i = 0; i < as.length; ++i)
if ((a = as[i]) != null)
sum += a.value;
return sum;
以上是关于java多线程进阶ConcurrentHashMap的主要内容,如果未能解决你的问题,请参考以下文章