ConcurrentHashMap 源码浅析 1.7

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ConcurrentHashMap 源码浅析 1.7相关的知识,希望对你有一定的参考价值。

  1. 简介

    (1) 背景
    HashMap死循环:HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry.
    HashTable效率低下:HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下.因为当一个线程访问HashTable的同步方法,其它线程也访问HashTable的同步方法时,会进入阻塞或轮询状态.如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法获取元素,所以竞争越激烈效率越低.
    (2) 简介
    HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么多线程访问容器里不同的数据段时,线程间不会存在竞争,从而可以有效提高并发访问效率,这就是ConcurrentHash所使用的锁分段技术.首先将数据分成一段一段地储存,然后给每一段配一把锁,当一个线程占用锁访问其中一段数据时,其它段的数据也能被其它线程访问.

  2. 结构

    ConcurrentHash是由Segments数组结构和HashEntry数组结构组成.Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的色;HashEntry则用于存储键值对数据.一个ConcurrentHashMap里包含一个Segment组.Segment的结构和HashMap类似,是一种数组加链表的结构.一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护者一个HashEntry数组里面的元素,当对HashEntry数组的数据进行修改时,必须先获得与它对应的Segment锁,如下图所示.

    技术图片

  3. 基本成员
    default_initial_capacitymap默认容量,必须是2的冥

    /**
     * 默认的初始容量 16
     */
    static final int DEFAULT_INITIAL_CAPACITY = 16;

    default_load_factor默认负载因子(存储的比例)

    /**
     * 默认的负载因子
     */
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    default_concurrency_level默认并发数量,segments数组量

    /**
     * 默认的并发数量,会影响segments数组的长度
     */
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    maximum_capacitymap最大容量

    /**
     * 最大容量,构造ConcurrentHashMap时指定的值超过,就用该值替换
     * ConcurrentHashMap大小必须是2^n,且小于等于2^30
     */
    static final int MAXIMUM_CAPACITY = 1 << 30;

    min_segment_table_capacityHashEntry[]默认容量

    /**
     * 每个segment中table数组的长度,必须是2^n,至少为2
     */
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

    max_segments最大并发数,segments数组最大量

    /**
     * 允许最大segment数量,用于限定concurrencyLevel的边界,必须是2^n
     */
    static final int MAX_SEGMENTS = 1 << 16;

    retries_before_lock重试次数,在加锁之前

    /**
     * 非锁定情况下调用size和contains方法的重试次数,避免由于table连续被修改导致无限重试
     */
    static final int RETRIES_BEFORE_LOCK = 2;

    segmentMask计算segment位置的掩码(segments.length-1)

    /**
     * 用于segment的掩码值,用于与hash的高位进行取&
     */
    final int segmentMask;

    segmentShift

    /**
     * 用于算segment位置时,hash参与运算的位数
     */
    final int segmentShift;

    segmentssegment数组

    /**
     * segments数组
     */
    final Segment<K,V>[] segments;

    HashEntry存储数据的链式结构

    static final class HashEntry<K,V> {
        // hash值
        final int hash;
        // key
        final K key;
        // 保证内存可见性,每次从内存中获取
        volatile V value;
        volatile HashEntry<K,V> next;
    
        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
    
        /**
         * 使用volatile语义写入next,保证可见性
         */
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

    Segment继承ReentrantLock锁,用于存放HashEntry[]

    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        private static final long serialVersionUID = 2249069246763182397L;
    
        /**
         * 对segment加锁时,在阻塞之前自旋的次数
         *
         */
        static final int MAX_SCAN_RETRIES =
                Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
    
        /**
         * 每个segment的HashEntry table数组,访问数组元素可以通过entryAt/setEntryAt提供的volatile语义来完成
         * volatile保证可见性
         */
        transient volatile HashEntry<K,V>[] table;
    
        /**
         * 元素的数量,只能在锁中或者其他保证volatile可见性之间进行访问
         */
        transient int count;
    
        /**
         * 当前segment中可变操作发生的次数,put,remove等,可能会溢出32位
         * 它为chm isEmpty() 和size()方法中的稳定性检查提供了足够的准确性.
         * 只能在锁中或其他volatile读保证可见性之间进行访问
         */
        transient int modCount;
    
        /**
         * 当table大小超过阈值时,对table进行扩容,值为(int)(capacity *loadFactor)
         */
        transient int threshold;
    
        /**
         * 负载因子
         */
        final float loadFactor;
    
        /**
         * 构造方法
         */
        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }
  4. 构造方法
    有参构造
    /**
     * ConcurrentHashMap 构造方法
     * @param initialCapacity 初始化容量
     * @param loadFactor 负载因子
     * @param concurrencyLevel 并发segment,segments数组的长度
     */
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        // 大于最大segments容量,取最大容量
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        // 2^sshift = ssize 例如:sshift = 4,ssize = 16
        // 根据concurrencyLevel计算出ssize为segments数组的长度
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) { // 第一次 满足
            ++sshift;  // 第一次 1
            ssize <<= 1; // 第一次 ssize = ssize << 1 (1 * 2^1)
        }
        // segmentShift和segmentMask的定义
        this.segmentShift = 32 - sshift; // 用于计算hash参与运算位数
        this.segmentMask = ssize - 1; // segments位置范围
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        // 计算每个segment中table的容量
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        // HashEntry[]默认 容量
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        // 确保cap是2^n
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        // 创建segments并初始化第一个segment数组,其余的segment延迟初始化
        Segment<K,V> s0 =
                new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                        (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

    无参构造使用默认参数
    public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

  5. 基本方法

    一些UNSAFE方法
    HashEntry
    setNext

    /**
         * 使用volatile语义写入next,保证可见性
         */
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

    entryAt get HashEntry

    /**
     * 获取给定table的第i个元素,使用volatile读语义
     */
    static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
        return (tab == null) ? null :
                (HashEntry<K,V>) UNSAFE.getObjectVolatile
                        (tab, ((long)i << TSHIFT) + TBASE);
    }

    setEntryAt set HashEntry

    /**
     * 设置给定的table的第i个元素,使用volatile写语义
     */
    static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                       HashEntry<K,V> e) {
        UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
    }

    put 插入元素

  6. 总结

以上是关于ConcurrentHashMap 源码浅析 1.7的主要内容,如果未能解决你的问题,请参考以下文章

ConcurrentHashMap核心源码浅析

java并发:jdk1.8中ConcurrentHashMap源码浅析

浅析HashMap与ConcurrentHashMap的线程安全性

ConcurrentHashMap源码分析(1.8)

ConcurrentHashMap -1.8 源码解析

两万五千字的ConcurrentHashMap底层原理和源码分析