多线程(十五ConcurrentHashMap原理二类和方法分析)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程(十五ConcurrentHashMap原理二类和方法分析)相关的知识,希望对你有一定的参考价值。

ConcurrentHashMap的构造
ConcurrentHashMap,采用了一种“懒加载”的模式,只有到首次插入键值对的时候,才会真正的去初始化table数组。

构造方法:

1、空构造函数,默认桶大小16
技术图片
2、指定桶初始容量的构造器,必须是2次幂值

/**
 * 指定table初始容量的构造器.
 * tableSizeFor会返回大于入参(initialCapacity + (initialCapacity >>> 1) + 1)的  最小2次幂值
 */
public ConcurrentHashMap(int initialCapacity) 
    if (initialCapacity < 0)
        throw new IllegalArgumentException();

    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));

    this.sizeCtl = cap;

3、根据已有的Map构造
4、指定table初始容量和负载因子的构造器
5、指定table初始容量、负载因子、并发级别的构造器

常用字段介绍

/**
 * 最大容量.
 */
private static final int MAXIMUM_CAPACITY = 1 << 30;

/**
 * 默认初始容量
 */
private static final int DEFAULT_CAPACITY = 16;

/**
 * 负载因子,为了兼容JDK1.8以前的版本而保留。
 * JDK1.8中的ConcurrentHashMap的负载因子恒定为0.75
 */
private static final float LOAD_FACTOR = 0.75f;

/**
 * 链表转树的阈值,即链接结点数大于8时, 链表转换为树.
 */
static final int TREEIFY_THRESHOLD = 8;

/**
 * 树转链表的阈值,即树结点树小于6时,树转换为链表.
 */
static final int UNTREEIFY_THRESHOLD = 6;

/**
 * 在链表转变成树之前,还会有一次判断:
 * 即只有桶大小数量大于MIN_TREEIFY_CAPACITY,才会发生转换。
 * 这是为了避免在Table建立初期,多个键值对恰好被放入了同一个链表中而导致不必要的转化。
 */
static final int MIN_TREEIFY_CAPACITY = 64;

/**
 * 在树转变成链表之前,还会有一次判断:
 * 即只有桶的数量小于MIN_TRANSFER_STRIDE,才会发生转换.
 */
private static final int MIN_TRANSFER_STRIDE = 16;

/**
 * 用于在扩容时生成唯一的随机数.
 */
private static int RESIZE_STAMP_BITS = 16;

/**
 * 可同时进行扩容操作的最大线程数.
 */
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

static final int MOVED = -1;                    // 标识ForwardingNode结点
static final int TREEBIN = -2;                 // 标识红黑树的根结点
static final int RESERVED = -3;             // 标识ReservationNode结点()
static final int HASH_BITS = 0x7fffffff;    // usable bits of normal node hash

/**
 * CPU核心数,扩容时使用
 */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
 * Node数组,标识整个Map,首次插入元素时创建,大小总是2的幂次.
 */
transient volatile Node<K, V>[] table;

/**
 * 扩容后的新Node数组,只有在扩容时才非空.
 */
private transient volatile Node<K, V>[] nextTable;

/**
 * 控制table的初始化和扩容.
 * 0  : 初始默认值
 * -1 : 有线程正在进行table的初始化
 * >0 : table初始化时使用的容量,或初始化/扩容完成后的threshold
 * -(1 + nThreads) : 记录正在执行扩容任务的线程数
 */
private transient volatile int sizeCtl;

/**
 * 扩容时需要用到的一个下标变量.
 */
private transient volatile int transferIndex;

/**
 * 计数基值,当没有线程竞争时,计数将加到该变量上。类似于LongAdder的base变量
 */
private transient volatile long baseCount;

/**
 * 计数数组,出现并发冲突时使用。类似于LongAdder的cells数组
 */
private transient volatile CounterCell[] counterCells;

/**
 * 自旋标识位,用于CounterCell[]扩容时使用。类似于LongAdder的cellsBusy变量
 */
private transient volatile int cellsBusy;

put方法

/**
 * 插入键值对,<K,V>均不能为null.
 */
public V put(K key, V value) 
    return putVal(key, value, false);
/**
     * 实际的插入操作
     *
     * @param onlyIfAbsent true:仅当key不存在时,才插入
     */
    final V putVal(K key, V value, boolean onlyIfAbsent) 
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());  // 再次计算hash值

        /**
         * 使用链表保存时,binCount记录table[i]这个桶中所保存的节点数;
         * 使用红黑树保存时,binCount==2,保证put后更改计数值时能够进行扩容检查,同时不触发红黑树化操作
         */
        int binCount = 0;

        for (Node<K, V>[] tab = table; ; )             // 自旋插入结点,直到成功
            Node<K, V> f;
            int n, i, fh;
            if (tab == null || (n = tab.length) == 0)                   // CASE1: 首次初始化table —— 懒加载
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null)     // CASE2: table[i]对应的桶为null
                // 注意下上面table[i]的索引i的计算方式:[ key的hash值 & (table.length-1) ]
                // 这也是table容量必须为2的幂次的原因,读者可以自己看下当table.length为2的幂次时,(table.length-1)的二进制形式的特点 —— 全是1
                // 配合这种索引计算方式可以实现key的均匀分布,减少hash冲突
                if (casTabAt(tab, i, null, new Node<K, V>(hash, key, value, null))) // 插入一个链表结点
                    break;
             else if ((fh = f.hash) == MOVED)                          // CASE3: 发现ForwardingNode结点,说明此时table正在扩容,则尝试协助数据迁移
                tab = helpTransfer(tab, f); // 迁移数据方法
            else                                                       // CASE4: 出现hash冲突,也就是table[i]桶中已经有了结点
                V oldVal = null;
                synchronized (f)               // 锁住table[i]结点
                    if (tabAt(tab, i) == f)    // 再判断一下table[i]是不是第一个结点, 防止其它线程的写修改
                        if (fh >= 0)           // CASE4.1: table[i]是链表结点
                            binCount = 1;
                            for (Node<K, V> e = f; ; ++binCount) 
                                K ek;
                                // 找到“相等”的结点,判断是否需要更新value值
                                if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) 
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                
                                Node<K, V> pred = e;
                                if ((e = e.next) == null)      // “尾插法”插入新结点
                                    pred.next = new Node<K, V>(hash, key,
                                            value, null);
                                    break;
                                
                            
                         else if (f instanceof TreeBin)   // CASE4.2: table[i]是红黑树结点
                            Node<K, V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value)) != null) 
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            
                        
                    
                
                if (binCount != 0) 
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);     // 链表 -> 红黑树 转换
                    if (oldVal != null)         // 表明本次put操作只是替换了旧值,不用更改计数值
                        return oldVal;
                    break;
                
            
        
        addCount(1L, binCount);             // 计数值加1
        return null;
      

putVal一共有4种情况

1、首次插入第一个值,初始化table

private final Node<K, V>[] initTable() 
    Node<K, V>[] tab;
    int sc;
    while ((tab = table) == null || tab.length == 0)   //自旋直到初始化成功
        if ((sc = sizeCtl) < 0)         // sizeCtl<0 说明table已经正在初始化/扩容
            Thread.yield();
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1))   // 将sizeCtl更新成-1,表示正在初始化中
            try 
                if ((tab = table) == null || tab.length == 0) 
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);     //  0.75n,负载因子
                
             finally 
                sizeCtl = sc;               // 设置threshold = 0.75 * table.length
            
            break;
        
    
    return tab;

2、table[i]对应的桶为空,直接占用table[i]

3、ForwardingNode结点,说明此时table正在扩容,则尝试协助进行数据迁移

4、table[i]桶中已经有了结点,hash冲突了,有2种情况

4.1 当table[i]的结点类型为Node——链表结点时,就会将新结点以“尾插法”的形式插入链表的尾部。
4.2 当table[i]的结点类型为TreeBin——红黑树代理结点时,就会将新结点通过红黑树的插入方式插入。
/**
 * 尝试进行 链表 -> 红黑树 的转换.
 */
private final void treeifyBin(Node<K, V>[] tab, int index) 
    Node<K, V> b;
    int n, sc;
    if (tab != null) 

        // CASE 1: table的容量 < MIN_TREEIFY_CAPACITY(64)时,直接进行table扩容,不进行红黑树转换
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);

            // CASE 2: table的容量 ≥ MIN_TREEIFY_CAPACITY(64)时,进行链表 -> 红黑树的转换
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) 
            synchronized (b) 
                if (tabAt(tab, index) == b) 
                    TreeNode<K, V> hd = null, tl = null;

                    // 遍历链表,建立红黑树
                    for (Node<K, V> e = b; e != null; e = e.next) 
                        TreeNode<K, V> p = new TreeNode<K, V>(e.hash, e.key, e.val, null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    
                    // 以TreeBin类型包装,并链接到table[index]中
                    setTabAt(tab, index, new TreeBin<K, V>(hd));
                
            
        
    

get方法

/**
 * 根据key查找对应的value值
 *
 * @return 查找不到则返回null
 */
public V get(Object key) 
    Node<K, V>[] tab;
    Node<K, V> e, p;
    int n, eh;
    K ek;
    int h = spread(key.hashCode());     // 重新计算key的hash值
    if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) 
        if ((eh = e.hash) == h)        // CASE1、table[i]就是待查找的项,直接返回
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
         else if (eh < 0)              //CASE2、hash值<0, 说明遇到非链表结点, 调用对应节点的find方法查找
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null)   //始终可以按照链表方式查找
            if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        
    
    return null;

对于CASE2,重点看一下TreeBin结点的查找

1、TreeBin的查找

ConcurrentHashMap采用了一种类似读写锁的方式:当线程持有写锁(修改红黑树)时,如果读线程需要查找,不会像传统的读写锁那样阻塞等待,而是转而以链表的形式进行查找(TreeBin本身时Node类型的子类,所有拥有Node的所有字段)
/**
 * 从根结点开始遍历查找,找到“相等”的结点就返回它,没找到就返回null
 * 当存在写锁时,以链表方式进行查找
 */
final Node<K, V> find(int h, Object k) 
    if (k != null) 
        for (Node<K, V> e = first; e != null; ) 
            int s;
            K ek;
            /**
             * 两种特殊情况下以链表的方式进行查找:
             * 1. 有线程正持有写锁,这样做能够不阻塞读线程
             * 2. 有线程等待获取写锁,不再继续加读锁,相当于“写优先”模式
             */
            if (((s = lockState) & (WAITER | WRITER)) != 0) 
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                e = e.next;     // 链表形式
            

            // 读线程数量加1,读状态进行累加
            else if (U.compareAndSwapInt(this, LOCKSTATE, s, s + READER)) 
                TreeNode<K, V> r, p;
                try 
                    p = ((r = root) == null ? null :
                        r.findTreeNode(h, k, null));
                 finally 
                    Thread w;
                    // 如果当前线程是最后一个读线程,且有写线程因为读锁而阻塞,则唤醒写线程,尝试获取写锁
                    if (U.getAndAddInt(this, LOCKSTATE, -READER) == (READER | WAITER) && (w = waiter) != null)
                        LockSupport.unpark(w);
                
                return p;
            
        
    
    return null;

以上是关于多线程(十五ConcurrentHashMap原理二类和方法分析)的主要内容,如果未能解决你的问题,请参考以下文章

多线程(十四ConcurrentHashMap原理一节点)

Java多线程系列:ConcurrentHashMap的实现原理(JDK1.7和JDK1.8)

Java多线程系列:ConcurrentHashMap的实现原理(JDK1.7和JDK1.8)

Java多线程核心技术演进ConcurrentHashMap—Java进阶

面试必问之 ConcurrentHashMap 线程安全的具体实现方式

十二ConcurrentHashMap的实现原理解析