ConcurrentHashmap核心源码分析

Posted 可持续化发展

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ConcurrentHashmap核心源码分析相关的知识,希望对你有一定的参考价值。

以下内容为本人学习小刘老师的源码课程后(哔哩哔哩搜索,小刘讲源码),自己整理的笔记。仅供自己学习之用。

    concurrenthashmap的扩容,数据迁移的动作是从下往上走的(以数组来看)。这样的好处是为了避免和迭代操作发生冲突。比如说,我在迭代访问concurrenthashmap的数据。现在从下往上迁移到数组的第11个节点了,我读到第10个桶位了。我要读第11个节点,就发现拿到了一个FWD节点,接着就到新表上去读。如果迁移是从0号桶位开始往下走的话,可能会和迭代操作产生冲突。
    concurrenthashmap是一个高并发的容器。如果concurrenthashmap的统计采用AtomicLong的话,会有CAS竞争,性能会很低。所以采用了LongAdder,有化整为零的思想。

常量Constants

  • 散列表数组最大限制
    private static final int MAXIMUM_CAPACITY = 1 << 30;

  • 散列表默认值
    private static final int DEFAULT_CAPACITY = 16;

  • 负载因子,JDK1.8中 ConcurrentHashMap 是固定值
    private static final float LOAD_FACTOR = 0.75f;

  • 树化阈值,指定桶位 链表长度达到8的话,有可能发生树化操作。
    static final int TREEIFY_THRESHOLD = 8;

  • 红黑树转化为链表的阈值
    static final int UNTREEIFY_THRESHOLD = 6;

  • 联合TREEIFY_THRESHOLD控制桶位是否树化,只有当table数组长度达到64且 某个桶位 中的链表长度达到8,才会真正树化
    static final int MIN_TREEIFY_CAPACITY = 64;

  • 线程迁移数据最小步长,控制线程迁移任务最小区间的一个值
    多个线程同时去扩容时,给每个线程分配的每次扩容任务最小的区间参数,桶位的跨度
    private static final int MIN_TRANSFER_STRIDE = 16;

  • 扩容相关,计算扩容时 生成的一个 标识戳
    private static int RESIZE_STAMP_BITS = 16;

  • 65535 表示参与并发扩容时最大线程数
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

  • 当node节点的hash值为 -1 时,表示当前节点为FWD节点
    static final int MOVED = -1; // hash for forwarding nodes

  • 当node节点的hash值为 -2 时,表示当前节点已经树化,且当前节点为TreeBin对象,TreeBin对象代理操作红黑树
    static final int TREEBIN = -2; // hash for roots of trees
    static final int RESERVED = -3; // hash for transient reservations

  • 0x7fffffff -> 0111 1111 1111 1111 1111 1111 1111 1111 它可以将一个负数,通过位与运算后得到一个正数,但不是取绝对值
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

  • 当前系统的CPU数量
    static final int NCPU = Runtime.getRuntime().availableProcessors();

成员属性Fields

* 散列表,长度一定是2次方数
 */
transient volatile Node<K,V>[] table;

* The next table to use; non-null only while resizing.
 * 扩容过程中,会将第一个触发扩容的线程新创建的更大的table 赋值给nextTable 来保持引用
 * (因为是并发扩容的,得让其他线程知道将数据迁移到哪里),扩容结束之后,会把新表赋给table,并把nextTable被设置为Null
 * 将老数据迁移到新表中
 */
private transient volatile Node<K,V>[] nextTable;

* LongAdder 中的 baseCount 未发生竞争时 或者 当前LongAdder处于加锁状态时,增量累到到baseCount中
 */
private transient volatile long baseCount;

* sizeCtl < 0
 * 1. -1 表示当前table正在初始化(有线程在创建table数组),当前线程需要自旋等待..
 * 2. 表示当前table数组正在进行扩容 ,16位表示:扩容的标识戳   低16位表示:(1 + nThread) 
 * 其中,nThread为当前参与并发扩容的线程数量
 * sizeCtl = 0,表示创建table数组时 使用DEFAULT_CAPACITY为大小
 * sizeCtl > 0
 * 1. 如果table未初始化,表示初始化大小
 * 2. 如果table已经初始化,表示下次扩容时的 触发条件(阈值),即下次扩容的扩容阈值。0.75*数组长度
 */
private transient volatile int sizeCtl;

* 迁移的当前下标。并发扩容过程中,迁移数据,为了协调多个线程之间扩容工作,在对象级别用这个属性来记录当前数据迁移工作的进度。
 * 并发扩容过程中,记录当前进度。所有线程都需要从transferIndex中分配区间任务,去执行自己的任务。
 */
private transient volatile int transferIndex;

 * LongAdder中的cellsBuzy 0表示当前LongAdder对象无锁状态,1表示当前LongAdder对象加锁状态
 */
private transient volatile int cellsBusy;

* LongAdder中的cells数组,当baseCount发生竞争后,会创建cells数组,
 * 线程会通过计算hash值 取到 自己的cell ,将增量累加到指定cell中
 * 获取当前散列表中元素个数,这是一个期望值 = sum(cells) + baseCount
 */
private transient volatile CounterCell[] counterCells;

静态代码块

// Unsafe mechanics
private static final sun.misc.Unsafe U;
/**表示sizeCtl属性在ConcurrentHashMap中内存偏移地址,知道concurrenthashmap对象的基础地址后,加上这个偏移量,就可以访问sizectl的内存了*/
private static final long SIZECTL;
/**表示transferIndex属性在ConcurrentHashMap中内存偏移地址*/
private static final long TRANSFERINDEX;
/**表示baseCount属性在ConcurrentHashMap中内存偏移地址*/
private static final long BASECOUNT;
/**表示cellsBusy属性在ConcurrentHashMap中内存偏移地址*/
private static final long CELLSBUSY;
/**表示cellValue属性在CounterCell中内存偏移地址*/
private static final long CELLVALUE;
/**表示数组第一个元素的偏移地址, 一个数组内存块拿到后,它内存前部分有一个信息头部,这个头部之后,才是第一个数组元素的内存。abase为信息头部的偏移量*/
private static final long ABASE;
private static final int ASHIFT;

static {
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ConcurrentHashMap.class;
        SIZECTL = U.objectFieldOffset
            (k.getDeclaredField("sizeCtl"));
        TRANSFERINDEX = U.objectFieldOffset
            (k.getDeclaredField("transferIndex"));
        BASECOUNT = U.objectFieldOffset
            (k.getDeclaredField("baseCount"));
        CELLSBUSY = U.objectFieldOffset
            (k.getDeclaredField("cellsBusy"));
        Class<?> ck = CounterCell.class;
        CELLVALUE = U.objectFieldOffset
            (ck.getDeclaredField("value"));
        Class<?> ak = Node[].class;
        ABASE = U.arrayBaseOffset(ak);
        //表示数组单元所占用空间大小,scale 表示Node[]数组中每一个单元所占用空间大小
        int scale = U.arrayIndexScale(ak);
        //scale 必须是2的次方数
        //1 0000 & 0 1111 = 0
        //用于判断scale 是不是2的次方数
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        //numberOfLeadingZeros() 这个方法是返回当前数值转换为二进制后,从高位到低位开始统计,看有多少个0连续在一块。
        //8 => 1000 numberOfLeadingZeros(8) = 28
        //4 => 100 numberOfLeadingZeros(4) = 29
        //ASHIFT = 31 - 29 = 2 ??
        //ASHIFT 是为了寻找Node[]数组的元素地址的。
        //ABASE + (5 << ASHIFT) 就可以访问数组中第5个元素了,之前是 ABASE + 5 * scale
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        throw new Error(e);
    }
}

内部类

Node类部分分析

static class Node<K,V> implements Map.Entry<K,V> {
	//hash、key设置为final,一、是为了设置成功后,不能再改值。二是为了线程安全性,因为设为final就不能改了,只能读,只读不会出现线程安全性问题。
        final int hash;//当前节点的hash值
        final K key;
        //val、next用volatile修饰是 为了保证内存的可见性,
        volatile V val;
        volatile Node<K,V> next;//链表中指向下一个节点的引用

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        ........

}

TreeNode类部分分析

	/**
     * Nodes for use in TreeBins
     * 可以构成双向链表
     */
    static final class TreeNode<K,V> extends Node<K,V> {
    	TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        //双向链表的当前节点的前置引用
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;

        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
        Node<K,V> find(int h, Object k) {
            return findTreeNode(h, k, null);
        }
        .......
    }

ForwardingNode类部分分析

/**
     * A node inserted at head of bins during transfer operations.
     * 拿到这个节点后,说明当前table数组正在扩容,数据正在迁移中。
     * 读线程拿到这个节点,就调用find方法到新表中去查询
     * 写线程拿到这个节点,就要去帮concurrenthashmap做并发扩容
     */
    static final class ForwardingNode<K,V> extends Node<K,V> {
        //final 修饰 赋值之后就,不能再改了
        //指向更大的新创建的table
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);//MOVED=-1
            this.nextTable = tab;
        }

        Node<K,V> find(int h, Object k) {
            在nextTable上查找数据
            outer: for (;;) {
                //条件一:永远不成立
                //条件二:永远不成立
                //条件三:永远不成立
                //条件四:在新扩容表中 重新定位 hash 对应的头结点
                // 我在查询时发现老table的桶位中是 FWD节点。FWD节点指向一个更大的新table。我就去新table数组中定位桶位,发现新桶位是null。
                //  有可能数据迁移完后,有其他线程把数据删了。也有可能老表的桶位本来就是null。
                //true -> 1.在oldTable中 对应的桶位在迁移之前就是null
                //        2.扩容完成后,有其它写线程,将此桶位设置为了null
                if (k == null || tab == null || (n = tab.length) == 0 ||
                    (e = tabAt(tab, (n - 1) & h)) == null)
                    return null;
                
                //前置条件:扩容后的新表 对应hash的桶位一定不是null,e为此桶位的头结点
                //e可能为哪些node类型?
                //1.node 类型
                //2.TreeBin 类型
                //3.FWD 类型
                for (;;) {
                    //eh 新扩容后表指定桶位的当前节点的hash
                    //ek 新扩容后表指定桶位的当前节点的key
                    int eh; K ek;                    
                    
                    if ((eh = e.hash) == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        //扩容后的新表的命中桶位的节点 和 查询的key、hash都一致,返回当前节点
                        //不一定是桶位头节点,因为下面有e = e.next,e引用会指向下一个节点
                        return e;
                    //eh<0 ,表示当前节点可能为
                    //1.TreeBin 类型    2.FWD类型(新扩容的表,在并发很大的情况下,可能在此方法 再次拿到FWD类型..)
                    if (eh < 0) {
                        if (当前节点为ForwardingNode){
                            //如果在老表a中拿到FWD节点,就会去大的新表b查找数据。
                            //如果在新表b中,再次拿到FWD节点,说明并发量比较大,就会去更大的新表c中查找数据。
                            tab = ((ForwardingNode<K,V>)e).nextTable;
                            continue outer;
                        }
                        else{
                            //此桶位 为 TreeBin 节点,使用TreeBin.find 查找红黑树中相应节点。
                            return e.find(h, k);
                        }
                    }
                    //前置条件:当前节点的key、hash 和查询的key、hash不一致,并且此桶位是 链表桶位
                    //1.将当前节点的引用e 指向链表中的下一个元素
                    //2.判断此时的引用e 是否为空.
                    //如果为空,则说明遍历完链表后,我都没有找到和查询key、hash都一致的节点,return null  
                    //如果不空,则自旋,再次比较key和hash                 
                    if ((e = e.next) == null)
                        return null;
                }
            }
        }        
    }

内部小方法源码分析

static final int spread(int h)

 * 1100 0011 1010 0101 0001 1100 0001 1110
 * 0000 0000 0000 0000 1100 0011 1010 0101
 * 1100 0011 1010 0101 1101 1111 1011 1011
 * ---------------------------------------
 * 1100 0011 1010 0101 1101 1111 1011 1011
 * 0111 1111 1111 1111 1111 1111 1111 1111
 * 0100 0011 1010 0101 1101 1111 1011 1011
 */
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}
让hashcode的高16位 异或 hashcode,再位与 HASH_BITS。

tabAt方法

//通过 ((long)i << ASHIFT) + ABASE ,计算出指定元素的偏移量。这个方法是获取指定Node数组的指定索引的元素。到主存中获取数据
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

casTabAt方法

//该方法的作用是通过CAS的方式,给Node数组的指定元素设置值。Node<K,V> c 期望值,Node<K,V> v 新值
//如果期望值和内存中的值一致,则将内存中的值修改为新值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);//偏移量
}

setTabAt方法

static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

tableSizeFor方法

* 返回>=c的最小的2的次方数
 * c=28
 * n=27 => 0b 11011
 * 11011 | 01101 => 11111
 * 11111 | 00111 => 11111
 * ....
 * => 11111 + 1 =100000 = 32
 */
private static final int tableSizeFor(int c) {
    int n = c - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

resizeStamp方法

* 扩容时计算出一个扩容唯一标识戳,用来标识table数组是从多大扩容到多大的。
 * 多线程并发扩容,线程要拿到这个标识戳,且标识戳一致,才能参与到这个扩容中。
 * 16 -> 32
 * numberOfLeadingZeros(16) => 1 0000 =>27 =>0000 0000 0001 1011
 * |
 * (1 << (RESIZE_STAMP_BITS - 1)) => 1000 0000 0000 0000 => 32768
 * ---------------------------------------------------------------
 * 以上是关于ConcurrentHashmap核心源码分析的主要内容,如果未能解决你的问题,请参考以下文章

高阶源码分析:ConcurrentHashMap

Java并发集合类ConcurrentHashMap底层核心源码解析

ConcurrentHashMap源码简单分析

ConcurrentHashMap源码分析_JDK1.8版本

ConcurrentHashMap 源码分析

Java多线程核心技术演进ConcurrentHashMap—Java进阶