JDK1.8中的ConcurrentHashMap
Posted 愉悦滴帮主)
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDK1.8中的ConcurrentHashMap相关的知识,希望对你有一定的参考价值。
目录
JDK1.8中的ConcurrentHashMap
前言
在JDK1.7中的ConcurrentHashMap中我们了解到ConcurrentHashMap采用分段锁的机制,实现并发的更新操作,它首先将数据分成一段一段地存储,然后给每一段数据配一个锁,当一个线程占用锁访问其中一段数据时,其他段的数据也能被其他线程访问,锁分段技术的使用大大了提高并发访问效率。底层由ReentrantLock+Segment+HashEntry组成。然而在jdk1.8中的实现已经抛弃了Segment分段锁机制,利用CAS+Synchronized来保证并发更新的安全,底层依然采用数组+链表+红黑树的存储结构。
问题1:为什么在1.8中舍弃了分段锁的机制?
其实可以看出JDK1.8版本的ConcurrentHashMap的数据结构已经接近HashMap,相对而言,ConcurrentHashMap只是增加了同步的操作来控制并发,从JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+红黑树,相对而言,总结如下思考
-
JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(首节点)
-
JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加了
-
JDK1.8使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档
-
JDK1.8为什么使用内置锁synchronized来代替重入锁ReentrantLock,我觉得有以下几点
1.因为粒度降低了,在相对而言的低粒度加锁方式,synchronized并不比ReentrantLock差,在粗粒度加锁中ReentrantLock可能通过Condition来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition的优势就没有了
2.JVM的开发团队从来都没有放弃synchronized,而且基于JVM的synchronized优化空间更大,使用内嵌的关键字比使用API更加自然
3.在大量的数据操作下,对于JVM的内存压力,基于API的ReentrantLock会开销更多的内存,虽然不是瓶颈,但是也是一个选择依据。 -
转自:https://blog.csdn.net/q289336929/article/details/95742247
ConcurrentHashMap中的常量
最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
默认初始容量。一定是2的幂次方数。最小1,最大MAXIMUM_CAPACITY。
private static final int DEFAULT_CAPACITY = 16;
//最大数组的大小(非2的幂)。toArray和相关方法需要。
MAX_ARRAY_SIZE = Integer。MAX_VALUE - 8;
默认并发级别。未使用但定义是为了与该类的以前版本兼容。
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
负载因子。
private static final float LOAD_FACTOR = 0.75f;
使用树而不是列表的bin计数阈值
static final int TREEIFY_THRESHOLD = 8;
调整操作。应该小于TREEIFY_THRESHOLD,在最多6个网格与收缩检测下去除。
static final int UNTREEIFY_THRESHOLD = 6;
可以树形化的最小表容量。(否则,如果一个bin中有太多节点,则会调整表的大小。建议至少设置为4 *TREEIFY_THRESHOLD调整大小和树形化阈值之间的冲突。
static final int MIN_TREEIFY_CAPACITY = 64;
每个传输步骤的最小重新绑定数。范围是细分为允许多个调整线程。这个值作为一个下界,以避免遇到调整器内存占用过多。取值为默认为DEFAULT_CAPACITY。
private static final int MIN_TRANSFER_STRIDE = 16;
在sizeCtl中用于生成stamp的位数。32位数组必须至少为6。
private static int RESIZE_STAMP_BITS = 16;
可以调整大小的最大线程数。必须符合32 - RESIZE_STAMP_BITS位。
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
在sizeCtl中记录大小戳的位移位。
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
Node散列字段的编码。参见上面的解释。
static final int MOVED = -1;//转发节点的散列
static final int TREEBIN = -2;//哈希为树的根
static final int RESERVED = -3;//暂态保留
static final int HASH_BITS = 0x7fffffff;//正常节点散列的可用位
cpu数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
ConcurrentHashMap的初始化方法
在高并发的场景下,initTable()方法通过cas(compareAndSwapInt)的机制来保证只有一个线程能够成功初始化。其他线程放弃本次循环的cpu竞争,进行自旋。但是这样有可能导致cpu占有率过高的问题?比如:一个线程本次循环放弃竞争,但是第二次循环又竞争到了cpu,依次叠加,就会导致cpu占有率过高甚至百分之百的问题。
/**
* 初始化表,使用大小记录在sizeCtl。
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); //释放cpu资源放弃本次循环的cpu竞争;只是自旋
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//取n的四分之三的值
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
ConcurrentHashMap的put方法
不同点:1.8中的HashMap中table[index]的位置放入的是TreeNode(红黑树)的根节点。而1.8中的ConcurrentHashMap放入的是TreeBin对象,这个对象代表整颗红黑树。
问题2:为什么要用TreeBin对象来代表整棵树?
我们通过了解1.8的HashMap知道在插入新节点后,插入新节点所在的红黑树的根节点可能发生改变。我们假设在1.8的ConcurrentHashMap也与HashMap一样,那么首先会以该红黑树的根节点为对象加锁,那么有可能在插入新节点过后该红黑树的根节点发生改变,这样就会造成第二个线程同时在操作该红黑树的时候,原本线程未结束因为根节点对象改变导致线程2获取到了锁,从而导致产生数据冲突这样的一个问题。而TreeBin对象将整棵树包起来,从而对TreeBin作为锁对象,就不会因根节点改变而导致上述问题。
final V putVal(K key, V value, boolean onlyIfAbsent) {
//如果key或value为空则抛空指针异常
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
//遍历数组
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果数组为空,则初始化一个数组
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//如果table[index]位置为空,则创建Node对象插入其中
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果key位置的hash值为-1(MOVED),标志这其他线程在对数组进行扩容,本线程帮助其他线程一起扩容,加快效率。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//key位置上不为空,存在链表或红黑树或单个Node对象
else {
V oldVal = null;
//对要加入的Node对象上锁,在1.8中优化了synchronized ,所以效率没有以前那么低
synchronized (f) {
if (tabAt(tab, i) == f) {
//如果是链表
if (fh >= 0) {
binCount = 1;
//遍历该链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
//位置冲突则进行值覆盖,返回旧值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//尾插法,插入到链表尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果是红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//判断链表长度是否大于等于8,是则转为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
//返回旧值
if (oldVal != null)
return oldVal;
break;
}
}
}
//数组中元素个数加一
addCount(1L, binCount);
return null;
}
size代码块
size方法最后返回 baseCount属性 加上CounterCell数组里面的所有值的和。
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
//再看看 sumCount()
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
CounterCell是一个静态内部类,里面的long属性是通过volatile 修饰,来保证并发安全。
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
/**
* Table of counter cells. When non-null, size is a power of 2.
* 计数器单元表。当非空时,大小是2的乘方。
*/
private transient volatile CounterCell[] counterCells;
addCount代码块
addCount()方法分为两个部分,第一部分,如下
前提: addCount先判断CounterCell数组是否为空:
1、如果为空则对当前map对象cas操作 baseCount 加 1,cas成功了就跳过if,失败了就执行fullAddCount方法。
2、如果不为空则通过当前线程的hash值找到此线程在CounterCell数组中对应的位置,如果此位置的CounterCell对象为空,就执行fullAddCount方法。如果不为空就对此CounterCell对象cas操作value加1。如果成功return;失败就执行fullAddCount方法。大概意思就是多个线程去调用put方法,也就是多个线程去size+1。在ConcurrentHashMap中通过baseCount去计数。在高并发的场景下,通过CAS机制去控制baseCount+1,也就是只有一个线程能够操作成功。其他线程通过 “随机数&table.length-1” 获取到CounterCell的数组下标,然后去操作CounterCell数组对应下标对象中的value属性,使其value属性+1。最后统计baseCount+CounterCell的数目。
第二部分:数组扩容,我们放在后面描述。
那么为什么不直接for循环对当前map对象cas操作 baseCount 加 1,却要引入CounterCell数组?
因为for循环cas这种方式可以解决多线程并发问题,但因为cas的是当前map对象,所以同一时刻还是只有一个线程能cas成功,而对于引入CounterCell数组,cas的是当前线程对应在数组中特定位置的元素,也就是说如果位置不冲突,n个长度的CounterCell数组是可以支持n个线程同时cas成功的。
总结:以数组的形式去分散线程,防止多个线程去操作属性只有一个线程能够成功,从而导致浪费其他线程的资源,进而提高并发效率。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//首先if判断counterCells为空并且对当前map对象cas操作baseCount + x成功,就跳过if里的操作,
//因为都cas操作baseCount + x成功了,就不需要通过counterCells辅助了,简单明了。
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
//如果上面判断失败了,counterCells不为空 或者counterCells为空但cas失败了。
//如果counterCells为空,直接执行fullAddCount。
//如果不为空,判断当前线程在counterCells中的槽位是否为空,如果不为空,
//对槽位中的CounterCell对象cas操作value加1,成功return,失败执行fullAddCount,如果槽位为空,直接执行fullAddCount
if (as == null || (m = as.length - 1) < 0 ||
//ThreadLocalRandom.getProbe()就相当于是 [当前线程的哈希值]
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
//cas对CounterCell对象中的value执行+x(也就是+1)操作
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
//判断size+1后数组是否需要扩容。
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
fullAddCount代码块
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
//如果当前线程hash值==0,则重新生成一个hash值。
//当前线程生成的hash值不会改变
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
//循环
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
//如果counterCells已经被初始化了
if ((as = counterCells) != null && (n = as.length) > 0) {
//如果当前线程对应于counterCell数组中的槽位为空
if ((a = as[(n - 1) & h]) == null) {
//如果没有其他线程操作对应counterCell数组中的槽位
if (cellsBusy == 0) {
//counterCell数组中为空的槽位中创建一个CounterCell对象
CounterCell r = new CounterCell(x);
//如果将cellsBusy成功修改成1
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try {
//再次校验有没有其他线程操作该数组槽位
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
//标记创建成功
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//wasUncontended一直为true
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//如果当前线程对应槽位已经存在CounterCell元素了,就对value+x
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
//不扩容条件
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
//为扩容做条件
else if (!collide)
collide = true;
//一个线程循环两次,都没有加1成功的情况下,数组扩容
//在collide = true的情况下,数组扩容
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
//扩展数组,长度变为两倍
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//当前线程重新生成的hash值
h = ThreadLocalRandom.advanceProbe(h);
}
//如果counterCells 没有被初始化,
// cellsBusy==0表示没有其他线程在使用counterCells数组。
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {//表示该数组已经有线程在用
boolean init = false;
try {
//初始化数组
if (counterCells == as) {
//初始化长度为2
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
//表示当前线程已经操作完成该数组
cellsBusy = 0;
}
if (init)
break;
}
//如果都不满足,则线程自旋去竞争baseCount和CounterCell
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
以上是关于JDK1.8中的ConcurrentHashMap的主要内容,如果未能解决你的问题,请参考以下文章
ConcurrentHashMap在jdk1.7和jdk1.8中的不同
JDK1.8中的HashMap.HashTable, ConcurrentHashMap有什么区别?
Java并发编程总结4——ConcurrentHashMap在jdk1.8中的改进