并发编程—6ConcurrentHashMap1.7 & 1.8
Posted codetree
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程—6ConcurrentHashMap1.7 & 1.8相关的知识,希望对你有一定的参考价值。
目录
6 ConcurrentHashMap jdk1.7
- hash算法的介绍
- 构造方法做了什么
- get方法做了什么
- put方法做了什么
- 动态扩容逻辑
6.1 预备知识
hash算法:
就是把任意长度的输入,通过散列算法,变换成固定长度的输出,该输出就是散列值。这种转换是一种压缩映射,也就是,散列值的空间通常远小于输入的空间,不同的输入可能会散列成相同的输出,所以不可能从散列值来唯一的确定输入值。简单的说就是一种将任意长度的消息压缩到某一固定长度的消息摘要的函数。在jdk1.7为了使得hash出来的值更加均匀,在concurrentHashMap里面使用了Wang/Jenkins hash算法再做了一次hash。
HashMap和HashTable的缺陷:
Hashmap多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。
HashTable使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,所以竞争越激烈效率越低。
putIfAbsent() :没有这个值则放入map,有这个值则返回key本来对应的值。
6.2 jdk1.7原理和实现
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment实际继承自可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,每个Segment里包含一个HashEntry数组,我们称之为table,每个HashEntry是一个链表结构的元素。
面试常问:
ConcurrentHashMap实现原理是怎么样的或者问ConcurrentHashMap如何在保证高并发下线程安全的同时实现了性能提升?
答:ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不同部分进行的修改。内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的hash table,只要多个修改操作发生在不同的段上,它们就可以并发进行。
6.3 源码
6.3.1 构造方法
initialCapacity:初始容量大小 ,默认16。
loadFactor, 扩容因子,默认0.75,当一个Segment存储的元素数量大于initialCapacity* loadFactor时,该Segment会进行一次扩容。
concurrencyLevel 并发度,默认16。并发度可以理解为程序运行时能够同时更新ConccurentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的分段锁个数,即Segment[]的数组长度。如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
//点击this进去
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
//参数校验
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
//设置ssize的值,ssize是segments数组的大小,这里取的是大于等于concurrencyLevel的2^n的一个值。
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//设置segmentShift和segmentMask的值
//sshift就是上面描述中的n值,默认情况下concurrencyLevel等于16,sshift就等于4
//因此默认情况下segmentShift的值就是28,这个值会参与hash运算。
//segmentMask是hash运算的掩码,默认情况下等于16-1=15,类似于网络中的子网掩码,segmentMask的二进制最后几位都是1,最大值是末尾16个1(65535)。
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
//初始化segment,其中cap是segment里面的HashEntry数组的长度。它取的是大于等于c(Map容量/ssize)的2^N的一个值
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
//创建segments和segments[0](这里面只初始化了一个segments数组中的第0个元素)。
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss;
}
执行流程小结::
--初始化currentHashMap时,初始化map的容量,负载因子,并发等级等信息
--默认会在map里面新建一个segment数组,并且会只初始化segments数组中的第0个元素。
6.2.2 get方法
get操作是先定位到segment,然后再到segment中去获取对应的value值
定位segment和定位table后,依次扫描这个table元素下的的链表,要么找到元素,要么返回null。
在高并发下的情况下如何保证取得的元素是最新的?
答:用于存储键值对数据的HashEntry,在设计上它的成员变量value等都是volatile类型的,这样就保证别的线程对value值的修改,get方法可以马上看到。
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
int h = hash(key);
// 根据Segment的索引((h >>> segmentShift) & segmentMask)算出在Segment[]上的偏移量
// 默认情况下segmentShift为28,segmentMask为15(低位有1111),从而可以得到h的高四位的值。作为segment数组的索引。
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
//若segment存在则继续查找segment上面的table[]的索引位置
//根据table的索引((tab.length - 1) & h)算出在table[]上的偏移量,循环链表找出结果
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
//对象相等
//或者
//hash相等且key相等时判定同一个key。返回value
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
执行流程小结:
-- 1。定位segment的位置,通过segment的索引(h >>> segmentShift) & segmentMask)即通过hash的高位算出Segment[]上的偏移量。
-- 2。 根据table的索引((tab.length - 1) & h)算出table[]上的偏移量。
-- 3。 循环HashEntry链表直到找到结果。
-- 4。因为没有加锁如果在get的时候,同时有线程修改了hashEntry的值可能会出现获取不到真实的值。出现弱一致性的问题。
6.2.3 put方法
对于put()操作,前面的定位Segment的操作都是和put()相同的。找到Segment以后,然后对整个Segment加锁,然后再进行后续的操作
1。put方法
public V put(K key, V value) {
Segment<K,V> s;
//1.校验
if (value == null)
throw new NullPointerException();
int hash = hash(key);
// 2. 定位Segment,并判断其是否存在
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);//如果Segment不存在,则新建。
return s.put(key, hash, value, false);// 如果Segment存在,提交给Segment去处理。调用Segment.put方法
}
执行流程小结:
-- a 根据key算出的hash值,确定是属于哪个segement。判定segment是否存在。
-- b 如果对应的脚标segement存在,则提交Segment.put()去处理。
-- c 如果对应的脚标未存在,新建一个,通过cas操作设置到segement数组中去。
//2 ensureSegment 分析
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // segment数组的偏移量
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // 使用 segment 0 作为原型,可以省略一些参数的计算
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);//创建Segment。使用cas创建直到成功
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
执行流程小结:
-- 根据segment数组的偏移量去获取看是否返回非null元素
-- 如果seg非null,直接返回
-- 如果seg为null,则使用segments数组第0个元素作为原型创建一个新的segment数组元素。并使用cas操作设置到segments数组
//3 Segment.put()方法
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//尝试加锁。首先尝试tryLock(),多次失败以后使用lock();同时会查找HashEntry,如果没有找到,创建并返回一个
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
// 循环定位链表中的HashEntry位置,然后执行变更
for (HashEntry<K,V> e = first;;) {
if (e != null) {// 查找key,若找到直接修改value值。如果找不到继续找下一个节点e.next
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {//到达链表尾部
if (node != null)//node如果不等于null,说明之前已经预热完成,可以直接插入
node.setNext(first);
else //
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
//threshold 一般等于 (int)(capacity *loadFactor),默认是16 * 0.75 = 12。当table里面的链表长度大于12时,需要动态扩容。rehash()
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
执行流程小结:
-- a 因为put是修改操作,这里需要加锁。会先尝试加锁,如果加锁次数过多,则直接lock
-- b 先根据(tab.length - 1) & hash定位在table里面的位置
-- c 循环链表,如果存在相同的key值,就修改oldvalue,放回oldevalue。
-- c 如果不存在相同的key值,则设置。
// 4.scanAndLockForPut分析
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
//通过给定的segment和hash,定位table里hashentry的位置
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // 定位node时为负值
while (!tryLock()) {// 这里首先尝试使用tryLock(),达到最大重试次数MAX_SCAN_RETRIES(2次或者65次)后,转为lock()的阻塞操作
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) { //定位node的位置,若找不到则创建一个HashEntry。表明table里面一个HaseEntry都没有
if (node == null) // speculatively create node //到达链表尾部
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {//MAX_SCAN_RETRIES 1 or 64
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
// 如果加锁过程中,node有新增,则重新遍历链表,(这里可以解释对于链表的插入位置总是head的问题了)
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
执行流程小结:
-- a 尝试tryLock().达到最大重试次数MAX_SCAN_RETRIES后,转为lock()的阻塞操作.
-- b 通过给定的segment和hash,定位table的位置。返回第一个entry。
如果第一entry为null,则新建一个HashEntry。
如果第一个entry不为null。
4。size方法
ConcurrentHashMap的size()操作需要统计所有的Segment中的HashEntry数量,最大为Integer.MAX_VALUE。因为在统计个过程中,有可能出现多线程修改的问题。即便如此,ConcurrentHashMap首先会用无锁尝试3次,如果统计失败,再加锁统计。代码如下
估计的大概数量,不是精确数量。
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
// 首先尝试3次无锁的统计,如果失败,再进入加锁统计
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {// 加锁统计
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
// 尝试3次无锁统计,这里面通过统计前后的modCount值的和 变化,这个值在每个Segment中,每一次变更操作都会递增,类似于Segment的版本号
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
5。动态扩容 transfer??
> rehash也就是扩容操作,扩容之后的容量是之前的两倍,所以扩容之后的newCapacity也是2^n的一个值
private void rehash(HashEntry<K,V> node) {
/*
* Reclassify nodes in each list to new table. Because we
* are using power-of-two expansion, the elements from
* each bin must either stay at same index, or move with a
* power of two offset. We eliminate unnecessary node
* creation by catching cases where old nodes can be
* reused because their next fields won't change.
* Statistically, at the default threshold, only about
* one-sixth of them need cloning when a table
* doubles. The nodes they replace will be garbage
* collectable as soon as they are no longer referenced by
* any reader thread that may be in the midst of
* concurrently traversing table. Entry accesses use plain
* array indexing because they are followed by volatile
* table write.
*/
/*
* 将table中每个节点重新分配到新的table中去。因为使用的是 *2的方式扩容,
* 每个元素在table中的索引要么为i(不变),要么是i+oldCapacity。
* 如:扩容前容量是16,当前HashEntry在table[]中的索引为3,则新的索引可能为3或者19。
* 在节点拷贝的过程中,有一些节点的next节点是不用调整的,就直接利用了。
* 据统计,在默认的threshold值时, 扩容只需要1/6的节点需要拷贝。
* 那些被替换掉的节点,在没有任何线程引用的时候,将会被GC回收。
* Entry accesses use plain array indexing because they are followed by volatile table write.
*/
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
// 若链表为单节点
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot // 重复利用一些扩容后,next不变的节点,这些节点在原先链表的尾部
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes // 对于next变化的节点重新计算hash(链表前面部分节点),然后重新插入
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node // 将需要put的新node插
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
-- 首先计算出newCapacity的容量;
-- 然后循环table[],重新分配每条链表上面的元素。因为使用的是 *2的方式扩容,每个元素在table中的索引要么为i(不变),要么是i+oldCapacity。如:扩容前容量是16,当前HashEntry在table[]中的索引为3,则新的索引可能为3或者19。
-- 拷贝过程中,如果为单链表则直接赋值;在节点拷贝的过程中,有一些节点的next节点是不用调整的(链表后端部分片段),就直接利用了;对于前端部分的片段,则重新hash,然后插入到对应的链表中。
-- 最后再将需要put进来的node,在扩容后的结构中插入。
6.弱一致性问题
8.ConcurrentHashMap jdk1.8
8.1与1.7相比的重大变化
- 取消了segment数组,直接用table保存数据,锁的粒度更小,减少并发冲突的概率。
- 存储数据时采用了链表+红黑树的形式,纯链表的形式时间复杂度为O(n),红黑树则为O(logn),性能提升很大。什么时候链表转红黑树?当key值相等的元素形成的链表中元素个数超过8个的时候。
8.2 主要数据结构和关键变量
Node类存放实际的key和value值。
8.3sizeCtl:
负数:表示进行初始化或者扩容,-1表示正在初始化,-N,表示有N-1个线程正在进行扩容
正数:0 表示还没有被初始化,>0的数,初始化或者是下一次进行扩容的阈值
TreeNode 用在红黑树,表示树的节点, TreeBin是实际放在table数组中的,代表了这个红黑树的根。
8.4 源码
8.4.1 初始化做了什么事?
只是给成员变量赋值,put时进行实际数组的填充
8.4.2 在get和put操作中,是如何快速定位元素放在哪个位置的?
get()方法
put()方法
数组的实际初始化
8.4.3 扩容操作
transfer()方法进行实际的扩容操作,table大小也是翻倍的形式,有一个并发扩容的机制。
以上是关于并发编程—6ConcurrentHashMap1.7 & 1.8的主要内容,如果未能解决你的问题,请参考以下文章