HashMap & ConcurrentHashMap
Posted by carey_OoO
Driving HashMap into an infinite loop
Under concurrent writes, a HashMap's bucket list can become cyclic, after which both get and put risk spinning forever. The demo below reproduces the scenario: several threads put concurrently while another thread keeps calling get.
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;

public class HashMapDead {
    private static Logger logger = Logger.getLogger("hashMapDead");
    private static long Cycle = 100000;

    public static void main(String[] args) throws InterruptedException {
        Map<String, String> map = new HashMap<String, String>();
        CountDownLatch latch = new CountDownLatch(1);
        int i = 0;
        // three writer threads, each filling a disjoint key range
        for (; i < 3; i++) {
            new Thread(new Write(map, i * Cycle, latch)).start();
        }
        // one reader thread querying random keys in a tight loop
        new Thread(new Read(map, i * Cycle, latch)).start();
        latch.countDown(); // release all threads at the same instant
        System.out.println(" main end");
    }

    private static class Write implements Runnable {
        private Map<String, String> map;
        private long startIdx;
        private CountDownLatch latch;

        public Write(Map<String, String> map, long start, CountDownLatch latch) {
            this.map = map;
            this.startIdx = start;
            this.latch = latch;
        }

        /**
         * @see java.lang.Runnable#run()
         */
        @Override
        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                logger.warning(e.toString());
            }
            System.out.println(Thread.currentThread().getName() + " recover");
            long end = startIdx + Cycle;
            // concurrent puts: several threads may trigger resize() at once,
            // which can link a bucket list into a cycle
            for (; startIdx < end; startIdx++) {
                String tmp = String.valueOf(startIdx);
                map.put(tmp, tmp);
            }
            System.out.println(Thread.currentThread().getName() + " over");
        }
    }

    private static class Read implements Runnable {
        private Map<String, String> map;
        private long size;
        private CountDownLatch latch;

        public Read(Map<String, String> map, long end, CountDownLatch latch) {
            this.map = map;
            this.size = end;
            this.latch = latch;
        }

        /**
         * @see java.lang.Runnable#run()
         */
        @Override
        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                logger.warning(e.toString());
            }
            System.out.println(Thread.currentThread().getName() + " recover");
            // a get that walks a cyclic bucket list never returns
            while (true) {
                Random random = new Random();
                long l = random.nextLong() % size - 1;
                String result = map.get(String.valueOf(l));
                System.out.println(System.currentTimeMillis() + ": " + result);
            }
        }
    }
}
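The straightforward fix is simply not to share a plain HashMap across threads. A minimal sketch of the standard alternatives (not part of the original demo; rerunning HashMapDead with either map below makes the infinite loop disappear):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SafeMaps {
    public static void main(String[] args) {
        // lock-striped map: safe for concurrent get/put, no global lock
        Map<String, String> concurrent = new ConcurrentHashMap<String, String>();
        // decorator that synchronizes every call on a single mutex
        Map<String, String> synced =
            Collections.synchronizedMap(new HashMap<String, String>());
        concurrent.put("k", "v");
        synced.put("k", "v");
    }
}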
ConcurrentHashMap
- Internal structure (based on JDK 1.7)
final Segment<K,V>[] segments;
The map is partitioned into a Segment[] array; each Segment behaves like a small HashMap. The segments array cannot grow and defaults to 16 entries. Inside each Segment, the HashEntry[] table can grow, much like HashMap's resize, but a Segment's rehash is safe and can never produce an infinite loop.
ConcurrentHashMap stores data into a HashEntry by hashing twice: the first hash locates the Segment, the second locates the HashEntry slot inside that Segment. The hash function largely determines ConcurrentHashMap's efficiency; the goal is to spread keys evenly across both the segments and the HashEntry buckets.
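A rough sketch of the two-level indexing; the segmentShift/segmentMask names mirror the JDK 1.7 fields, and the concrete values assume the default 16 segments and a 16-slot table:

public class TwoLevelHash {
    public static void main(String[] args) {
        int h = "someKey".hashCode(); // stand-in for CHM's internal hash spreading
        int segmentShift = 28;        // 32 - log2(16 segments)
        int segmentMask = 15;         // 16 - 1
        int tableLength = 16;         // HashEntry[] length inside the segment
        int segmentIndex = (h >>> segmentShift) & segmentMask; // high bits -> Segment
        int bucketIndex = (tableLength - 1) & h;               // low bits -> bucket
        System.out.println("segment=" + segmentIndex + ", bucket=" + bucketIndex);
    }
}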
put: writes take a lock
* first locate the target Segment,
* then call that Segment's put (this is where locking happens; each Segment locks independently)
Segment extends ReentrantLock, so every modification of a Segment runs under that Segment's own lock, as sketched below.
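The declaration in the JDK 1.7 source looks roughly like this (abridged; the fields come up again at the end of this post):

// Abridged from the JDK 1.7 source: each Segment is itself a ReentrantLock,
// so lock()/tryLock()/unlock() in the methods below are calls on the Segment.
static final class Segment<K,V> extends ReentrantLock implements Serializable {
    transient volatile HashEntry<K,V>[] table; // volatile: lock-free readers see writes
    transient int count;                       // number of entries in this segment
    transient int modCount;                    // structural modification counter
    transient int threshold;                   // resize trigger: capacity * loadFactor
    final float loadFactor;
    // ... put/remove/rehash shown below
}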
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // locate the HashEntry bucket
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        // insert the key-value pair; this method is extracted here for
        // readability, its body is shown further below
        putKeyValue();
    } finally {
        unlock();
    }
    return oldValue;
}
scanAndLockForPut(key, hash, value) acquires the lock and, if the key is not yet present, speculatively creates a HashEntry for later use. It only runs when the initial tryLock fails; presumably the point is to do useful work while waiting for the lock, pre-creating the entry that does not exist yet.
The scanAndLockForPut code:
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) { // first scan: once we hit the same key or finish
                           // the list, set retries to 0
            if (e == null) { // reached the end of the bucket list
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) { // give up spinning: block
                                                 // and yield the CPU
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            // the bucket head changed (e.g. a resize happened): rescan the list.
            // HashEntry[] reads are volatile, which is what makes this check valid
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}
By the time this putKeyValue body runs, the thread already holds the Segment lock:
for (HashEntry<K,V> e = first;;) {
    if (e != null) {
        K k;
        // found an entry with the same key
        if ((k = e.key) == key ||
            (e.hash == hash && key.equals(k))) {
            oldValue = e.value;
            // support for putIfAbsent
            if (!onlyIfAbsent) {
                e.value = value;
                ++modCount;
            }
            break;
        }
        e = e.next;
    }
    // traversed the whole list without finding the key
    else {
        // node may already have been created in scanAndLockForPut
        if (node != null)
            node.setNext(first); // prepend to the list; linked lists make cheap stacks
        else
            node = new HashEntry<K,V>(hash, key, value, first);
        int c = count + 1;
        // if this segment's count exceeds the threshold and the HashEntry table
        // has not hit the maximum capacity, rehash; note the new node is not
        // yet reachable from the old HashEntry[]
        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
            rehash(node);
        else
            setEntryAt(tab, index, node); // the head of bucket index changed,
                                          // so it must be written back
        ++modCount;
        count = c;
        oldValue = null;
        break;
    }
}
The rehash(node) method doubles the segment's table and inserts node into the new table.
rehash runs under the lock (inside put), so it is safe; and it never mutates the structure of the old table, which is why it cannot create the infinite loop that HashMap's resize can.
/**
 * Doubles size of table and repacks entries, also adding the
 * given node to new table
 */
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
    /*
     * Reclassify nodes in each list to new table. Because we
     * are using power-of-two expansion, the elements from
     * each bin must either stay at same index, or move with a
     * power of two offset. We eliminate unnecessary node
     * creation by catching cases where old nodes can be
     * reused because their next fields won't change.
     * Statistically, at the default threshold, only about
     * one-sixth of them need cloning when a table
     * doubles. The nodes they replace will be garbage
     * collectable as soon as they are no longer referenced by
     * any reader thread that may be in the midst of
     * concurrently traversing table. Entry accesses use plain
     * array indexing because they are followed by volatile
     * table write.
     */
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    int newCapacity = oldCapacity << 1; // double the capacity
    threshold = (int)(newCapacity * loadFactor); // loadFactor decides how full
                                                 // the table may get before the
                                                 // next rehash
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;
            if (next == null) // Single node on list
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                // find the trailing run of nodes that all map to the same slot
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes: re-add the rest of the list to the new
                // table (only two target buckets are possible, since the
                // capacity exactly doubled)
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // the old table is fully rehashed; add the new node at the head of its bucket
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}
remove
ConcurrentHashMap first locates the Segment, then delegates to segment.remove (lock-protected). That locates the HashEntry bucket inside the Segment, and the rest is plain linked-list surgery.
Note that count and modCount are only ever modified under the lock, but reads never take it.
/**
 * Remove; match on key only if value null, else match both.
 */
final V remove(Object key, int hash, Object value) {
    if (!tryLock())
        scanAndLock(key, hash); // arguably scanAndLock adds little here;
                                // a plain lock() would seem to suffice
    V oldValue = null;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> e = entryAt(tab, index); // head of the target bucket
        HashEntry<K,V> pred = null;
        while (e != null) {
            K k;
            HashEntry<K,V> next = e.next;
            if ((k = e.key) == key ||
                (e.hash == hash && key.equals(k))) {
                V v = e.value;
                if (value == null || value == v || value.equals(v)) {
                    if (pred == null) // removing the head: promote the second
                                      // element to bucket head
                        setEntryAt(tab, index, next);
                    else
                        pred.setNext(next);
                    ++modCount;
                    --count;
                    oldValue = v;
                }
                break;
            }
            pred = e;
            e = next;
        }
    } finally {
        unlock();
    }
    return oldValue;
}
get: lock-free reads, weak consistency
get first locates the Segment, then the HashEntry bucket inside it, and walks the list. Because reads take no lock, there is no guarantee that after thread A calls put(k1, v1), a later get(k1) from thread B returns v1. That is a deliberate design decision: the locking lives inside the individual segments, and locking the whole ConcurrentHashMap would just turn it back into a Hashtable. If you need strong consistency, you must use Hashtable or the Collections.synchronizedMap decorator instead.
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
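A tiny illustration of the weak consistency described above (a demonstration sketch, not JDK code; the outcome depends on thread timing):

import java.util.concurrent.ConcurrentHashMap;

public class WeakConsistencyDemo {
    public static void main(String[] args) throws InterruptedException {
        final ConcurrentHashMap<String, String> map =
            new ConcurrentHashMap<String, String>();
        Thread writer = new Thread(new Runnable() {
            public void run() { map.put("k1", "v1"); }
        });
        writer.start();
        // The read takes no lock: depending on timing it may print "v1" or null.
        // The only guarantee is that it never observes a corrupted structure.
        System.out.println(map.get("k1"));
        writer.join();
    }
}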
size
size first tries to compute the map's size without locking, by adding up the count of every segment. On each pass it also sums the segments' modCounts; if two consecutive passes yield the same modCount sum, the map is assumed unmodified during that window and the size is returned. After three failed unlocked attempts, it locks all segments and sums the counts under the locks.
/**
 * Returns the number of key-value mappings in this map. If the
 * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
 * <tt>Integer.MAX_VALUE</tt>.
 *
 * @return the number of key-value mappings in this map
 */
public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}
containsXxx
1. containsKey
Works just like get(): locate the Segment, locate the HashEntry bucket, then walk the list.
2. containsValue
Works like size(): looking up a value requires scanning the whole map.
The containsKey implementation, structurally identical to get:
public boolean containsKey(Object key) {
    Segment<K,V> s; // same as get() except no need for volatile value read
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return true;
        }
    }
    return false;
}
containsValue first scans the whole map without locks, possibly several times; if two consecutive passes see the same total modCount, the map is deemed unmodified and the scan result stands. Otherwise it locks every segment and scans again. Here too the design stays lock-free whenever it can.
public boolean containsValue(Object value) {
    // Same idea as size()
    if (value == null)
        throw new NullPointerException();
    final Segment<K,V>[] segments = this.segments;
    boolean found = false;
    long last = 0;
    int retries = -1;
    try {
        outer: for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            long hashSum = 0L;
            int sum = 0;
            for (int j = 0; j < segments.length; ++j) {
                HashEntry<K,V>[] tab;
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null && (tab = seg.table) != null) {
                    for (int i = 0 ; i < tab.length; i++) {
                        HashEntry<K,V> e;
                        for (e = entryAt(tab, i); e != null; e = e.next) {
                            V v = e.value;
                            if (v != null && value.equals(v)) {
                                found = true;
                                break outer;
                            }
                        }
                    }
                    sum += seg.modCount;
                }
            }
            if (retries > 0 && sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return found;
}
Why HashEntry's rehash is safe while HashMap's resize can loop
HashMap's resize code follows below.
HashMap decides to resize when the current size exceeds table.length * loadFactor; the default loadFactor is 0.75, which means HashMap is rehashed before the table is actually full. The reason is a time/space trade-off: keeping the table sparsely filled keeps the collision chains short, so lookups stay cheap. The Segment design in ConcurrentHashMap works the same way.
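A quick worked example of the trigger; the numbers assume HashMap's defaults, and the snippet is illustrative, not HashMap source:

public class ResizeTrigger {
    public static void main(String[] args) {
        // HashMap's defaults: initial capacity 16, loadFactor 0.75f
        int capacity = 16;
        float loadFactor = 0.75f;
        int threshold = (int) (capacity * loadFactor); // 12
        // the table doubles to 32 when the 13th entry is added,
        // even though 4 slots may still be empty
        System.out.println("resize once size exceeds " + threshold);
    }
}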
The key resize code:
void resize(int newCapacity) {
    Entry[] oldTable = table;
    int oldCapacity = oldTable.length;
    if (oldCapacity == MAXIMUM_CAPACITY) {
        threshold = Integer.MAX_VALUE;
        return;
    }
    Entry[] newTable = new Entry[newCapacity];
    transfer(newTable, initHashSeedAsNeeded(newCapacity));
    table = newTable;
    threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1);
}

void transfer(Entry[] newTable, boolean rehash) {
    int newCapacity = newTable.length;
    for (Entry<K,V> e : table) {
        while(null != e) {
            Entry<K,V> next = e.next;
            if (rehash) {
                e.hash = null == e.key ? 0 : hash(e.key);
            }
            int i = indexFor(e.hash, newCapacity);
            e.next = newTable[i]; // this mutates the old nodes in place; if two
                                  // threads interleave here, the list can become
                                  // cyclic and every later traversal spins forever
            newTable[i] = e;
            e = next;
        }
    }
}
HashMap's Entry.next is not volatile, so there is no visibility guarantee. Making next volatile would prevent the infinite loop, and allocating new nodes during transfer (instead of relinking the old ones) would also avoid the cycle. But HashMap is designed for single-threaded use, and volatile is not free: in a test on my own machine, a volatile read/write was about 45 times slower than a plain one. The table size is kept a power of two so that the modulo can be replaced by a bitwise AND, which is faster: 2^n - 1 is exactly n one-bits.
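A small sketch of the mask trick (illustrative only): for any power-of-two length and non-negative h, h % length equals h & (length - 1).

public class PowerOfTwoMask {
    public static void main(String[] args) {
        int length = 16;       // power of two: 10000 in binary
        int mask = length - 1; // 15: 01111 in binary, i.e. n one-bits
        for (int h = 0; h < 40; h += 7) {
            // the bitwise AND keeps the low n bits, matching the modulo
            assert (h % length) == (h & mask);
            System.out.println(h + " -> " + (h & mask));
        }
    }
}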
public V put(K key, V value) {
    if (table == EMPTY_TABLE) {
        inflateTable(threshold);
    }
    if (key == null)
        return putForNullKey(value);
    int hash = hash(key);
    int i = indexFor(hash, table.length);
    // check whether the key already exists; this traversal is why a concurrent
    // put can also spin forever once resize has produced a cyclic list
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }
    modCount++;
    addEntry(hash, key, value, i);
    return null;
}
So HashMap's put can also loop forever, because it first checks whether the key exists by walking the list at table[i].
In ConcurrentHashMap's segment rehash, every node that cannot be reused is freshly allocated with new, so the bucket order of the old table is never disturbed and no cycle can form.
Iterator implementation: weak consistency
A base iterator is provided, which the key, value, and entry iterators all build on. The basic idea is to walk all segments and, within each segment, all HashEntry buckets, both in reverse order (the i-- style), moving to the previous one whenever a bucket or segment is exhausted. Because none of this is synchronized, an iteration may see fewer entries than really exist (when a rehash grows a table mid-scan); iterating with i++ instead could show the same key-value pair twice, again during a rehash.
abstract class HashIterator {
    int nextSegmentIndex;
    int nextTableIndex;
    HashEntry<K,V>[] currentTable;
    HashEntry<K, V> nextEntry;
    HashEntry<K, V> lastReturned;

    HashIterator() {
        nextSegmentIndex = segments.length - 1;
        nextTableIndex = -1;
        advance();
    }

    /**
     * Set nextEntry to first node of next non-empty table
     * (in backwards order, to simplify checks).
     */
    final void advance() {
        for (;;) {
            if (nextTableIndex >= 0) {
                if ((nextEntry = entryAt(currentTable,
                                         nextTableIndex--)) != null)
                    break;
            }
            else if (nextSegmentIndex >= 0) {
                Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--);
                if (seg != null && (currentTable = seg.table) != null)
                    nextTableIndex = currentTable.length - 1;
            }
            else
                break;
        }
    }

    final HashEntry<K,V> nextEntry() {
        HashEntry<K,V> e = nextEntry;
        if (e == null)
            throw new NoSuchElementException();
        lastReturned = e; // cannot assign until after null check
        if ((nextEntry = e.next) == null)
            advance();
        return e;
    }

    public final boolean hasNext() { return nextEntry != null; }
    public final boolean hasMoreElements() { return nextEntry != null; }

    public final void remove() {
        if (lastReturned == null)
            throw new IllegalStateException();
        ConcurrentHashMap.this.remove(lastReturned.key);
        lastReturned = null;
    }
}
KeyIterator, ValueIterator, and EntryIterator all extend this base class; each just implements next() by delegating to the parent's nextEntry() and picking out the field it needs. Inner classes can read the enclosing instance's fields, which is exactly what these iterators rely on; they are declared final so they cannot be subclassed. A sketch of one of them follows.
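The subclasses look roughly like this (abridged from the JDK 1.7 source):

// each concrete iterator only adds next()/nextElement() on top of HashIterator
final class KeyIterator
    extends HashIterator
    implements Iterator<K>, Enumeration<K>
{
    public final K next()        { return super.nextEntry().key; }
    public final K nextElement() { return super.nextEntry().key; }
}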
HashMap's iterator, by contrast, can be thought of as strongly consistent: if the map is structurally modified during iteration, a ConcurrentModificationException is thrown. Concretely, the base iterator records the map's modCount at construction as expectedModCount, and every call that fetches the next value checks that the current modCount still matches it.
final Entry<K,V> nextEntry() {
    if (modCount != expectedModCount)
        throw new ConcurrentModificationException();
    Entry<K,V> e = next;
    if (e == null)
        throw new NoSuchElementException();
    if ((next = e.next) == null) { // this bucket's list is exhausted:
                                   // move on to the next non-empty bucket
        Entry[] t = table;
        while (index < t.length && (next = t[index++]) == null)
            ;
    }
    current = e;
    return e;
}
sun.misc.Unsafe
Many of the operations above go through the Unsafe class; a fuller write-up is still pending, but the abridged initializer below shows the core idea.
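In the JDK 1.7 source, ConcurrentHashMap precomputes array offsets so that UNSAFE.getObjectVolatile can read individual array slots with volatile semantics; the static initializer looks roughly like this (abridged sketch, field names as in the source):

// the offsets let UNSAFE.getObjectVolatile(array, offset) read one array slot
private static final sun.misc.Unsafe UNSAFE;
private static final long SBASE; // byte offset of segments[0]
private static final int SSHIFT; // log2(bytes per Segment reference)
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class sc = Segment[].class;
        SBASE = UNSAFE.arrayBaseOffset(sc);             // where array data starts
        int ss = UNSAFE.arrayIndexScale(sc);            // size of one element
        SSHIFT = 31 - Integer.numberOfLeadingZeros(ss); // scale as a shift
    } catch (Exception e) {
        throw new Error(e);
    }
}
// so segment j lives at byte offset (j << SSHIFT) + SBASE, which is exactly
// the expression used in get() and containsKey() above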
Segment internal fields
volatile HashEntry<K,V>[] table; — declared volatile to guarantee visibility in a one-writer/many-readers pattern: every modification of table happens under the lock (which settles write-write races), and volatile makes each write immediately visible to reader threads.
int count; — this and modCount could arguably also be volatile. The lock-free size computation has a retry safeguard, but because count is not volatile the unlocked reads are not strictly guaranteed; once the retries fail, all segments are locked, the counts are read, and the locks released.
int modCount; — the iterators ignore modCount entirely: inserts, deletes, and updates take effect in a live iteration silently, unlike HashMap, which throws an exception.