源码解析ConcurrentHashMap
Posted 空方块
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码解析ConcurrentHashMap相关的知识,希望对你有一定的参考价值。
本文基于JDK1.7
ConcurrentHashMap包含(segmentMask+1)个Segment,而每个Segment包含若干个HashEntry,各Segment写操作互不影响,实现了分段锁,提供了比Hashtable更好的并发性。
1.构造函数
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel)
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)//@part 1
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments,@part 2
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel)
++sshift;
ssize <<= 1;
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)//@part 3
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0],@part 4
Segment s0 =
new Segment(loadFactor, (int)(cap * loadFactor),
(HashEntry[])new HashEntry[cap]);
Segment[] ss = (Segment[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
Params:
initialCapacity:整个Concurrent初始化的初步容量,并不代表传进去多少,就初始化多少空间,@part 3进行第一步的纠正
static final int MAXIMUM_CAPACITY = 1 << 30;
奇怪的是1<<30而不是(Integer.MAX_VALUE=0x7fffffff),因为创建HashEntry的长度为cap,计算Map的长度public int size()返回的int,
因此有0x7fffffff > 1 << 30 >= cap * ssize >= initialCapacity,而ssize=2^n,Max(cap * ssize)=1 << 30 ,保证了不溢出。初始化的Map长度都是为(int)ceil(log2 initialCapacity),默认DEFAULT_INITIAL_CAPACITY=16。
loadFactor:Segment使用于重新初始化HashEntry的长度。默认DEFAULT_LOAD_FACTOR=0.75f。
concurrencyLevel:用于控制Segment[]的长度。
ssize是Segment[]的长度,ssize=2^n >= concurrencyLevel,默认DEFAULT_CONCURRENCY_LEVEL=16。
Note:
@part 1:
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
MAX_SEGMENTS是略显保守的值,作为Segment[]的最大长度,@Doug Lea应该有作出相应的估算测试的。
@part 2:计算segmentShift、segmentMask,用于定位hash定位到位置,常量,final修饰了,详细后面叙述。
@part 3:计算每个Segment需要初始化多大的HashEntry[cap],同时保证整个Map的size不超过int的上限。
@part 4:初始化Segment[],并通过UNSAFE来存放Segment[0]的Segment,使用putOrderedObject()为了性能,因为初始化肯定只有一条线程在处理,不需要考虑到其他线程。要使其他线程可见,可以使用putObjectVolatile(),但是这里没必要。Segment[]初始化之后,就不能改变了,因为使用final修饰,能够重新resize的是Segment里面的HashEntry[]。
2.Method get(Object key)
public V get(Object key)
Segment s; // manually integrate access methods to reduce overhead
HashEntry[] tab;
int h = hash(key);//@part1
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;//@part2
if ((s = (Segment)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null)
for (HashEntry e = (HashEntry) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) //@part3
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
return null;
@part 1:使用Wang/Jenkins 散列算法,抵御key散列码在较低或较高的碰撞。
private int hash(Object k)
int h = hashSeed;
if ((0 != h) && (k instanceof String))
return sun.misc.Hashing.stringHash32((String) k);
h ^= k.hashCode();
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
@part 2:用key来确定Hashtable在哪个Segment
(h >>> segmentShift) & segmentMask
取高(ssize-1)位,并作掩码。
Class sc = Segment[].class;
SBASE = UNSAFE.arrayBaseOffset(sc);
ss = UNSAFE.arrayIndexScale(sc);
SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
SSHIFT得出:log2 index,即(1 << SSHIFT)一个Segment在Segment[]的偏移地址。
SBASE得出:Segment[]数组换算因子。
((h >>> segmentShift) & segmentMask) << SSHIFT+ SBASE得出了该Segment在Segment[]的偏移地址
@part 3:定位HashEntry,并返回。
Class tc = HashEntry[].class;
TBASE = UNSAFE.arrayBaseOffset(tc);
ts = UNSAFE.arrayIndexScale(tc);
TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
HashEntry e = (HashEntry) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE)
由偏移地址定位HashEntry,再通过e = e.next来逐一比对,返回结果。HashEntry在这里是一个链表,而不是单一一个HashEntry,HashEntry[]在在这里相当于二维。
Note:get方法在这里并没有使用到加锁,是为了提供并发性,但是存在弱一致性,下面解析。
2.Method put(K key, V value)
public V put(K key, V value)
Segment s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);//@part 1
return s.put(key, hash, value, false);//@part 2
@part 1:如果该Segment还不存在,就创建,初始化的时候只初始化了Segment[0]。
private Segment ensureSegment(int k)
final Segment[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment seg;
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null)
Segment proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry[] tab = (HashEntry[])new HashEntry[cap];
if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
== null) // recheck
Segment s = new Segment(lf, threshold, tab);
while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u))
== null)
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))//@Note
break;
return seg;
Note:使用Segment[0]作为模板,来初始化该Segment。这里并没有做加锁,而是使用了CAS。
@part 2:put操作。
final V put(K key, int hash, V value, boolean onlyIfAbsent)
HashEntry node = tryLock() ? null :
scanAndLockForPut(key, hash, value);//@NOTE2.1
V oldValue;
try
HashEntry[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry first = entryAt(tab, index);
for (HashEntry e = first;;)
if (e != null)
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k)))
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
++modCount;
break;
e = e.next;
else
if (node != null)
node.setNext(first);
else
node = new HashEntry(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);//@NOTE2.2
else
setEntryAt(tab, index, node);//@NOTE2.3
++modCount;
count = c;
oldValue = null;
break;
finally
unlock();
return oldValue;
@NOTE2.1:循环尝试加锁,尝试加锁次数有上限,上限如下,达到该值,使用lock()。可见当可用线程大于1时,采用64次循环,因为可能存在多条线程并发争夺资源,单条线程就没必要循环那么多次去循环获取锁,因为只是单条线程切换上下文处理而已,直接使用lock即可。
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
@NOTE2.2:HashEntry[] table达到了阀值,需要重新构造一个新的,直接覆盖原引用,方法get里面的@part3使用到了table.length,这里如果在get执行到@part3这一行的时候,该table发生put,引起table的rehash,table的引用发生了变化,put进去的元素也是该次get所不可见的,get方法使用的HashEntry[]的旧引用,拿到的只可能是旧的元素。这是get方法的弱一致性的原因之一。
@NOTE2.3:把新加入的元素放进table里面,该元素成为table该角标下的第一个元素,旧的元素成为新的元素的next。这里使用的是UNSAFE.putOrderedObject,放入的元素,不是立刻可见的。这也是get方法的弱一致性的原因之一。
3.Method boolean remove(Object key, Object value)
public boolean remove(Object key, Object value)
int hash = hash(key);
Segment s;
return value != null && (s = segmentForHash(hash)) != null &&//@part1
s.remove(key, hash, value) != null;//@part2
@part 1:通过掩码定位Segment。
@part 2:
final V remove(Object key, int hash, Object value)
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try
HashEntry[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry e = entryAt(tab, index);
HashEntry pred = null;
while (e != null)
K k;
HashEntry next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k)))
V v = e.value;
if (value == null || value == v || value.equals(v))
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
break;
pred = e;
e = next;
finally
unlock();
return oldValue;
加锁,将该节点的前引用和后引用都去掉,就去掉了该节点了。
4.Method public int size()
public int size()
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try
for (;;)
if (retries++ == RETRIES_BEFORE_LOCK) //@part 1
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j)
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null)
sum += seg.modCount;//@part 2
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
if (sum == last)//@part 3
break;
last = sum;
finally
if (retries > RETRIES_BEFORE_LOCK)
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
return overflow ? Integer.MAX_VALUE : size;
Params:
RETRIES_BEFORE_LOCK:自旋尝试次数。
static final int RETRIES_BEFORE_LOCK = 2;
@part 1:自旋次数大于RETRIES_BEFORE_LOCK,就进行加锁,进行计算CHM的大小。避免在高频率插入或删除的环境下,死循环。
@part 2:Segment的修改版本号seg.modCount和Segment的元素个数seg.count都是transient int 修饰的,修改这俩个值的时候,都是在加锁的状态下修改的。modCount只会自增,不会自减。
@part 3:自旋俩次,如果本次的结果跟上次的结果sum(修改版本号)一致,则跳出循环,返回大小。
NOTE:在非高频率插入或删除的环境,size()一般使用不加锁就可以得出结果。这是不错的实现!
5.summarize
1.在无锁的状态下,实现多线程可见性。
1.1.使用Unsafe方法。C++实现。
1.2.自旋使用volatile版本号比对。不使用volatile修饰,可以在性能上能提升,但是准确性并不能保证。
1.3.加锁。最暴力的方法。
2.CHM的存储数据上限能超过int的上限。
3.CHM的get存在弱一致性,注意使用环境。
4.CHM在修改数据时,会锁住当前数据所在的Segment,然后才对数据进行修改,修改版本号递增。
以上是关于源码解析ConcurrentHashMap的主要内容,如果未能解决你的问题,请参考以下文章