ConcurrentHashMap核心源码分析

Posted 可持续化发展

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ConcurrentHashMap核心源码分析相关的知识,希望对你有一定的参考价值。

ConcurrentHashMap的TreeBin

putVal 中调用 treeifyBin;treeifyBin中调用new TreeBin<K,V>(hd)
putVal 中调用 TreeBin的putTreeVal方法
get 中调用了TreeBin的find方法

private final void treeifyBin(Node<K,V>[] tab, int index){…}
treeifyBin方法的作用是将当前桶位的Node链表变为一个 TreeBin对象。
大体上的逻辑:
如果当前桶位 有数据,且是普通node数据,就用synchronized加锁头节点,
    双向链表中当前节点的前置引用 是class TreeNode<K,V> extends Node<K,V>
的TreeNode<K,V> prev 属性。
    双向链表中当前节点的后置引用 是class Node<K,V> implements Map.Entry<K,V>
的volatile Node<K,V> next 属性。
    利用Node链表 + 尾插法,生成TreeNode双向链表 hd。
    new TreeBin<K,V>(hd); new 一个TreeBin对象,形参为双向链表
    利用setTabAt方法,把当前桶位设为TreeBin对象。

    putVal方法调用putTreeVal方法之前,当前线程会对这个桶位的TreeBin对象加synchronized锁。上锁后,下一个写线程调用putVal方法,如果尝试给同一个TreeBin对象加锁的话,会加锁失败,然后,自旋。因为上一个线程还没有释放synchronized锁。进而说明,写是独占状态,以散列表来看,真正进入到TreeBin中的写线程 同一时刻 只有一个线程。
    又因为当两个线程访问同一个对象的方法时,如果A线程先持有object对象的Lock锁,B线程可以异步调用object对象中的非synchronized方法。所以,当有写线程在访问TreeBin对象的时候,多个读线程可以调用同一个TreeBin对象的find方法。

    当有读线程在访问的时候,写线程如果是做插入操作,会把节点插进去。但如果插进去的节点,破坏了平衡性(即插入节点的父节点是红色),则需要通过CAS的方式改lockstate(从0改到1) ,但会改失败(因为有读线程在访问TreeBin),之后会被LockSupport.park。但数据还是插进去了呀。只不过得等读线程读完后,再去做自平衡操作。读线程 完事后 会检查 lockState ,发现写线程 等待,就会将写线程再唤醒。

static final class TreeBin<K,V> extends Node<K,V> {
        /**TreeBin逻辑上引用了双向链表和红黑树,但物理上这两个结构里面的节点对象是同一个对象 */


        //指向红黑树 根节点 
        TreeNode<K,V> root;
        //指向双向链表的头节点
        volatile TreeNode<K,V> first;
        //等待者线程(当前lockState是读锁状态)
        volatile Thread waiter;
        /**
         * 1.写锁状态 写是独占状态,以散列表来看,真正进入到TreeBin中的写线程 同一时刻 只有一个线程。 1
         * 2.读锁状态 读锁是共享,同一时刻可以有多个线程 同时进入到 TreeBin对象中获取数据。 每一个线程 都会给 lockStat + 4
         * 3.等待者状态(写线程在等待),当TreeBin中有读线程目前正在读取数据时,写线程无法修改数据,那么就将lockState的最低2位 设置为 0b 10 = WAITER = 2
         */
        volatile int lockState;

        // values for lockState
        //
        static final int WRITER = 1; // set while holding write lock
        static final int WAITER = 2; // set when waiting for write lock
        static final int READER = 4; // increment value for setting read lock

        //tieBreakOrder方法的作用是根据形参,返回 -1 或 1
        static int tieBreakOrder(Object a, Object b) {
            ....
        }
        /**
            为了解决AVL树的性能问题,才有了红黑树。都是二叉搜索树的变异。
            红黑树的构建,里面的排序,是受dir变量控制的。dir的值是根据当前节点和插入节点的hash值的比较来确定的
         */
        TreeBin(TreeNode<K,V> b) {
            //设置节点hash为-2 表示此节点是TreeBin节点
            super(TREEBIN, null, null, null);
            //使用first 引用 treeNode双向链表
            this.first = b;
            //r 红黑树的根节点引用
            TreeNode<K,V> r = null;

            这个for循环的主要逻辑就是遍历TreeNode双向链表来生成红黑树,生成的过程中会自平衡(调用balanceInsertion(r, x)//x表示遍历的当前节点,这个 x 就是要插入的节点
            for (TreeNode<K,V> x = b, next; x != null; x = next) {
                //将TreeNode节点的next对象转为TreeNode类型。因为TreeNode的next属性是继承的Node<K,V>静态内部类
                next = (TreeNode<K,V>)x.next;
                //强制设置当前插入节点的左右子树为null
                x.left = x.right = null;
                //条件成立:说明当前红黑树 是一个空树,那么设置插入元素 为根节点
                if (r == null) {
                    .....
                }
                else {
                    //非第一次循环,都会来带else分支,此时红黑树已经有数据了
                    ......
                    //kc 表示 插入节点key的class类型
                    Class<?> kc = null;
                    //p 的意义是 找到插入节点的父节点。p 是 本次for (;;)循环中的当前红黑树节点
                    TreeNode<K,V> p = r;
                    for (;;) {
                        dir的值要么是-1,要么是 1,不可能是0。用来控制 x 是插入当前节点的左子树还是右子树。                        
                        因为红黑树的插入是类似二叉搜索树的插入逻辑的。就是:
                        如果插入节点的hash值大于 当前p节点的hash,就将dir设为 -1,
                            则插入节点可能需要插入到当前节点的左子节点 或者 继续在左子树上查找。
                        如果插入节点的hash值大于 当前p节点的hash,就将dir 设为 1,
                            则插入节点可能需要插入到当前节点的右子节点 或者 继续在右子树上查找。
                        如果插入节点的hash == p节点的hash,
                            则用tieBreakOrder方法计算出dir,来决定 插入位置 或 查找方向
                        插好后,调用自平衡算法,进行左旋或右旋。balanceInsertion(r, x)
                        //ph p表示 为查找插入节点的父节点的一个临时节点的hash

                        //xp 保留 插入节点的 父节点引用
                        TreeNode<K,V> xp = p;
                        //条件成立:说明当前p节点 即为插入节点的父节点,这个p节点有一个子树为null了。
                        //条件不成立:说明p节点 底下还有层次,需要将p指向 p的左子节点 或者 右子节点,表示继续向下搜索。
                        if ((p = (dir <= 0) ? p.left : p.right) == null) {
                            开始在红黑树中插入 x 节点
                            //插入节点后,红黑树性质 可能会被破坏,所以需要调用 平衡方法
                            r = balanceInsertion(r, x);
                            break;
                        }
                    }
                }     
            }
            //将r 赋值给 TreeBin对象的 root引用。
            this.root = r;
            assert checkInvariants(root);
        }
        /**find方法的作用:查找节点。一般是读线程在get 中调用了TreeBin的find方法 */
        final Node<K,V> find(int h 查找的hash值, Object k 查找的key) {
            
            if (k != null) {
                //e 表示循环迭代的当前节点   迭代的是first引用的链表
                for (Node<K,V> e = first; e != null; ) {
                    //s 保存的是lock临时状态
                    //ek 链表当前节点 的key
                    int s; K ek;
                    //(WAITER|WRITER) => 0010 | 0001 => 0011
                    //lockState & 0011 != 0 条件成立(xx01 xx10 xx11):
                    //说明当前TreeBin 有等待者线程(是先有读,再有写,再有读的情况,这个写会变为等待者,等到读线程都读完后,最后一个读线程将等待的写线程唤醒) 
                    //	或者 目前有写线程,将lockstate从0变为了1,要做自平衡操作,加了锁(是先有写,再有读的情况。)
                    //这两种情况下,读线程会按双向链表的逻辑找节点,并且没有让lockstate +4)
                    if (((s = lockState) & (WAITER|WRITER)) != 0) {
                        读线程去TreeNode的双向链表中查找 与形参hash、key一致的节点。
                        如果本次循环找到了,就返回当前节点 e;
                        如果本次循环没找到,就移动 e 引用。e = e.next;                       
                    }
                    //前置条件:当前TreeBin中 等待者线程 或者 写线程 都没有。这个时候,才会给lockstate +4
                    //通过CAS的方式,让这个TreeBin对象的lockstate属性 +4.
                    //条件成立:说明添加读锁成功 读锁是共享锁 读与读之间不阻塞
                    else if (U.compareAndSwapInt(this, LOCKSTATE, s,
                                                 s + READER)) {
                        TreeNode<K,V> r, p;
                        try {
                            调用TreeNode的findTreeNode方法,去红黑树上查找节点。 这个try块里面就会查询完。
                            p = ((r = root) == null ? null :
                                 r.findTreeNode(h, k, null));
                        } finally {
                            //w 表示等待者线程
                            Thread w;
                            //U.getAndAddInt(this, LOCKSTATE, -READER) == (READER|WAITER)
                            //U.getAndAddInt方法返回的是AddInt之前的数
                            //1.当前线程查询红黑树结束,释放当前线程的读锁 就是让 lockstate 值 - 4
                            //(READER|WAITER) = 0110 => 表示当前只有一个线程在读,且“有一个线程在等待”
                            //当前读线程为 TreeBin中的最后一个读线程。

                            //2.(w = waiter) != null 说明有一个写线程在等待读操作全部结束。
                            //如果当前线程是最后一个读线程了 且 有等待者线程,就要LockSupport.unpark 去唤醒写线程。
                            if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
                                (READER|WAITER) && (w = waiter) != null)
                                //使用unpark 让 写线程 恢复运行状态。
                                LockSupport.unpark(w);
                        }
                        //返回查找结果
                        return p;
                    }
                }
            }
            return null;

        }
        //该方法要么是往红黑树 和 双向链表 中添加节点,要么是替换操作。
        /**
            
         */
        final TreeNode<K,V> putTreeVal(int h, K k, V v) {
            Class<?> kc = null;
            boolean searched = false;
            for (TreeNode<K,V> p = root;;) {
                p 是循环中的当前红黑树节点
            
                if(红黑树为null){
                    则直接用 h,k,v 生成TreeNode 作为红黑树的根,链表的头节点
                }
                else if(p.hash > h){
                    dir = -1; 往p的左子树去找
                }
                else if(p.hash < h){
                    dir = -1;往p的右子树去找 
                }
                else if((pk = p.key) == k || (pk != null && k.equals(pk))){
                    如果红黑树的当前节点的hash、key 与 h,k 一致,
                    则说明put方法插入的节点与红黑树节点发生冲突了,put方法做替换操作。
                    return p;
                }
                //前置条件:p 的hash == h ,但p的key 与 k不相等
                else if(k所属的类没有实现Comparable接口 || k所属的类实现了Comparable接口但 k..compareTo(pk) == 0 ){
                    if (!searched) {
                        TreeNode<K,V> q, ch;
                        searched = true;
                        调用红黑树的查找方法,TreeNode对象的findTreeNode方法,去当前节点的左右子树中查找 与 h、k都一致的节点
                        如果找到了,就return发生冲突的节点引用,说明这是一个替换操作。
                        如果没找到,跳出这个if块。
                    }
                    利用tieBreakOrder方法计算dir
                }
                /**
                上面这个大的if块的作用:
                    如果发现是替换操作,就返回冲突节点的引用。
                    如果当前节点p的hash、key 和h、k不一致,就 计算 dir ,继续寻找插入节点的父节点。
                    插入节点的父节点的特征为 
                    比如,dir=1 ,我发现 当前节点p的右子节点为null,则当前节点p 为插入节点的父节点
                         dir = -1,同理。
                */
                
                //保留
                TreeNode<K,V> xp = p;
                //if 条件中 移动了当前红黑树节点 p,并判断p 是否为插入节点的父节点
                if ((p = (dir <= 0) ? p.left : p.right) == null) {
                    //找到插入位置了
                    //当前循环节点xp 是待插入节点的父节点,即 x 节点的爸爸
                    //x 表示插入节点
                    //f 老的头结点
                    TreeNode<K,V> x, f = first;
                    //(h, k, v, f 链表逻辑上的插入节点的后置引用, xp 红黑树逻辑上的插入节点的父节点);
                    /**
                        TreeBin逻辑上引用了双向链表和红黑树,但物理上这两个结构里面的节点对象是同一个TreeNode对象.
                        (h, k, v, f 链表逻辑上的插入节点的后置引用, xp 红黑树逻辑上的插入节点的父节点);
                        插入操作:
                        对于双向链表逻辑,采用的是头插法。
                        对于红黑树逻辑,就是一步一步找嘛,有种二叉搜索树的插入的感觉。

                     */
                    first = x = new TreeNode<K,V>(h, k, v, f, xp);

                    //条件成立:说明链表有数据
                    if (f != null)
                        //设置老的头结点的前置引用为 当前的头结点。
                        f.prev = x;
                    
                    if (dir <= 0)
                        xp.left = x;
                    else
                        xp.right = x;
                    
                    if (!xp.red)
                        //如果
                        x.red = true;
                    else {
                        //表示 当前新插入节点后,新插入节点 与 父节点 形成 “红红相连”。
                        //因为红黑树的插入节点,必须为红色
                        lockRoot();
                        try {
                            //平衡红黑树,使其再次符合规范。
                            root = balanceInsertion(root, x);
                        } finally {
                            unlockRoot();
                        }
                    }
                    break;
                }                 
            }           
        }
       
        private final void lockRoot() {
            //条件成立:说明lockState 并不是 0,说明此时有其它读线程在treeBin红黑树中读取数据。
            if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
                contendedLock(); // offload to separate method
        }
        private final void unlockRoot() {
            lockState = 0;
        }
        private final void contendedLock() {
            boolean waiting = false;
            //表示lock值
            int s;
            for (;;) {
                //~WAITER = 11111....01
                //条件成立:说明目前TreeBin中没有线程在访问 红黑树
                //条件不成立:有线程在访问红黑树
                CASE1:
                if (((s = lockState) & ~WAITER) == 0) {
                    //条件成立:说明写线程 抢占锁成功 ,把lockstate从2 改为了 1,(即...10 -> ...01)
                    if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {
                        if (waiting)
                            //设置TreeBin对象waiter 引用为null
                            waiter = null;
                        return;
                    }
                }
                CASE2:
                //lock & 0000...10 = 0, 条件成立:说明lock 中 waiter 标志位 为0,此时当前线程可以设置为1了,然后将当前线程挂起。
                //挂起线程只能挂起一个,当一个写线程在写的时候,又来一个写线程,这个写线程就会被挂起。如果再来一个写线程,就会一直自旋。
                else if ((s & WAITER) == 0) {
                	
                    if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {
                        waiting = true;
                        waiter = Thread.currentThread();
                    }
                }
                CASE3:
                //条件成立:说明当前线程在CASE2中已经将 treeBin.waiter 设置为了当前线程,并且将lockState 中表示 等待者标记位的地方 设置为了1
                //这个时候,就让当前线程 挂起。。
                else if (waiting)
                    LockSupport.park(this);
            }
        }        
    }



代码上看,有一种情况是这样的:
    先有写线程访问TreeBin,成功加了锁,在做自平衡操作。但这时来了个读线程,读线程一看有写线程,就按双向链表的逻辑查找,自旋了几次但还没找到,此时写线程又退出了。读线程再次自旋的时候,就会将lockstate +4。就去按红黑树的逻辑,从红黑树的根节点重新开始查找了。

以上是关于ConcurrentHashMap核心源码分析的主要内容,如果未能解决你的问题,请参考以下文章

高阶源码分析:ConcurrentHashMap

Java并发集合类ConcurrentHashMap底层核心源码解析

ConcurrentHashMap源码简单分析

ConcurrentHashMap源码分析_JDK1.8版本

ConcurrentHashMap 源码分析

Java多线程核心技术演进ConcurrentHashMap—Java进阶