十二ConcurrentHashMap的实现原理解析
Posted 学习JAVA的海贼王
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了十二ConcurrentHashMap的实现原理解析相关的知识,希望对你有一定的参考价值。
一、ConcurrentHashMap在多线程中的作用
在并发编程中,如果使用HashMap那是线程不安全的会导致死循环,而线程安全的HashTable效率又特别低下,他把自己是所有操作都加了锁,虽然线程安全,但是获取不到锁的线程只能陷入阻塞,大大影响了性能。
ConcurrentHashMap的优点是因为他采用了锁分段技术,将数据分成一段一段,通过给每一段来上锁,这样当两个线程过来,访问的是不同段的数据,就不会出现类似HashTable争抢锁而陷入阻塞的问题,因为两个线程获得的是不同的锁,大大提高了效率。
二、ConcurrentHashMap的结构
1、类图:
2、结构图:
3、结构总结:
从上可知,ConcurrentHashMap通过维护内部的Segment数组,而每个Segment就是一个ReentrantLock并且存放了多个HashEntry键值对,这样就实现了锁分段技术。
三、ConcurrentHashMap构造分析:
1、构造方法,我们直接分析参数最全的构造方法,因为无论哪个构造方法最后都会调用这个
//initialCapacity ConcurrentHashMap的容量
//loadFactor 每个Segment中的HashTable数量的负载因子
//concurrencyLevel 根据他来确定Segment数组的长度
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//用来确定Segment数组的长度,不能超过MAX_SEGMENTS,最大为16
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
int sshift = 0;
int ssize = 1;
//concurrencyLevel用来算出segment的长度,为了到时定位准确,那么
//segment的长度ssize的值为2的n次方,所以如果size的值为大于concurrenyLevel
//的最小的那个数,例如concurrencyLevel为12,13,14,那么segment的长度为16
//也就是有16个锁
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//在Segment的再次散列时使用
this.segmentShift = 32 - sshift;
// 在Segment的再次散列时使用
this.segmentMask = ssize - 1;
//ConcurrentHashMap的初始化容量
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
//每个segment存放HashEntry的个数,默认为2
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
//原子操作,构造出这个类型的Segment数组。
UNSAFE.putOrderedObject(ss, SBASE, s0);
//最后构造出ConcurrentHashMap中的Segment数组
this.segments = ss;
}
//Segment构造方法
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
//负载因子
this.loadFactor = lf;
//每个HashEntry实际存储的键值对个数
this.threshold = threshold;
//每个HashEntry数组
this.table = tab;
}
2、默认情况下:ConcurrentHashMap的Segment数组长度为16,每个Segment中存放的HashEntry为2,负载因子为0.75。
四、ConcurrentHashMap的操作
1、put操作的代码分析:
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
//上述方法定位到了,要使用哪个Segment
if ((s = (Segment<K,V>)UNSAFE.getObject
(segments, (j << SSHIFT) + SBASE)) == null)
// 当要使用的Segment为空时
s = ensureSegment(j);
//难道获取的Segment进行添加
return s.put(key, hash, value, false);
}
/****如果获取到的Segment的位置,在数组中不存在这个方法对ConcurrentHashMap
/***中的Segments进行扩容/
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
//获取现在已经有的Segments数组
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
//检查有没有存在,因为可能有别的线程先创建了
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
//复制已经存在Segments数组中每个Segments的一些属性
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
//在次检查,还是怕在这个过程中有线程已经创建
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
//CAS保证安全的将创建的Segment放到Segments中去
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
//返回创建的Segment
return seg;
}
/*********将值放入到Segment中的HashEntry中***********/
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//在这个位置开始对当前Segment上锁,如果获取锁成功则=null
//否则不停的去获取锁
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
//如果直接获取锁
HashEntry<K,V>[] tab = table;
//根据&运算获取该存到HashEntry数组中的哪个HashEntry中
int index = (tab.length - 1) & hash;
//响应位置的HashEntry
HashEntry<K,V> first = entryAt(tab, index);
//调用死循环一个节点节点的去对比
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
//如果key值和value值已经hash值都相同
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
//! 仅设置缺省值=不设置缺省值
if (!onlyIfAbsent) {
//直接设置值
e.value = value;
++modCount;
}
//打破循环
break;
}
//继续到下一个结点
e = e.next;
}
else {
//进入到这段代码的条件是,确实有这个HashEntry但是HashEntry里面一个
//元素也没有
//如果node不等于null,则将node添加到HashEntry中
//这个地方设计的很巧妙,就是当这个线程尝试获取锁失败之后,他
//他并不是什么也不干,而是先构造出来这个node,然后再去尝试获取锁,
//等自己获得成功获取锁之后,就不用在此构建这个node了
if (node != null)
node.setNext(first);
else
//构造一个node
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
//HashEntry的长度已经超过了现有的长度,hashEntry的长度小于要求的
//最大长度,进行扩容,每次扩容的长度是现在长度的2倍
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
//如果没有超过,则直接将当前node放入到HashEntry中
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
//经典的计算hash算法
private int hash(Object k) {
int h = hashSeed;
if ((0 != h) && (k instanceof String)) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
2、put操作的语言描述
上面排版比较乱,如果看不懂可以看语言描述
第一步:判断插入的值是否为空,如果为空则直接抛错,如果不为空进入第二步
第二步:通过Wang/Jenkins的hash算法,传入一个key,根据key来获得一个hash值
第三步:将这个hash值进行再次散列,获取到将要存入Segment数组中的哪个Segment数组中
第四步:如果获取到的Segment对象为空,表示数组中没有这个Segment,那么新增一个Segment,否则直接进入第五步
第五步:以为已经进入到Segment了,所以需要上锁,如果获取锁成功,那么直接进行第六步。否则死循环不断的尝试获取锁,并且在尝试过程中先去创建(这里叫预创建一个node),直到获取锁后,跳出循环,返回自己创建的node,进入第六步
第六步:在继续用&运算,算出将要存到HashEntry数组中的哪个HashEntry元素,将这个HashEntry元素取出,则循环判断添加的K,进入第七步
第七步-1:HashEntry不为空的情况下,看看是不是在HashEntry中已经存在,如果已经存在,根据是否设置缺省值这个字段确定是否替换掉旧值,返回旧值,进入第九步,如果不存在,进入到第七步-2。
第七步-2:如果HashEntry为空,则先判断自己是否创建了node(如果直接获取到了是不会预创建这个node的),如果node不为空的话,则直接将当前node存放在已经存在的HashEntry前面。如果为空构造一个node。进入到第八步
第八步:判断HashEntry是否需要扩容,如果需要扩容则将当前HashEntry扩为两倍,并重新进行散列计算。然后将老数据以及新数据重新放到数组中。如果不需要扩容,则直接将当前node放到相应位置的HashEntry中并更新,当前这个HashEntry数组中的对应位置的HashEntry元素
第九步:释放Segment的锁。
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
//根据获取到的散列值,看看是否存在这个Segment
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
//存在的话,在继续散列HashEntry数组,获取数组中对应的HashEntry
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
//判断键为key的值,然后返回对应的value
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
//不存在的话返回null
return null;
}
4、get操作的语言描述:
第一步:通过Wang/Jenkins的hash算法,传入一个key,根据key来获得一个hash值
第二步:将这个hash值进行再次散列,获取到将要存入Segment数组中的哪个Segment数组中
第三步:如果数组中这个位置没有Segment,则直接返回空,否则进入到第四步
第四步:因为是读,所以并没有上锁,根据Segment的散列值到HashEntry数组中进行在此散列,获取数组中对应位置的HashEntry
第五步:循环遍历这个HashEntry最后返回键值对K等于传入K的value值
5、size操作的代码:
public int size() {
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow;
//每次算的sum值
long sum;
//最后一次的size值
long last = 0L;
int retries = -1;
try {
//死循环判断,每次对retries(尝试次数)+1
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock();
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
//当他们两个相等时,则返回size
//这种情况只发生对ConcurrentHashMap进行2次以内包括
//2次的无锁循环
//如果2次无锁循环的值都相等,则直接返回,否则进行有锁循环
if (sum == last)
break;
last = sum;
}
} finally {
//当尝试次数已经超过允许的不上锁次数时,说明已经上锁了
//需要循环释放。
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
6、size操作的语言描述
第一步:获取当前ConcurrentHashMap中的Segment数组,定义一个循环次数计数器,一个每次的值,和一个最终值
第二步:尝试进行两次不上锁的循环获取size,如果相等则返回这个size,如果不相等,则进行加锁循环获取size。
第三歩:如果加锁了,则需要循环释放每个Segment的锁。
总结:获取size的方式就是先不加锁循环两次,比较两次的值是否相等,如果相等则直接返回,如果不相等,那么加锁重新循环一次。
以上是关于十二ConcurrentHashMap的实现原理解析的主要内容,如果未能解决你的问题,请参考以下文章