JDK1.8中的ConcurrentHashMap

Posted 愉悦滴帮主)

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDK1.8中的ConcurrentHashMap相关的知识,希望对你有一定的参考价值。

目录

JDK1.8中的ConcurrentHashMap

前言

问题1:为什么在1.8中舍弃了分段锁的机制?

ConcurrentHashMap中的常量

ConcurrentHashMap的初始化方法

ConcurrentHashMap的put方法 

问题2:为什么要用TreeBin对象来代表整棵树?

size代码块 

addCount代码块 

 fullAddCount代码块

 

 


JDK1.8中的ConcurrentHashMap

前言

在JDK1.7中的ConcurrentHashMap中我们了解到ConcurrentHashMap采用分段锁的机制,实现并发的更新操作,它首先将数据分成一段一段地存储,然后给每一段数据配一个锁,当一个线程占用锁访问其中一段数据时,其他段的数据也能被其他线程访问,锁分段技术的使用大大了提高并发访问效率。底层由ReentrantLock+Segment+HashEntry组成。然而在jdk1.8中的实现已经抛弃了Segment分段锁机制,利用CAS+Synchronized来保证并发更新的安全,底层依然采用数组+链表+红黑树的存储结构。

问题1:为什么在1.8中舍弃了分段锁的机制?

其实可以看出JDK1.8版本的ConcurrentHashMap的数据结构已经接近HashMap,相对而言,ConcurrentHashMap只是增加了同步的操作来控制并发,从JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+红黑树,相对而言,总结如下思考

  • JDK1.8的实现降低锁的粒度,JDK1.7版本锁的粒度是基于Segment的,包含多个HashEntry,而JDK1.8锁的粒度就是HashEntry(首节点)

  • JDK1.8版本的数据结构变得更加简单,使得操作也更加清晰流畅,因为已经使用synchronized来进行同步,所以不需要分段锁的概念,也就不需要Segment这种数据结构了,由于粒度的降低,实现的复杂度也增加了

  • JDK1.8使用红黑树来优化链表,基于长度很长的链表的遍历是一个很漫长的过程,而红黑树的遍历效率是很快的,代替一定阈值的链表,这样形成一个最佳拍档

  • JDK1.8为什么使用内置锁synchronized来代替重入锁ReentrantLock,我觉得有以下几点
    1.因为粒度降低了,在相对而言的低粒度加锁方式,synchronized并不比ReentrantLock差,在粗粒度加锁中ReentrantLock可能通过Condition来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition的优势就没有了
    2.JVM的开发团队从来都没有放弃synchronized,而且基于JVM的synchronized优化空间更大,使用内嵌的关键字比使用API更加自然
    3.在大量的数据操作下,对于JVM的内存压力,基于API的ReentrantLock会开销更多的内存,虽然不是瓶颈,但是也是一个选择依据。

  • 转自:https://blog.csdn.net/q289336929/article/details/95742247

ConcurrentHashMap中的常量

最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;



默认初始容量。一定是2的幂次方数。最小1,最大MAXIMUM_CAPACITY。

private static final int DEFAULT_CAPACITY = 16;



//最大数组的大小(非2的幂)。toArray和相关方法需要。
MAX_ARRAY_SIZE = Integer。MAX_VALUE - 8;



 默认并发级别。未使用但定义是为了与该类的以前版本兼容。
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;



负载因子。
private static final float LOAD_FACTOR = 0.75f;



使用树而不是列表的bin计数阈值
static final int TREEIFY_THRESHOLD = 8;



调整操作。应该小于TREEIFY_THRESHOLD,在最多6个网格与收缩检测下去除。
static final int UNTREEIFY_THRESHOLD = 6;



可以树形化的最小表容量。(否则,如果一个bin中有太多节点,则会调整表的大小。建议至少设置为4 *TREEIFY_THRESHOLD调整大小和树形化阈值之间的冲突。
static final int MIN_TREEIFY_CAPACITY = 64;



每个传输步骤的最小重新绑定数。范围是细分为允许多个调整线程。这个值作为一个下界,以避免遇到调整器内存占用过多。取值为默认为DEFAULT_CAPACITY。
private static final int MIN_TRANSFER_STRIDE = 16;



在sizeCtl中用于生成stamp的位数。32位数组必须至少为6。
private static int RESIZE_STAMP_BITS = 16;



可以调整大小的最大线程数。必须符合32 - RESIZE_STAMP_BITS位。
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;



在sizeCtl中记录大小戳的位移位。
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;



 Node散列字段的编码。参见上面的解释。
static final int MOVED = -1;//转发节点的散列

static final int TREEBIN = -2;//哈希为树的根

static final int RESERVED = -3;//暂态保留

static final int HASH_BITS = 0x7fffffff;//正常节点散列的可用位



cpu数量
static final int NCPU = Runtime.getRuntime().availableProcessors();

ConcurrentHashMap的初始化方法

在高并发的场景下,initTable()方法通过cas(compareAndSwapInt)的机制来保证只有一个线程能够成功初始化。其他线程放弃本次循环的cpu竞争,进行自旋。但是这样有可能导致cpu占有率过高的问题?比如:一个线程本次循环放弃竞争,但是第二次循环又竞争到了cpu,依次叠加,就会导致cpu占有率过高甚至百分之百的问题。

    /**
     * 初始化表,使用大小记录在sizeCtl。
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); //释放cpu资源放弃本次循环的cpu竞争;只是自旋
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                       //取n的四分之三的值
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

ConcurrentHashMap的put方法 

不同点:1.8中的HashMap中table[index]的位置放入的是TreeNode(红黑树)的根节点。而1.8中的ConcurrentHashMap放入的是TreeBin对象,这个对象代表整颗红黑树。

问题2:为什么要用TreeBin对象来代表整棵树?

我们通过了解1.8的HashMap知道在插入新节点后,插入新节点所在的红黑树的根节点可能发生改变。我们假设在1.8的ConcurrentHashMap也与HashMap一样,那么首先会以该红黑树的根节点为对象加锁,那么有可能在插入新节点过后该红黑树的根节点发生改变,这样就会造成第二个线程同时在操作该红黑树的时候,原本线程未结束因为根节点对象改变导致线程2获取到了锁,从而导致产生数据冲突这样的一个问题。而TreeBin对象将整棵树包起来,从而对TreeBin作为锁对象,就不会因根节点改变而导致上述问题。 

final V putVal(K key, V value, boolean onlyIfAbsent) {
        //如果key或value为空则抛空指针异常
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        //遍历数组
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //如果数组为空,则初始化一个数组
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //如果table[index]位置为空,则创建Node对象插入其中
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //如果key位置的hash值为-1(MOVED),标志这其他线程在对数组进行扩容,本线程帮助其他线程一起扩容,加快效率。
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
           //key位置上不为空,存在链表或红黑树或单个Node对象
            else {
                V oldVal = null;
                //对要加入的Node对象上锁,在1.8中优化了synchronized ,所以效率没有以前那么低
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                       //如果是链表
                        if (fh >= 0) {
                            binCount = 1;
                            //遍历该链表
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //位置冲突则进行值覆盖,返回旧值
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //尾插法,插入到链表尾部
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                       //如果是红黑树
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //判断链表长度是否大于等于8,是则转为红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    //返回旧值
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //数组中元素个数加一
        addCount(1L, binCount);
        return null;
    }

size代码块 

size方法最后返回 baseCount属性 加上CounterCell数组里面的所有值的和。

public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
   
//再看看 sumCount()
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

 CounterCell是一个静态内部类,里面的long属性是通过volatile 修饰,来保证并发安全。

@sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }
 /**
   * Table of counter cells. When non-null, size is a power of 2.
   * 计数器单元表。当非空时,大小是2的乘方。
  */
private transient volatile CounterCell[] counterCells;

 

addCount代码块 

addCount()方法分为两个部分,第一部分,如下

前提: addCount先判断CounterCell数组是否为空:
1、如果为空则对当前map对象cas操作 baseCount 加 1,cas成功了就跳过if,失败了就执行fullAddCount方法。
2、如果不为空则通过当前线程的hash值找到此线程在CounterCell数组中对应的位置,如果此位置的CounterCell对象为空,就执行fullAddCount方法。如果不为空就对此CounterCell对象cas操作value加1。如果成功return;失败就执行fullAddCount方法。

大概意思就是多个线程去调用put方法,也就是多个线程去size+1。在ConcurrentHashMap中通过baseCount去计数。在高并发的场景下,通过CAS机制去控制baseCount+1,也就是只有一个线程能够操作成功。其他线程通过 “随机数&table.length-1” 获取到CounterCell的数组下标,然后去操作CounterCell数组对应下标对象中的value属性,使其value属性+1。最后统计baseCount+CounterCell的数目。

第二部分:数组扩容,我们放在后面描述。 

那么为什么不直接for循环对当前map对象cas操作 baseCount 加 1,却要引入CounterCell数组?

因为for循环cas这种方式可以解决多线程并发问题,但因为cas的是当前map对象,所以同一时刻还是只有一个线程能cas成功,而对于引入CounterCell数组,cas的是当前线程对应在数组中特定位置的元素,也就是说如果位置不冲突,n个长度的CounterCell数组是可以支持n个线程同时cas成功的。

总结:以数组的形式去分散线程,防止多个线程去操作属性只有一个线程能够成功,从而导致浪费其他线程的资源,进而提高并发效率。

 private final void addCount(long x, int check) {
           CounterCell[] as; long b, s;
        //首先if判断counterCells为空并且对当前map对象cas操作baseCount + x成功,就跳过if里的操作,
        //因为都cas操作baseCount + x成功了,就不需要通过counterCells辅助了,简单明了。
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            //如果上面判断失败了,counterCells不为空 或者counterCells为空但cas失败了。
            //如果counterCells为空,直接执行fullAddCount。
            //如果不为空,判断当前线程在counterCells中的槽位是否为空,如果不为空,
            //对槽位中的CounterCell对象cas操作value加1,成功return,失败执行fullAddCount,如果槽位为空,直接执行fullAddCount
            if (as == null || (m = as.length - 1) < 0 ||
                //ThreadLocalRandom.getProbe()就相当于是 [当前线程的哈希值]
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  //cas对CounterCell对象中的value执行+x(也就是+1)操作
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        //判断size+1后数组是否需要扩容。
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

 fullAddCount代码块

private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        //如果当前线程hash值==0,则重新生成一个hash值。
        //当前线程生成的hash值不会改变
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        //循环
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            //如果counterCells已经被初始化了
            if ((as = counterCells) != null && (n = as.length) > 0) {
                //如果当前线程对应于counterCell数组中的槽位为空
                if ((a = as[(n - 1) & h]) == null) {
                    //如果没有其他线程操作对应counterCell数组中的槽位
                    if (cellsBusy == 0) { 
                        //counterCell数组中为空的槽位中创建一个CounterCell对象        
                        CounterCell r = new CounterCell(x); 
                        //如果将cellsBusy成功修改成1 
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try { 
                               //再次校验有没有其他线程操作该数组槽位
                                CounterCell[] rs; int m, j;
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    //标记创建成功
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                //wasUncontended一直为true
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //如果当前线程对应槽位已经存在CounterCell元素了,就对value+x
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                //不扩容条件
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                //为扩容做条件 
                else if (!collide)
                    collide = true;
               //一个线程循环两次,都没有加1成功的情况下,数组扩容
               //在collide = true的情况下,数组扩容
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            //扩展数组,长度变为两倍
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                //当前线程重新生成的hash值
                h = ThreadLocalRandom.advanceProbe(h);
            }
            //如果counterCells 没有被初始化,
            // cellsBusy==0表示没有其他线程在使用counterCells数组。
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {//表示该数组已经有线程在用
                boolean init = false;
                try {  
                    //初始化数组                         
                    if (counterCells == as) {
                        //初始化长度为2
                        CounterCell[] rs = new CounterCell[2];
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    //表示当前线程已经操作完成该数组
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //如果都不满足,则线程自旋去竞争baseCount和CounterCell
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }

 

 

以上是关于JDK1.8中的ConcurrentHashMap的主要内容,如果未能解决你的问题,请参考以下文章

ConcurrentHashMap在jdk1.7和jdk1.8中的不同

JDK1.8中的HashMap.HashTable, ConcurrentHashMap有什么区别?

Java并发编程总结4——ConcurrentHashMap在jdk1.8中的改进

ConcurrentHashMap的JDK1.8实现

java并发:jdk1.8中ConcurrentHashMap源码浅析

JDK1.8 ConcurrentHashMap源码阅读