多线程进阶学习12------ConcurrentHashMap详解

Posted 四维大脑

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程进阶学习12------ConcurrentHashMap详解相关的知识,希望对你有一定的参考价值。

JDK 7 HashMap 并发死链

注意:要在 JDK 7 下运行,否则扩容机制和 hash 的计算方法都变了

import java.io.*;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.StringTokenizer;
import java.util.concurrent.*;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

public class Test 


    public static void main(String[] args) 
        // 测试 java 7 中哪些数字的 hash 结果相等
        System.out.println("长度为16时,桶下标为1的key");
        for (int i = 0; i < 64; i++) 
            if (hash(i) % 16 == 1) 
                System.out.println(i);
            
        
        System.out.println("长度为32时,桶下标为1的key");
        for (int i = 0; i < 64; i++) 
            if (hash(i) % 32 == 1) 
                System.out.println(i);
            
        
        // 1, 35, 16, 50 当大小为16时,它们在一个桶内
        final HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();
        // 放 12 个元素
        map.put(2, null);
        map.put(3, null);
        map.put(4, null);
        map.put(5, null);
        map.put(6, null);
        map.put(7, null);
        map.put(8, null);
        map.put(9, null);
        map.put(10, null);
        map.put(16, null);
        map.put(35, null);
        map.put(1, null);
        System.out.println("扩容前大小[main]:" + map.size());
        new Thread() 
            @Override
            public void run() 
                // 放第 13 个元素, 发生扩容
                map.put(50, null);
                System.out.println("扩容后大小[Thread-0]:" + map.size());
            
        .start();
        new Thread() 
            @Override
            public void run() 
                // 放第 13 个元素, 发生扩容
                map.put(50, null);
                System.out.println("扩容后大小[Thread-1]:" + map.size());
            
        .start();
    

    final static int hash(Object k) 
        int h = 0;
        if (0 != h && k instanceof String) 
            return sun.misc.Hashing.stringHash32((String) k);
        
        h ^= k.hashCode();
        h ^= (h >>> 20) ^ (h >>> 12);
        return h ^ (h >>> 7) ^ (h >>> 4);
    


死链复现

调试工具使用 idea

在 HashMap 源码 590 行加断点

int newCapacity = newTable.length;

断点的条件如下,目的是让 HashMap 在扩容为 32 时,并且线程为 Thread-0 或 Thread-1 时停下来

newTable.length==32 &&
 (
 Thread.currentThread().getName().equals("Thread-0")||
 Thread.currentThread().getName().equals("Thread-1")
 )

断点暂停方式选择 Thread,否则在调试 Thread-0 时,Thread-1 无法恢复运行

运行代码,程序在预料的断点位置停了下来,输出

长度为16时,桶下标为1的key 
1 
16 
35 
50 
长度为32时,桶下标为1的key 
1 
35 
扩容前大小[main]:12

接下来进入扩容流程调试

在 HashMap 源码 594 行加断点

Entry<K,V> next = e.next; // 593
if (rehash) // 594
// ...

这是为了观察 e 节点和 next 节点的状态,Thread-0 单步执行到 594 行,再 594 处再添加一个断点(条件
Thread.currentThread().getName().equals("Thread-0"))

这时可以在 Variables 面板观察到 e 和 next 变量,使用 view as -> Object 查看节点状态

e (1)->(35)->(16)->null 
next (35)->(16)->null

在 Threads 面板选中 Thread-1 恢复运行,可以看到控制台输出新的内容如下,Thread-1 扩容已完成

newTable[1] (35)->(1)->null
扩容后大小:13

这时 Thread-0 还停在 594 处, Variables 面板变量的状态已经变化为

e (1)->null 
next (35)->(1)->null

为什么呢,因为 Thread-1 扩容时链表也是后加入的元素放入链表头,因此链表就倒过来了,但 Thread-1 虽然结果正确,但它结束后 Thread-0 还要继续运行

接下来就可以单步调试(F8)观察死链的产生了

下一轮循环到 594,将 e 搬迁到 newTable 链表头

newTable[1] (1)->null 
e (35)->(1)->null 
next (1)->null

下一轮循环到 594,将 e 搬迁到 newTable 链表头

newTable[1] (35)->(1)->null 
e (1)->null 
next null

再看看源码

e.next = newTable[1];
// 这时 e (1,35)
// 而 newTable[1] (35,1)->(1,35) 因为是同一个对象
newTable[1] = e; 
// 再尝试将 e 作为链表头, 死链已成
e = next;
// 虽然 next 是 null, 会进入下一个链表的复制, 但死链已经形成了

源码分析

HashMap 的并发死链发生在扩容时

void transfer(Entry[] newTable, boolean rehash)  
	 int newCapacity = newTable.length;
	 for (Entry<K,V> e : table) 
	 while(null != e) 
		 Entry<K,V> next = e.next;
		 // 1 处
		 if (rehash) 
		 	e.hash = null == e.key ? 0 : hash(e.key);
		 
		 int i = indexFor(e.hash, newCapacity);
		 // 2 处
		 // 将新元素加入 newTable[i], 原 newTable[i] 作为新元素的 next
		 e.next = newTable[i];
		 newTable[i] = e;
		 e = next;
		 
	 

假设 map 中初始元素是

原始链表,格式:[下标] (key,next)
[1] (1,35)->(35,16)->(16,null)
线程 a 执行到 1 处 ,此时局部变量 e 为 (1,35),而局部变量 next 为 (35,16) 线程 a 挂起
线程 b 开始执行
第一次循环
[1] (1,null)
第二次循环
[1] (35,1)->(1,null)
第三次循环
[1] (35,1)->(1,null)
[17] (16,null)
切换回线程 a,此时局部变量 e 和 next 被恢复,引用没变但内容变了:e 的内容被改为 (1,null),而 next 的内容被改为 (35,1) 并链向 (1,null)
第一次循环
[1] (1,null)
第二次循环,注意这时 e 是 (35,1) 并链向 (1,null) 所以 next 又是 (1,null)
[1] (35,1)->(1,null)
第三次循环,e 是 (1,null),而 next 是 null,但 e 被放入链表头,这样 e.next 变成了 352 处)
[1] (1,35)->(35,1)->(1,35)
已经是死链了

究其原因,是因为在多线程环境下使用了非线程安全的 map 集合
JDK 8 虽然将扩容算法做了调整,不再将元素加入链表头(而是保持与扩容前一样的顺序),但仍不意味着能
够在多线程环境下能够安全扩容,还会出现其它问题(如扩容丢数据)

JDK 8 ConcurrentHashMap

重要属性和内部类

// 默认为 0
// 当初始化时, 为 -1
// 当扩容时, 为 -(1 + 扩容线程数)
// 当初始化或扩容完成后,为 下一次的扩容的阈值大小
private transient volatile int sizeCtl;
// 整个 ConcurrentHashMap 就是一个 Node[]
static class Node<K,V> implements Map.Entry<K,V> 
// hash 表
transient volatile Node<K,V>[] table;
// 扩容时的 新 hash 表
private transient volatile Node<K,V>[] nextTable;
// 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
static final class ForwardingNode<K,V> extends Node<K,V> 
// 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
static final class ReservationNode<K,V> extends Node<K,V> 
// 作为 treebin 的头节点, 存储 root 和 first
static final class TreeBin<K,V> extends Node<K,V> 
// 作为 treebin 的节点, 存储 parent, left, right
static final class TreeNode<K,V> extends Node<K,V> 

重要方法

// 获取 Node[] 中第 i 个 Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)
 
// cas 修改 Node[] 中第 i 个 Node 的值, c 为旧值, v 为新值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)
 
// 直接修改 Node[] 中第 i 个 Node 的值, v 为新值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)

构造器分析
可以看到实现了懒惰初始化,在构造方法中仅仅计算了 table 的大小,以后在第一次使用时才会真正创建

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) 
	 if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
	 throw new IllegalArgumentException();
	 if (initialCapacity < concurrencyLevel) // Use at least as many bins
	 initialCapacity = concurrencyLevel; // as estimated threads
	 long size = (long)(1.0 + (long)initialCapacity / loadFactor);
	 // tableSizeFor 仍然是保证计算的大小是 2^n, 即 16,32,64 ... 
	 int cap = (size >= (long)MAXIMUM_CAPACITY) ?
	 MAXIMUM_CAPACITY : tableSizeFor((int)size);
	 this.sizeCtl = cap;

get 流程

public V get(Object key) 
	 Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
	 // spread 方法能确保返回结果是正数
	 int h = spread(key.hashCode());
	 if ((tab = table) != null && (n = tab.length) > 0 &&
		 (e = tabAt(tab, (n - 1) & h)) != null) 
		 // 如果头结点已经是要查找的 key
		 if ((eh = e.hash) == h) 
			 if ((ek = e.key) == key || (ek != null && key.equals(ek)))
			 return e.val;
		 
	 	// hash 为负数表示该 bin 在扩容中或是 treebin, 这时调用 find 方法来查找
		 else if (eh < 0)
			 return (p = e.find(h, key)) != null ? p.val : null;
		 // 正常遍历链表, 用 equals 比较
		 while ((e = e.next) != null) 
			 if (e.hash == h &&
				 ((ek = e.key) == key || (ek != null && key.equals(ek))))
				 return e.val;
		 
	 
	 return null;

put 流程

    final V putVal(K key, V value, boolean onlyIfAbsent) 
        if (key == null || value == null) throw new NullPointerException();
        // 其中 spread 方法会综合高位低位, 具有更好的 hash 性
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K, V>[] tab = table; ; ) 
            // f 是链表头节点
            // fh 是链表头结点的 hash
            // i 是链表在 table 中的下标
            Node<K, V> f;
            int n, i, fh;
            // 要创建 table
            if (tab == null || (n = tab.length) == 0)
                // 初始化 table 使用了 cas, 无需 synchronized 创建成功, 进入下一轮循环
                tab = initTable();
                // 要创建链表头节点
            

多线程并发的学习

package com.example.lib;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedLongSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class MyLock implements Lock {
private Help help = new Help();

@Override
public void lock() {
help.acquire(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {
help.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
return help.tryAcquire(1);
}

@Override
public boolean tryLock(long l, TimeUnit timeUnit) throws InterruptedException {
return help.tryAcquireNanos(1,timeUnit.toNanos(l));
}

@Override
public void unlock() {
help.release(1);
}

@Override
public Condition newCondition() {
return help.newCondition() ;
}

private class Help extends AbstractQueuedLongSynchronizer{
@Override
protected boolean tryAcquire(long l) {
//第一个线程进来,可以拿到锁,因此我们可以返回true

//如果第二个线程进来,拿不到锁,返回false
//如果当前进来的线程和当前保存的线程是同一个线程,则可以拿到锁的

//如何判断是第一个线程还是其他线程
long state = getState();
Thread thread = Thread.currentThread();
if(state == 0){
if(compareAndSetState(0, l)){
setExclusiveOwnerThread(thread);
return true;
}
}else if (getExclusiveOwnerThread() == thread){
setState(state+1);
return true;
}
return false;
}

@Override
protected boolean tryRelease(long l) {
//锁的获取和释放一一对应的,调用该线程一定是当前线程
if(Thread.currentThread() != getExclusiveOwnerThread()){
return false;
}

long state = getState() - l;
boolean flag = false;
if(state == 0){
setExclusiveOwnerThread(null);
flag = true;
}
setState(state);
return flag;
}

Condition newCondition(){
return new ConditionObject();
}
}
}






package com.example.lib;

public class MyClass {
private int value = 0;
private MyLock lock = new MyLock();

public void a(){
lock.lock();
System.out.println("a");
b();
lock.unlock();
}

public void b(){
lock.lock();
System.out.println("b");
lock.unlock();
}

public int next(){
lock.lock();
try {
Thread.sleep(300);
return value++;
} catch (InterruptedException e) {
throw new RuntimeException();
}finally {
lock.unlock();
}
}

public static void main(String[] args){
final MyClass myClass = new MyClass();
new Thread(new Runnable() {
@Override
public void run() {
while (true){
myClass.a();
//System.out.println(Thread.currentThread().getId()
//+ "" + myClass.next());
}
}
}).start();

}
}

以上是关于多线程进阶学习12------ConcurrentHashMap详解的主要内容,如果未能解决你的问题,请参考以下文章

Python学习笔记——进阶篇第八周———Socket编程进阶&多线程多进程

java进阶学习--java多线程

多线程进阶学习12------ConcurrentHashMap详解

Python学习笔记——进阶篇第八周———CPU运行原理与多线程

[多线程进阶]CAS与Synchronized基本原理

go语言学习笔记 — 进阶 — 并发编程:轻量级线程goroutine —— 并发与并行