concurrenthashmap怎么实现的分段锁

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了concurrenthashmap怎么实现的分段锁相关的知识,希望对你有一定的参考价值。

参考技术A 最大的区别就是ConcurrentHashMap是线程安全的,hashMap不是线程安全的。 为什么线程安全呢: ConcurrentHashMap代码中可以看出,它引入了一个“分段锁”的概念,具体可以理解为把一个大的Map拆分成N个小的HashTable

一文就懂ConcurrentHashMap实现原理

文章目录

ConcurrentHashMap介绍

哈希表就是一种以键-值(key -indexed) 存储数据的结构,我们只要输入待查 找的值即 key,即可查找到其对应的值。

哈希表是一种非常高效的数据结构 ,设计优良的哈希函数可以使其上的增删改查操作达到O(1)级别。Java为我们提供了一个现成的哈希结构 ,那就是HashMap类 ,在前面的文章中我曾经介绍过HashMap类 ,知道它的所有方法都未进行同步 ,因此在多线程环境中是不安全的。

为此 ,Java为我们提供了另外一个HashTable类 ,它对于多线程同步的处理非常简单粗暴 ,那就是在HashMap的基础上对其所有方法都使用synchronized关键字进行加锁。

例如下面截取的几个方法:

这种方法虽然简单 ,但导致了一个问题 ,那就是在同一时间内只能由一个线程去操作哈希表。即使这些线程都只是进行读操作也必须要排队 ,这在竞争激烈的多线程环境中极为影响性能。本篇介绍的ConcurrentHashMap就是为了解决这个问题的 ,它的内部使用分段锁将锁进行细粒度化 ,从而使得多个线程能够同时操作哈希表,这样极大的提高了性能。

补:ConcurrentHashMap 与 HashMap 等的区别

1.HashMap

我们知道 HashMap 是线程不安全的 ,在多线程环境下 ,使用 Hashmap 进行 put 操作会引起死循环 ,导致 CPU 利用率接近 100% ,所以在并发情况下不能 使用 HashMap

2.HashTable

HashTable 和 HashMap 的实现原理几乎一样 ,差别无非是

HashTable 不允许 key 和 value 为 null

HashTable 是线程安全的

但是 HashTable 线程安全的策略实现代价却太大了 ,简单粗暴 ,get/put 所有 相关操作都是 synchronized 的 ,这相当于给整个哈希表加了一把大锁。

多线程访问时候 ,只要有一个线程访问或操作该对象 ,那其他线程只能阻塞 ,相 当于将所有的操作串行化 ,在竞争激烈的并发场景中性能就会非常差。

3.ConcurrentHashMap

主要就是为了应对 hashmap 在并发环境下不安全而诞生的 ,ConcurrentHashMap 的设计与实现非常精巧 ,大量的利用了 volatile ,final , CAS 等 lock-free 技术来减少锁竞争对于性能的影响。

我们都知道 Map 一般都是数组+链表结构 ( JDK1.8 该为数组+红黑树) 。

ConcurrentHashMap 避免了对全局加锁改成了局部加锁操作 ,这样就极大地 提高了并发环境下的操作速度 ,由于ConcurrentHashMap 在 JDK1.7 和 1.8 中的实现非常不同 ,接下来我们首先得谈谈 JDK 在 1.7 和 1.8 中的实现原理的区别

ConcurrentHashMap实现原理

JDK1.7 版本实现原理

在 JDK1.7 中 ConcurrentHashMap 采用了数组+Segment+分段锁的方式实现

Segment(分段锁)

ConcurrentHashMap 中的分段锁称为 Segment ,它即类似于 HashMap 的结构 ,即内部拥有一个 Entry 数组 ,数组中的每个元素又是一个链表,同时又是一个ReentrantLock。

Segment 继承了ReentrantLock:

内部结构

ConcurrentHashMap 使用分段锁技术 ,将数据分成一段一段的存储 ,然后给 每一段数据配一把锁 ,当一个线程占用锁访问其中一个段数据的时候 ,其他段的 数据也能被其他线程访问 ,能够实现真正的并发访问。

在 jdk 1.7 中,ConcurrentHashMap 是由 Segment 数据结构和 HashEntry 数组结构构成,采取分段锁来保证安全性。Segment 是 ReentrantLock 重入锁,在 ConcurrentHashMap 中扮演锁的角色,HashEntry 则用于存储键值对数据。一个 ConcurrentHashMap 里包含一个 Segment 数组,一个Segment 里包含一个 HashEntry 数组,Segment 的结构和 HashMap 类似,是一个数组和链表结构。

如下图是JDK 1.7中ConcurrentHashMap 的内部结构图 :

那么从上面的结构我们可以了解到 ,ConcurrentHashMap 定位一个元素的过程需要进行两次 Hash 操作。

第一次 Hash 定位到 Segment ,第二次 Hash 定位到元素所在的链表的头部

JDK1.7 原理结构的优缺点

优点

  • 写操作的时候可以只对元素所在的 Segment 进行加锁即可 ,不会影响到其他的 Segment ,这样 ,在最理想的情况下,ConcurrentHashMap 可以最高同时支持 Segment 数量大小的写操作 ( 刚好这些写操作都非常平均地分布在所有的 Segment 上) 。

缺点

  • 这一种结构的带来的副作用是 Hash 的过程要比普通的 HashMap 要长

所以 ,通过这一种结构 ,ConcurrentHashMap 的并发能力可以大大的提高。

JDK1.8版本实现原理

JDK8 中 ConcurrentHashMap 参考了 JDK8 HashMap 的实现 ,采用了Node数组+ 链表+红黑树的实现方式来设计 ,内部大量采用 CAS 操作

CAS

CAS 是 compare and swap 的缩写 ,即我们所说的比较交换。 cas 是一种基于 锁的操作 ,而且是乐观锁。在 java 中锁分为乐观锁和悲观锁。悲观锁是将资源锁住 ,等一个之前获得锁的线程释放锁之后 ,下一个线程才可以访问。而乐观锁 采取了一种宽泛的态度 ,通过某种方式不加锁来处理资源 ,比如通过给记录加 version 来获取数据 ,性能较悲观锁有很大的提高。

CAS 操作包含三个操作数 —— 内存位置 ( V ) 、预期原值 ( A ) 和新值(B)。 如果内存地址里面的值和 A 的值是一样的 ,那么就将内存里面的值更新成 B。 CAS 是通过无限循环来获取数据的 ,若果在第一轮循环中 ,a 线程获取地址里面 的值被 b 线程修改了 ,那么 a 线程需要自旋 ,到下次循环才有可能机会执行。

Node

JDK8 中彻底放弃了 Segment 转而采用的是 Node ,其设计思想也不再是 JDK1.7 中的分段锁思想。

Node:保存 key,value 及 key 的 hash 值的数据结构。

注意:其中 value 和 next 都用 volatile修饰,用来保证并发的可见性

Node的源码如下:

/*
Key-value entry:
此类永远不会导出为用户可变的Map.Entry(即,一个支持setValue;参见下面的MapEntry),
但可以用于批量任务中使用的只读遍历。具有负散列字段的节点的子类是特殊的,并且包含空键和值(但从不导出)。
否则,键和vals永远不会为空。
*/
static class Node<K,V> implements Map.Entry<K,V> 
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    Node(int hash, K key, V val, Node<K,V> next) 
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    
    public final K getKey()        return key; 
    public final V getValue()      return val; 
    public final int hashCode()    return key.hashCode() ^ val.hashCode(); 
    public final String toString() return key + "=" + val; 
    public final V setValue(V value) 
        throw new UnsupportedOperationException();
    
	// 其他函数......

Java 8 ConcurrentHashMap 结构基本上和 Java 8 的 HashMap 一样,不过保证线程安全性。

在 JDK8 中 ConcurrentHashMap 的结构 ,由于引入了红黑树 ,使得ConcurrentHashMap 的实现非常复杂 ,我们都知道 ,红黑树是一种性能非常 好的二叉查找树 ,其查找性能为 O ( logN ) ,但是其实现过程也非常复杂 ,而且可读性也非常差 ,Doug Lea 的思维能力确实不是一般人能比的 ,早期完全采用链表结构时 Map 的查找时间复杂度为 O( N ) ,JDK8 中 ConcurrentHashMap 在链表的长度大于某个阈值的时候会将链表转换成红黑树进一步提高其查找性能。

链表与红黑树转化简图如下:

JDK 1.8中的实现:

JDK1.8 的实现已经摒弃了 Segment 的概念,而是直接用 Node 数组+链表+红黑树的数据结构来实现,并发控制使用 Synchronized 和 CAS 来操作,整个看起来就像是优化过且线程安全的 HashMap,虽然在 JDK1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本。

下图是其内部结构的示意图:

TreeBin是红黑树

JDK1.7与1.8区别总结

其实可以看出 JDK1.8 版本的 ConcurrentHashMap 的数据结构已经接近 HashMap ,相对而言 ,ConcurrentHashMap 只是增加了同步的操作来控制并发 ,从 JDK1.7 版本的 ReentrantLock+Segment+HashEntry ,到 JDK1.8 版 本中 synchronized+CAS+HashEntry+红黑树。

  1. 数据结构 :取消了 Segment 分段锁的数据结构 ,取而代之的是数组+链表+ 红黑树的结构。

  2. 保证线程安全机制 :JDK1.7 采用 segment 的分段锁机制实现线程安全 ,其中 segment 继承自 ReentrantLock。JDK1.8 采用 CAS+Synchronized 保证线程 安全。

  3. 锁的粒度 :原来是对需要进行数据操作的 Segment 加锁 ,现调整为对每个数 组元素加锁 ( Node ) 。

  4. 链表转化为红黑树:定位结点的 hash 算法简化会带来弊端,Hash 冲突加剧,因 此在链表节点数量大于 8 时 ,会将链表转化为红黑树进行存储。

  5. 查询时间复杂度 :从原来的遍历链表 O(n) ,变成遍历红黑树 O(logN)。

JDK1.7 ConcurrentHashMap源码分析

因为看过HashMap1.8的,ConcurrentHashMap用的是CAS和同步来控制并发,所以就先不看了

成员变量

ConcurrentHashMap成员变量如下:

//默认初始化容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
//默认加载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f ;
//默认并发级别
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//集合最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
//分段锁的最小数量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//分段锁的最大数量
static final int MAX_SEGMENTS = 1 << 16;
//加锁前的重试次数
static final int RETRIES_BEFORE_LOCK = 2;
//分段锁的掩码值
final int segmentMask;
//分段锁的移位值
final int segmentShift;
//分段锁数组
final Segment<K,V>[] segments;

先不用强记,后面将会在具体场景中来介绍到这些成员变量的作用。像Segment数组代表分段锁集合 ,并发级别则代表分段锁的数量(也意味有多少线程可以同时操作) ,初始化容量代表整个容器的容量 ,加载因子代表容器元素可以达到多满的一种程度这些变量我们已经大概知道了。

分段锁

这是ConcurrentHashMap中最重要的一个概念:Segment是ConcurrentHashMap的静态内部类 ,可以看到它继承自ReentrantLock ,因此它在本质上是一个锁。

它在内部持有 一个HashEntry数组(哈希表) ,并且保证所有对该数组的增删改查方法都是线程安全的 ,具体是怎样实现的后面具体方法会讲到。

所有对ConcurrentHashMap的增删改查操作都可以委托Segment来进行 ,因此ConcurrentHashMap能够保证在多线程环境下是安全的。又因为不同的Segment是不同的锁 ,所以多线程可以同时操作不同的Segment ,也就意味着多线程可以同时操作ConcurrentHashMap ,这样就能避免HashTable的缺陷 ,从而极大的提高性能。

//分段锁
static final class Segment<K,V> extends ReentrantLock implements Serializable 
    //自旋最大次数
    static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1 ;
    //哈希表
    transient volatile HashEntry<K,V>[] table;
    //元素总数
    transient int count;
    //修改次数
    transient int modCount;
    //元素阀值
    transient int threshold;
    //加载因子
    final float loadFactor;
    //省略以下内容
    ...

Segment内部也包含了其他方法来实现并发Hash数据的基本操作:

初始化构造器

ConcurrentHashMap有多个构造器 ,下面贴出的是它的核心构造器 ,其他构造器都通过调用它来完成初始化。

核心构造 器需要传入三个参数 :

  • 初始容量
  • 加载因子
  • 并发级别

在前面介绍成员变量时我们可以知道他们是有默认值的,默认的初始容量为16 , 加载因子为0.75f ,并发级别为16。

现在我们看到核心构造器的代码 ,首先是通过传入的concurrencyLevel来计算出ssize ,ssize是Segment数组的长

注意:ssize必须保证是2的幂 ,这样就可以通过hash&ssize-1来计算分段锁在数组中的下标。

所以,因为传入的concurrencyLevel 不能保证是2的幂 ,所以不能直接用它来当作Segment数组的长度 ,因此我们要找到一个最接近 concurrencyLevel的2的幂 ,用它来作为数组的长度。

例如:现在传入的concurrencyLevel=15 ,通过上面代码可以计算出ssize=16 ,那么。24 = 16,所以sshift=4。

接下来立马可以算出segmentShift=16 ,segmentMask=15。注意这里的segmentShift是分段锁的移位值 ,segmentMask是分段锁的掩码值 ,这两个值是用来计算分段锁在数组中的下标 ,在下面我们会讲到。

在算出分段锁的个 数ssize之后 ,就可以根据传入的总容量来计算每个分段锁的容量 ,它的值c = initialCapacity / ssize

分段锁的容量也就是HashEntry数组的长度 ,同样也必须保证是2的幂 ,而上面算出的c的值不能保证这一点 ,所以不能直接用c作为HashEntry数组 的长度 ,需要另外找到一个最接近c的2的幂 ,将这个值赋给cap ,然后用cap来作为HashEntry数组的长度。

到现在,我们有了 ssize和cap ,就可以新建分段锁数组Segment[]和元素数组HashEntry[]了。注意 ,与JDK1.6不同是的 ,在JDK1.7中只新建了 Segment数组 ,并没有对它初始化 ,初始化Segment的操作留到了插入操作时进行。

全部源码如下:

//核心构造器
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) 
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) 
        throw new IllegalArgumentException();
    
    //确保并发级别不大于限定值
    if (concurrencyLevel > MAX_SEGMENTS) 
        concurrencyLevel = MAX_SEGMENTS;
    
    int sshift = 0;
    int ssize = 1;
    //保证ssize为2的幂, 且是最接近的大于等于并发级别的数
    while (ssize < concurrencyLevel) 
        ++sshift;
        ssize <<= 1;
    
    //计算分段锁的移位值

    this.segmentShift = 32 - sshift;
    //计算分段锁的掩码值
    this.segmentMask = ssize - 1;
    //总的初始容量不能大于限定值
    if (initialCapacity > MAXIMUM_CAPACITY) 
        initialCapacity = MAXIMUM_CAPACITY;
    
    //获取每个分段锁的初始容量
    int c = initialCapacity / ssize;
    //分段锁容量总和不小于初始总容量
    if (c * ssize < initialCapacity) 
        ++c;
    
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    //保证cap为2的幂, 且是最接近的大于等于c的数
    while (cap < c) 
        cap <<= 1;
    
    //新建一个Segment对象模版
    Segment<K, V> s0 = new Segment<K, V>(loadFactor, (int) (cap * loadFactor), (HashEntry<K, V>[]) new HashEntry[cap]);
    //新建指定大小的分段锁数组
    Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize];
    //使用UnSafe给数组第0个元素赋值
    UNSAFE.putOrderedObject(ss, SBASE, s0);
    this.segments = ss;

定位锁和定位元素

定位锁和定位元素有如下两个重要的步骤

a. 通过哈希码计算分段锁在数组中的下标 :(h >>> segmentShift) & segmentMask。

b. 通过哈希码计算元素在数组中的下标 :(tab.length - 1) & h。

现在我们假设传给构造器的两个参数为initialCapacity= 128, concurrencyLevel= 16。根据计算可以得到ssize= 16, sshift=4 ,segmentShift=28 ,segmentMask= 15。 同样 ,算得每个分段锁内的HashEntry数组的长度为8 ,所以tab.length- 1=7。根据这些值 ,我们通过下图来解释如何根据同一个哈希码来定位分段锁和元素。

可以看到分段锁和元素的定位都是通过元素的哈希码来决定的。定位分段锁是取哈希码的高位值(从32位处取起) ,定位元素是 取的哈希码的低位值。现在有个问题 ,它们一个从32位的左端取起 ,一个从32位的右端取起 ,那么会在某个时刻产生冲突吗?我们在成员变量里可以找到MAXIMUM_CAPACITY = 1 << 30 ,MAX_SEGMENTS = 1 << 16 ,这说明定位分段锁和定位 元素使用的总的位数不超过30 ,并且定位分段锁使用的位数不超过16 ,所以至少还隔着2位的空余 ,因此是不会产生冲突的。

全部源码如下:

//根据哈希码获取分段锁
@SuppressWarnings("unchecked")
private Segment<K,V> segmentForHash(int h) 
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u) ;

//根据哈希码获取元素
@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) 
    HashEntry<K,V>[] tab;
    return (seg == null || (tab = seg.table) == null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE) ;

get 查找元素

虽然Segment对象持有的HashEntry数组引用是volatile类型的 ,但是数组内的元素引用不是volatile类型的 ,因此多线程对数组元素的修改是不安全的 ,可能会在数组中读取到尚未构造完成的对象。

在JDK1.6中是通过第二次加锁读取来保证安全的 ,而JDK1.7中通过UnSafe的getObjectVolatile方法来读取同样也是为了保证这一点。

使用getObjectVolatile方法读取数组元素需要先获得元素在数组中的偏移量 ,在这里根据哈希码计算得到分段锁在数组中的偏移量为u ,然后通过偏移量u来尝试读取分段锁。 由于分段锁数组在构造时没进行初始化 ,因此可能读出来一个空值 ,所以需要先、进行判断。

在确定分段锁和它内部的哈希表都不为空之后 ,再通过哈希码读取HashEntry数组的元素 ,这时获得的是链表的头结点。之后再从头到尾的对链表进行遍历查找,如果找到对应的值就将其返回 ,否则就返回 null。

以上就是整个查找元素的过程。

源码如下:

//根据key获取value
public V get(Object key) 
    Segment<K, V> s;
    HashEntry<K, V>[] tab;
    //使用哈希函数计算哈希码
    int h = hash(key);
    //根据哈希码计算分段锁的索引
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    //获取分段锁和对应的哈希表
    if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) 
        //根据哈希码获取链表头结点, 再对链表进行遍历
        for (HashEntry<K, V> e = (HashEntry<K, V>) UNSAFE.getObjectVolatile
             (tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) 
            K k;
            //根据key和hash找到对应元素后返回value值
            if ((k = e.key) == key || (e.hash == h && key.equals(k))) 
                return e.value;
            
        
    
    return null;

put 插入元素

ConcurrentHashMap中有两个添加键值对的方法 :

  • 通过put方法添加时如果存在则会进行覆盖
  • 通过putIfAbsent方法添加时如 果存在则不进行覆盖

这两个方法都是调用分段锁的put方法来完成操作 ,只是传入的最后一个参数不同而已。

如下是这两个方法的源码:

//向集合添加键值对(若存在则替换)
@SuppressWarnings("unchecked")
public V put(K key, V value) 
    Segment<K, V> s;
    //传入的value不能为空
    if (value == null) throw new NullPointerException();
    //使用哈希函数计算哈希码
    int hash = hash(key);
    //根据哈希码计算分段锁的下标
    int j = (hash >>> segmentShift) & segmentMask;
    //根据下标去尝试获取分段锁
    if ((s = (Segment<K, V>) UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) 
        //获得的分段锁为空就去构造一个
        s = ensureSegment(j);
    
    //调用分段锁的put方法
    return s.put(key, hash, value, false);



//向集合添加键值对(不存在才添加)
@SuppressWarnings("unchecked")
public V putIfAbsent(K key, V value) 
    Segment<K, V> s;
    //传入的value不能为空
    if (value == null) throw new NullPointerException();
    //使用哈希函数计算哈希码
    int hash = hash(key);
    //根据哈希码计算分段锁的下标
    int j = (hash >>> segmentShift) & segmentMask;
    //根据下标去尝试获取分段锁
    if ((s = (Segment<K, V>) UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) 
        //获得的分段锁为空就去构造一个
        s = ensureSegment(j);
    
    //调用分段锁的put方法
    return s.put(key, hash, value, true);


在上面代码中 我们可以看到首先是根据key的哈希码来计算出分段锁在数组中的下标 ,然后根据下标使用UnSafe类getObject方法来读取分段锁。 由于在构造ConcurrentHashMap时没有对Segment数组中的元素初始化 ,所以可能读到一个空值 ,这时会先通过 ensureSegment方法新建一个分段锁。

获取到分段锁之后再调用它的put方法完成添加操作 ,源码如下Segment的put操作源码如下:

//添加键值对
final V put(K key, int hash, V value, ConcurrentHashMap的实现原理是分段锁?你Out了

ConcurrentHashMap的实现原理是分段锁?你Out了

ConcurrentHashMap

ConcurrentHashMap面试题

一文就懂ConcurrentHashMap实现原理

一文就懂ConcurrentHashMap实现原理