Java并发-- ConcurrentHashMap如何实现高效地线程安全(jdk1.8)

Posted Hepburn Yang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发-- ConcurrentHashMap如何实现高效地线程安全(jdk1.8)相关的知识,希望对你有一定的参考价值。


文章目录

1.传统集合框架并发编程中Map存在的问题?

  • HashMap死循环,造成CPU100%负载
    HashMap进行存储时,如果size超过(当前最大容量*负载因子)时候会发生resize,而resize中又调用了又调用了transfer()方法,而这个方法实现的机制就是将每个链表转化到新链表,并且链表中的位置发生反转,而这在多线程情况下是很容易造成链表回路,从而发生死循环;
  • 元素丢失问题,多线程put操作,hash碰撞时候两个线程得到同样的bucketIndex可能会导致覆盖的情况,有一个元素会丢失;
  • 还有其他的,路过的评论区补充一下……

2.早期改进策略

  1. HashTable
    HashTable相比HashMap是线程安全的,因为HashTable所有的方法都是加了synchronized的,锁的是整个hashMap,也就是我们说的锁的粒度比较大,由于最基本的put,set操作都加了互斥锁,造成的结果就是同一时间点只能由一个线程put或只能get,并发操作时所有的put,get操作都必须等一个线程完了之后再操作,线程安全得到了保证,但大大降低了并发效率,在非高度的并发的场景可取,高度并发时往往不可取, 。
  2. jdk1.8以前的ConcurrentHashMap
    ConcurrentHashMap在jdk1.7及以前采用的是锁分段机制来保证HashMap的线程安全,锁分段也就是将HashMap内部分段,每段是一个segment, 对每个segment加锁( 可以理解为ConcurrentHashMap是一个segment数组 ),每个段里面包含多个HashEntry,和原HashMap类似,hash相同的entry也是以链表形式存放,这样锁的粒度相比HashTable就小了很多,值得注意的是,1.7的ConcurrentHashMap是通过继承ReentrantLock 来进行加锁的,不同于之前HashTable使用synchronize的加锁形式;通过锁住每个segment来保证每个segment内的操作的线程安全性,也就避免了HashTable的整体同步,一定程度上提升了性能;
    另外在构造的时候, Segment的数量由所谓的concurrentcyLevel决定, 默认是16; 和HashMap的初始容量一致, 也可以在相应构造函数直接指定。 同样是2的幂数值, 如果输入是类似15这种非幂值, 会被自动调整到16之类2的幂数值。所以,默认情况下此时的ConcurrentHashMap支持16个线程并发操作
  3. 除了以上两种方法意外,Collections本身也提供了一种安全机制,就是通过Map<K,V> synchronizedMap(Map<K,V> m)方法将其包装为一个线程安全的map,我们看一下它的put源码实现就清除了:
public V put(K key, V value) 
    synchronized (mutex) return m.put(key, value);


以上简单的说了早期如何保证HashMap的线程安全,下面详细分析一下jdk1.8如何保证线程安全

3.ConcurrentHashMap采取了哪些方法来提高并发表现(jdk1.8)?

相比1.7做了两个改进:
1.取消了锁分段的设计,直接使用Node 数组来保存数据,并且用Node数组来保存数据,并且采用Node数组元素作为锁来实现对每一行数据加锁来进一步减少并发冲突的概率。
2.引入了红黑树的设计,在原来的数组+链表的基础上新增了红黑树的设计,当链表的长度超过8的时候就将链表转为红黑树,此时查询的复杂度也降低到了O(logN), 提升了查询的性能。
3.这一点不知道算不算是改进,但是和1.7确实是不一样的,为了解决线程安全问题,这一版的ConcurrentHashMap采用了synchronzied和CAS的方式,至于为什么选用了synchronzied我猜是因为1.8的synchronzied也做了很多的优化,包括偏向锁到轻量级所到重量级锁膨胀,因此改进后的synchronzied相较于ReentrantLock的性能在某些情况下并不差或许会更优,所以这里才选择了synchronzied来加锁,cas无锁操作的特性我就不多说了,比较容易理解。
稍后我们分析put源码的时候会看到这部分变化的具体实现。
另外,关于1.8版本的synchronzied优化可以查看本系列中博客中的:
【Java并发】-- synchronized原理 (偏向锁,轻量级锁,重量级锁膨胀过程)
结构图:
这个和jdk1.8的hashmap结构一致,但增加了线程安全的实现,所以结构简单,但实现会复杂一些;

4.ConcurrentHashMap实现分析

4.1 ConcurrentHashMap中关键的属性

table:

 //装载Node的数组,作为ConcurrentHashMap的数据容器,采用懒加载的方式,
 //直到第一次插入数据的时候才会进行初始化操作,数组的大小总是为2的幂次方。
  volatile Node<K,V>[] table:

nextTable

//扩容时使用,平时为null,只有在扩容的时候才为非null,
volatile Node<K,V>[] nextTable;

sizeCtl (不同场景有不同意义,肥肠重要!!!)

// 该属性用来控制table数组的大小,根据是否初始化和是否正在扩容有几种情况:
-------------------------
// 当值为负数时,-1这时表示数组有一个线程正在初始化,-n表示有n-1个线程正在进行扩容操作
// 注意:(扩容时可以多线程协作,但初始化只能有一个线程来完成)
-------------------------
// 当值为正数时:表示当前数组的临界值,也就是数组程度*负载因子得到的临界值,到达这个值就会进行扩容操作
// 当值为0时,是数组的默认初始值,此时还未被初始化。
volatile int sizeCtl;

sun.misc.Unsafe U

在ConcurrentHashMap的实现中也可以看到大量的cas操作,也就是U.compareAndSwapXXX类型的方法,调用这些方法去修改ConcurrentHashMap属性的时候就是利用了cas无锁算法来保证线程安全性,这是乐观锁的完美运用,cas是通过sun.misc.Unsafe类实现的,点到这个类之后我们发现所有的方法基本都是native的,也就是非java实现的接口; Unsafe类提供的方法是可以直接操作内存和线程的底层操作,该成员变量的获取是在静态代码块中:

 static 
    try 
        U = sun.misc.Unsafe.getUnsafe();
        .......
     catch (Exception e) 
        throw new Error(e);
    

4.2 ConcurrentHashMap中关键的CAS操作

tabAt

该方法获取对象中offset偏移地址对应的对象field的值, 简单来说也就是获取该方法用来获取table数组中索引为i的Node元素,但大家思考一下为什么不直接通过table[i]获取到第i个元素,而非要通过底层Unsafe类来进行table的操作呢?
因为我们虽然在table数组上加了volatile关键字来保证可见性,但是被volatile修饰的数组只针对数组的引用具有可先性,而不针对数组的元素,所以如果有其他个线程对这个数组的某个元素进行写操作的时候,不一定能保证可见性,当前线程也就不一定读到最新的值了。所以这里调用了Unsafe的getObjectVolatile方法保证每个元素都读到最新的值,同时也保证了性能。下面的casTabAt和setTabAt也是同理。

// 该方法用来获取table数组中索引为i的Node元素
 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) 
     return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
 

casTabAt

// 利用CAS操作设置table数组中索引为i的元素
 static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                     Node<K,V> c, Node<K,V> v) 
     return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
 

setTabAt

// 该方法用来设置table数组中索引为i的元素
 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) 
     U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
 

4.3 ConcurrentHashMap核心方法
从整体来说为了解决线程安全的问题,ConcurrentHashMap使用了synchronzied和CAS的方式

put

put方法调用的是putVal来进行put操作,我们来分析一下putVal大致做了哪些事情来保证线程安全,下面是核心逻辑,一定要理解!!

  1. 首先用spread方法进行了一次重hash从而减小hash冲突的可能性;
  2. 调用initTable方法初始化table,已经初始化之后会跳过这一步;
  3. 判断是否可以直接将新值插入到table数组中,为什么需要先判断呢?这块其实分了三种情况;首先插入和更新两种,插入的又分为直接插入table和接入链表;如果待插入的位置table[i]刚好为null就可以直接插入。如果hash取模之后发现i已经有元素了,需要对比hash值是否相等,若相等则覆盖原有元素,若不相等则以链表的形式将当前节点next属性更改为新的节点,把他们连起来。这里多线程操作的情况下根据happenbefore规则 线程 A 的 casTabAt 操作,一定对线程 B 的 tabAt 操作可见;
  4. 判断是否正在扩容,如果正在扩容可以协助扩容(但有协助线程数量有限制,跟cpu的核数有关)
  5. 当table[i]为链表的头结点,在链表中插入新值,我们可以看这部分代码用synchronized 同步代码块包了起来,加了互斥锁来保证线程安全。
  6. 当table[i]为红黑树的根节点,在红黑树中插入新值
  7. 根据节点个数调整红黑树
  8. 对容量大小进行检查,超过了临界值需要扩容;
** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) 
    if (key == null || value == null) throw new NullPointerException();
    //1. 计算key的hash值
    int hash = spread(key.hashCode());
    int binCount = 0; // 用来记录链表的长度
    for (Node<K,V>[] tab = table;;) // 自旋,当出现线程竞争时不断自旋
        Node<K,V> f; int n, i, fh;
        //2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable(); // 初始化数组方法
        //3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可
        
        // 通过hash值对应的数组下标得到第一个节点;以volatile读的方式来读取table数组中的元素,
        // 保证每次拿到的数据都是最新的
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) 
        // 如果该下标返回的节点为空,则直接cas插入,cas失败则存在竞争,进入下一次循环
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        
        //4. 当前正在扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else 
            V oldVal = null;
            synchronized (f) 
                if (tabAt(tab, i) == f) 
                    //5. 当前为链表,在链表中插入新的键值对
                    if (fh >= 0) 
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) 
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) 
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) 
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            
                        
                    
                    // 6.当前为红黑树,将新的键值对插入到红黑树中
                    else if (f instanceof TreeBin) 
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) 
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        
                    
                
            
            // 7.插入完键值对后再根据实际大小看是否需要转换成红黑树
            if (binCount != 0) 
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            
        
    
    //8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容 
    addCount(1L, binCount);
    return null;

详细分析put的扩容操作

扩容部分有两个经典的设计:
1.高并发下的扩容
2.如何保证addCount的数据安全性以及性能

 // 调用传参
    addCount(1L, binCount);
    
 // 把当前ConcurrentHashMap的元素个数+1
 // 这个方法一共做了两件事,更新baseCount的值,检测是否进行扩容   
private final void addCount(long x, int check) 
    CounterCell[] as; long b, s;
    //利用CAS方法更新baseCount的值 

    /* 判断 counterCells 是否为空,
1. 如果为空,就通过 cas 操作尝试修改 baseCount 变量,对这个变量进行原子累加操
作(做这个操作的意义是:如果在没有竞争的情况下,仍然采用 baseCount 来记录元素个
数)
2. 如果 cas 失败说明存在竞争,这个时候不能再采用 baseCount 来累加,而是通过
CounterCell 来记录
*/
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) 
        CounterCell a; long v; int m;
        boolean uncontended = true; // 是否冲突标识,默认为没有冲突
        
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) 
            fullAddCount(x, uncontended);
            return;
        
        if (check <= 1)
            return;
        s = sumCount();
    
    //如果check值大于等于0 则需要检验是否需要进行扩容操作
    if (check >= 0) 
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) 
            int rs = resizeStamp(n);
            //
            if (sc < 0) 
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                 //如果已经有其他线程在执行扩容操作
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            
            //当前线程是唯一的或是第一个发起扩容的线程  此时nextTable=null
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        
    

CounterCells 解释

更新map的size值这里借用了分布式的思想,起到关键作用的是这里的CounterCell 数组,这个数组里面每个元素都存着一个value值,而最终map的size就是数组中所有value值相加得来的,详细可以查看sumCount的源码;
为什么如此设计呢?
一般的集和在进行put操作的时候,size的大小只要随着put操作i++即可,但是在多线程情况下i++的不安全结果也一定不准确,为了保证这个size共享变量的安全性势必会增加锁的设计,通过自旋,cas或synchronize锁等实现,但在竞争非常激烈的情况下如此这般设计一定会占据资源影响性能,所以这里采用了引入了CounterCells ,采用分布式的思想进行分片化处理,其实看到这里我是非常激动的,必须对Doug Lea大师真的致以最崇高的respect!具体如何实现呢?
注意这里:as[ThreadLocalRandom.getProbe() & m]
as是CounterCells 数组,ThreadLocalRandom是保证在多线程情况下Random生成随机数的线程安全;
实现逻辑:

  1. 计数表(CounterCells 数组)为空则直接调用 fullAddCount ;
  2. 从计数表中随机取出一个数组的位置为空,直接调用 fullAddCount
  3. 通过 CAS 修改 CounterCell 随机位置的值,如果修改失败说明出现并发情况,继续cas即可;
    举个简单的例子:
    比如说现在有三个线程ThreadA/B/C在并发进行put操作,ThreadLocalRandom.getProbe()会为他们生成三个随机数,范围是(0,m),m是CounterCell.length-1; 比如说是初始化的长度为2,此时假设为这三个线程生成了三个随机数0,1,0
    ThreadA拿到了0,ThreadB拿到了1,ThreadC拿到了0,此时他们会针对CounterCell数组对应下表的value进行+1的cas操作,ThreadA会找到CounterCell[0]对0下标处的value元素+1,默认为0,此时通过cas+1后变成1;因为ThreadC也拿到了0下标所以也会对CounterCell[0]进行cas+1操作,cas是无锁操作,ThreadC会一直cas重试,直到ThreadA操作完毕释放锁,于是CounterCell[0]中的value会经历两次+1的cas操作变成2;
    同理ThreadB会将CounterCell[1]处的value值cas更新为1,然后再调用sumCount将CounterCell数组中的所有value元素累加得到真正的size值;
    这样设计带来的好处?
    利用分片思维提高了负载能力,CounterCell的数组长度为多少,就可以支持多少个线程并发的去对size计数,相应的负载能力就会多少倍;CounterCell的默认初始值为2,也就是至少可以提升2倍的负载能力,CounterCell后期同样可以扩容,但扩容的契机我还有待研究,暂不多说。
transfer 扩容阶段

扩容的基本思想是跟hashMap是很像的,另外注意这里的并发扩容是是没有加锁的,所以这里支持并发扩容,效率是很高的,但是实现起来要复杂的多,所以这里也是ConcurrentHashMap 的精华之一;
首先判断是否需要扩容,也就是当更新后的键值对总数 baseCount >= 阈值 sizeCtl 时,进行
rehash,这里面会有两个逻辑。

  1. 如果当前正在处于扩容阶段,则当前线程会加入并且协助扩容
  2. 如果当前没有在扩容,则直接触发扩容操作
    resizeStamp
    这里先提一下resizeStamp这个扩容戳,是扩容时的重要标记;
static final int resizeStamp(int n) 
	return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));

Integer.numberOfLeadingZeros 这个方法是返回无符号整数 n 最高位非 0 位前面的 0 的个数
比如 10 的二进制是 0000 0000 0000 0000 0000 0000 0000 1010
那么这个方法返回的值就是 28
根据 resizeStamp 的运算逻辑,我们来推演一下,假如 n=16,那么 resizeStamp(16)=32796
转化为二进制是
[0000 0000 0000 0000 1000 0000 0001 1100]
接着再来看,当第一个线程尝试进行扩容的时候,会执行下面这段代码

    U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)

rs 左移 16 位,相当于原本的二进制低位变成了高位 1000 0000 0001 1100 0000 0000 0000 0000
然后再+2
=1000 0000 0001 1100 0000 0000 0000 0000 +10
=1000 0000 0001 1100 0000 0000 0000 0010

这样存储带来的好处??

  • 首先在 CHM 中是支持并发扩容的,也就是说如果当前的数组需要进行扩容操作,可以由多个线程来共同负责;
    第一个扩容的线程,执行 transfer 方法之前,
    会设置 sizeCtl =(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
    后续帮其扩容的线程,执行 transfer 方法之前,会设置 sizeCtl = sizeCtl+1
    每一个退出 transfer 的方法的线程,退出之前,会设置 sizeCtl = sizeCtl-1
    那么最后一个线程退出时:必然有
    sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2)
    == resizeStamp(n) << RESIZE_STAMP_SHIFT
    如果 sc - 2 不等于标识符左移 16 位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了。
  • 可以保证每次扩容都生成唯一的生成戳, 每次新的扩容,都有一个不同的 n(n是map的size),这个生成戳就是根据 n 来计算出来的一个数字, n 不同,这个数字也不同

第一个线程尝试扩容的时候,为什么是+2 ??
因为 1 表示初始化,2 表示一个线程在执行扩容,而且对 sizeCtl 的操作都是基于位运算的,
所以不会关心它本身的数值是多少,只关心它在二进制上的数值,而 sc + 1 会在
低 16 位上加 1。

多线程扩容要注意的问题?
在扩容的时候其他线程也可能正在添加元素,这时又触发了扩容怎么办? 可能大家想到的第
一个解决方案是加互斥锁,把转移过程锁住,虽然是可行的解决方案,但是会带来较大的性
能开销。因为互斥锁会导致所有访问临界区的线程陷入到阻塞状态,持有锁的线程耗时越长,
其他竞争线程就会一直被阻塞,导致吞吐量较低。而且还可能导致死锁

而 ConcurrentHashMap 并没有直接加锁,而是采用 CAS 实现无锁的并发同步策略,最精华
的部分是它可以利用多线程来进行协同扩容
简单来说,它把 Node 数组当作多个线程之间共享的任务队列,然后通过维护一个指针来划
分每个线程锁负责的区间,每个线程通过区间逆向遍历来实现扩容,一个已经迁移完的
bucket 会被替换为一个 ForwardingNode 节点,标记当前 bucket 已经被其他线程迁移完了。

transfer的源码分析

  private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) 
        int n = tab.length, stride;
        /*
        将 (n>>>3 相当于 n/8) 然后除以 CPU 核心数。如果得到的结果小于 16,那么就使用 16
        这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少
        的话,默认一个 CPU(一个线程)处理 16 个桶,也就是长度为 16 的时候,扩容的时候只会有一
        个线程来扩容
         */
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) <
                MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE;  
        //nextTab 未初始化, nextTab 是用来扩容的 node 数组
        if (nextTab == null)  // initiating
            try 
                @SuppressWarnings("unchecked")
            //新建一个 n<<1 原始 table 大小的 nextTab,也就是 32
                        Node<K,V>[] nt = (Node<K,V>[])new <

以上是关于Java并发-- ConcurrentHashMap如何实现高效地线程安全(jdk1.8)的主要内容,如果未能解决你的问题,请参考以下文章

java高并发?

java并发编程看啥书比较好

Java并发知识整理

java高并发,如何解决,啥方式解决,高并发

java 怎样处理高并发

Java并发总结-全景图