ConcurrentHashMap 源码浅析 1.7
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ConcurrentHashMap 源码浅析 1.7相关的知识,希望对你有一定的参考价值。
- 简介
(1) 背景
HashMap死循环:HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry.
HashTable效率低下:HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下.因为当一个线程访问HashTable的同步方法,其它线程也访问HashTable的同步方法时,会进入阻塞或轮询状态.如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法获取元素,所以竞争越激烈效率越低.
(2) 简介
HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么多线程访问容器里不同的数据段时,线程间不会存在竞争,从而可以有效提高并发访问效率,这就是ConcurrentHash所使用的锁分段技术.首先将数据分成一段一段地储存,然后给每一段配一把锁,当一个线程占用锁访问其中一段数据时,其它段的数据也能被其它线程访问. -
结构
ConcurrentHash是由Segments数组结构和HashEntry数组结构组成.Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的色;HashEntry则用于存储键值对数据.一个ConcurrentHashMap里包含一个Segment组.Segment的结构和HashMap类似,是一种数组加链表的结构.一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护者一个HashEntry数组里面的元素,当对HashEntry数组的数据进行修改时,必须先获得与它对应的Segment锁,如下图所示.
-
基本成员
default_initial_capacitymap默认容量,必须是2的冥/** * 默认的初始容量 16 */ static final int DEFAULT_INITIAL_CAPACITY = 16;
default_load_factor默认负载因子(存储的比例)
/** * 默认的负载因子 */ static final float DEFAULT_LOAD_FACTOR = 0.75f;
default_concurrency_level默认并发数量,segments数组量
/** * 默认的并发数量,会影响segments数组的长度 */ static final int DEFAULT_CONCURRENCY_LEVEL = 16;
maximum_capacitymap最大容量
/** * 最大容量,构造ConcurrentHashMap时指定的值超过,就用该值替换 * ConcurrentHashMap大小必须是2^n,且小于等于2^30 */ static final int MAXIMUM_CAPACITY = 1 << 30;
min_segment_table_capacityHashEntry[]默认容量
/** * 每个segment中table数组的长度,必须是2^n,至少为2 */ static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
max_segments最大并发数,segments数组最大量
/** * 允许最大segment数量,用于限定concurrencyLevel的边界,必须是2^n */ static final int MAX_SEGMENTS = 1 << 16;
retries_before_lock重试次数,在加锁之前
/** * 非锁定情况下调用size和contains方法的重试次数,避免由于table连续被修改导致无限重试 */ static final int RETRIES_BEFORE_LOCK = 2;
segmentMask计算segment位置的掩码(segments.length-1)
/** * 用于segment的掩码值,用于与hash的高位进行取& */ final int segmentMask;
segmentShift
/** * 用于算segment位置时,hash参与运算的位数 */ final int segmentShift;
segmentssegment数组
/** * segments数组 */ final Segment<K,V>[] segments;
HashEntry存储数据的链式结构
static final class HashEntry<K,V> { // hash值 final int hash; // key final K key; // 保证内存可见性,每次从内存中获取 volatile V value; volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } /** * 使用volatile语义写入next,保证可见性 */ final void setNext(HashEntry<K,V> n) { UNSAFE.putOrderedObject(this, nextOffset, n); }
Segment继承ReentrantLock锁,用于存放HashEntry[]
static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; /** * 对segment加锁时,在阻塞之前自旋的次数 * */ static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; /** * 每个segment的HashEntry table数组,访问数组元素可以通过entryAt/setEntryAt提供的volatile语义来完成 * volatile保证可见性 */ transient volatile HashEntry<K,V>[] table; /** * 元素的数量,只能在锁中或者其他保证volatile可见性之间进行访问 */ transient int count; /** * 当前segment中可变操作发生的次数,put,remove等,可能会溢出32位 * 它为chm isEmpty() 和size()方法中的稳定性检查提供了足够的准确性. * 只能在锁中或其他volatile读保证可见性之间进行访问 */ transient int modCount; /** * 当table大小超过阈值时,对table进行扩容,值为(int)(capacity *loadFactor) */ transient int threshold; /** * 负载因子 */ final float loadFactor; /** * 构造方法 */ Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; }
- 构造方法
有参构造/** * ConcurrentHashMap 构造方法 * @param initialCapacity 初始化容量 * @param loadFactor 负载因子 * @param concurrencyLevel 并发segment,segments数组的长度 */ public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); // 大于最大segments容量,取最大容量 if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments // 2^sshift = ssize 例如:sshift = 4,ssize = 16 // 根据concurrencyLevel计算出ssize为segments数组的长度 int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { // 第一次 满足 ++sshift; // 第一次 1 ssize <<= 1; // 第一次 ssize = ssize << 1 (1 * 2^1) } // segmentShift和segmentMask的定义 this.segmentShift = 32 - sshift; // 用于计算hash参与运算位数 this.segmentMask = ssize - 1; // segments位置范围 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; // 计算每个segment中table的容量 int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; // HashEntry[]默认 容量 int cap = MIN_SEGMENT_TABLE_CAPACITY; // 确保cap是2^n while (cap < c) cap <<= 1; // create segments and segments[0] // 创建segments并初始化第一个segment数组,其余的segment延迟初始化 Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
无参构造使用默认参数
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
} - 基本方法
一些UNSAFE方法
HashEntry
setNext/** * 使用volatile语义写入next,保证可见性 */ final void setNext(HashEntry<K,V> n) { UNSAFE.putOrderedObject(this, nextOffset, n); }
entryAt get HashEntry
/** * 获取给定table的第i个元素,使用volatile读语义 */ static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) { return (tab == null) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)i << TSHIFT) + TBASE); }
setEntryAt set HashEntry
/** * 设置给定的table的第i个元素,使用volatile写语义 */ static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i, HashEntry<K,V> e) { UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e); }
put 插入元素
- 总结
以上是关于ConcurrentHashMap 源码浅析 1.7的主要内容,如果未能解决你的问题,请参考以下文章
java并发:jdk1.8中ConcurrentHashMap源码浅析