源码分析-ConcurrentHashMap

Posted 千念飞羽

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码分析-ConcurrentHashMap相关的知识,希望对你有一定的参考价值。

文档

doc文档

一个支持并发的提取和修改的散列表。这个类和hashtable准守相同的规范,并且每个版本都对应相同的功能规范。虽然这是一个线程安全的类,但是他并不依赖于一个整体的锁,没有一个锁可以锁住整个元素而静止所有访问。这个类和hashtable可以互操作。

提取操作如get。不会阻塞,所以有可能被更新方法锁覆盖,提取反映了最近完全操作的结果。对于聚合操作如果putAll和clear,并发的提取可能只能反映一部分也结果(聚合操作不是原子性的)。相似的迭代器和枚举返回散列表基于某个点的状态,或者迭代器和枚举创建时的状态。他们不会抛出
ConcurrentModificationException.然后迭代器被设计成没成只能被一个线程访问。

允许的并发数通过可选的concurrencyLevel来指定,默认值为16,内部单元的大小。散列表在内部分区使得运行指示数量的无争用的更新操作。由于表内元素的放置是随机的,因此并发的情况不可预测。理想情况下,你应该指定一个推荐的值允许多少个线程并发的访问表。使用大的值会浪费空间和时间,小的值会导致线程争用。但在一个数量级内的低估或者高估不会有非常明显的差异。如果只有一个线程允许修改,而其他线程将只读时,1是适当的值。调整散列表是一个非常慢的操作,所以尽可能预先估计给一个合适的大小传递给构造器。

这个表和他的视图和它的迭代器实现了Map和Iterator的全部功能。

和HashTable一样,不同于HashMap,这个类允许null作为键或者元素。

源码文档

基本的策略是将表分为几个小的分区。每个分区是一个并发可读的散列表。为了减小占用空间,只构建一个分区,而其他的分区在需要的时候在构建。为了保持懒惰构造的可见性,所有的分区表必须是volatile访问的。通过unsafe方法的segmnetAt实现。他提供了AtomicReferenceArrays的功能但是减少了中间的调用环节。此外,volatile写元素和键值对的next域的所操作使用了更快捷的懒惰集合的写模式。通过(putOrderedObject)因为写总是需要释放锁的。这样做维持了表序列的一致性更新。

历史记录:早起的实现严重依赖于final域,他避免了volatile的读操作但是存在大量的空间占用问题。一些遗留的设计用于兼容序列化的可用性(比如强制构造分区0);

概述

总的来说ConcurrentHashMap就是有一系列的Segment

静态域

    static final int DEFAULT_INITIAL_CAPACITY = 16;//默认初始大小
    static final float DEFAULT_LOAD_FACTOR = 0.75f;//默认装填因子
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;//默认Segment数量
    static final int MAXIMUM_CAPACITY = 1 << 30;//默认表的最大容量
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;//Segment的最小数量
    static final int MAX_SEGMENTS = 1 << 16;//Segment的最大容量
    static final int RETRIES_BEFORE_LOCK = 2;//上锁前Entry获取次数,用于size和containsValue。

    private static final sun.misc.Unsafe UNSAFE;//以下的类都用于UNSAFE。这几个写的内容有几个部分是出于序列化的时候的兼容性需要。
    private static final long SBASE;
    private static final int SSHIFT;
    private static final long TBASE;
    private static final int TSHIFT;
    private static final long HASHSEED_OFFSET;
    private static final long SEGSHIFT_OFFSET;
    private static final long SEGMASK_OFFSET;
    private static final long SEGMENTS_OFFSET;

非静态域

    final int segmentMask;//hashCode的高位来选择应该散列到哪个Segment中。
    final int segmentShift;//hashCode的移位大小。
    final Segment<K,V>[] segments;//Segments 数组

    transient Set<K> keySet;//以下为视图项。
    transient Set<Map.Entry<K,V>> entrySet;
    transient Collection<V> values;

方法

构造器

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) 
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;//这个两个参数使用用于计算当前的hashCode应该属于哪一个segment。
        int ssize = 1;
        while (ssize < concurrencyLevel) //ssize取当不小于设定值的2的阶乘数。
            ++sshift;
            ssize <<= 1;
        
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;//c用来计算segment中Entry[]的大小。
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)//同样cap也是Segment中Entry[]的大小,也需要是2的阶乘数。
            cap <<= 1;
        // create segments and segments[0]仅仅为了兼容性讨论。UNSAFE.putOrderedObject。也是出于兼容性需要。
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    

HashEntry

    static final class HashEntry<K,V> 
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) 
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        

        final void setNext(HashEntry<K,V> n) 
            UNSAFE.putOrderedObject(this, nextOffset, n);//putOrderedObject可以储存文件到指定位置,但是不提供可见性,如果要提供可见性需要对域用修饰符volatile修饰。
        

        // Unsafe mechanics
        static final sun.misc.Unsafe UNSAFE;
        static final long nextOffset;
        static 
            try 
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = HashEntry.class;
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
             catch (Exception e) 
                throw new Error(e);
            
        
    

HashEntry整体上比较简单,但是有两点需要说明一下。

首先,HashEntry和HashMap中的Entry是不同的,HashEntry并不是Map中的Entry。这里并没有实现任何接口。因为Map中的entry功能是比较多的。而这里的Entry只是提供了一个链表容器的功能。而且这样的设计也比较好,Map中虽然有Entry的接口但是未必要去实现。

其次,域的可见性的问题。这里这四个域都可以保持可见性,当然volatile的可见性是显而易见的。final本身也是提供一定可见性的。这是final一个比较灵活运用的地方。比如同样的set或者map中容器内的元素,如果元素的域是final的,则可以保证这个容器在存放这样的元素时是线程安全的,即使容器本身没有同步。因为同样的final不可以被设置两变。这个在之前《并发编程实战》上说过。当然从语义上说这里key和hashcode也不应该是可变的,而且对于Entry的访问操作也进行了同步,因为不是所有的域都是final的

Segment

Segment源码文档

Segment会保存表的Entry列表,并且会始终保持一致的状态。所以通过对table域加上volatile的修饰符,read操作无锁的进行读取。在膨胀过程中,需要保存重复的节点,以保证这些旧的列表可以被读取者进行读取。

这个类仅仅对于修改操作定义了锁。除非特意注明,这个class方法执行per-segment 版本的的currentHashMap方法。其他方法被集成进ConcurrentHashMap方法。这些修改方法通过scanAndLock和scanAndLockForPut方法实现受控的自旋锁。这个主要是为了减少缓存未命中的情况(这在hashTable中是很常见的问题)。我们并不真的使用这样的方式去找到node。node因为他们必须在有锁的情况下获得以确保更新的一致性。但是这样做通常比进行重新定位要快的多。scanAndLockForput 创建了一个新的节点如果需要放置的节点没有被找到的情况下。

        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

这是标志了在尝试获取锁之前最小的后期数量。在scanAndLock和scanAndLockForput 中有用。具体在方法中说明。

        transient volatile HashEntry<K,V>[] table;
        transient int count;
        transient int modCount;
        transient int threshold;//table的的大小如果超过这个值就需要进行膨胀了。
        final float loadFactor;

put方法

首先来看put方法

        final V put(K key, int hash, V value, boolean onlyIfAbsent) 
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try 
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;//散列的方式下面统一说。这里只要知道是为了计算下标就好。
                HashEntry<K,V> first = entryAt(tab, index);//找到第一个Entry
                for (HashEntry<K,V> e = first;;) //只要遍历去找所有的Entry看是否有符合条件的。
                    if (e != null) 
                        K k;
                        if ((k = e.key) == key ||//判断是否符合当前的条件
                            (e.hash == hash && key.equals(k))) 
                            oldValue = e.value;
                            if (!onlyIfAbsent) //onlyIfAbsent表示只有在不存在的时候修改,为false就是可以修改已存在的值。
                                e.value = value;
                                ++modCount;
                            
                            break;//退出循环
                        
                        e = e.next;
                    
                    else //当遍历之后没有发现,则需要进行插入Entry的操作。
                        if (node != null)//node是在加锁之前遍历了节点没有发现节点而新建了节点返回的,因此只要把这个档期节点设置为首节点就好。注意这里仅仅是设置了node的next,因为还没有进行size的检查。
                            node.setNext(first);
                        else//node为空说明直接获得了锁,没有构造新的节点。所以需要先构造新的节点。
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)//判断当前的table的大小是否需要进行膨胀。
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);设置table的首节点为noe
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    
                
             finally 
                unlock();
            
            return oldValue;
        

这里还包括两个小的方法:

        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) 
            HashEntry<K,V> first = entryForHash(this, hash);//table中首节点的位置。
            HashEntry<K,V> e = first;//e为迭代器。
            HashEntry<K,V> node = null;//node为当没有找到节点的时候需要返回的新构造的Entry。
            int retries = -1; // negative while locating node//当前重试的次数,当为-1的时候说明正在经历定位元素的节点。当这个值增长到一定数值的时候,MAX_SCAN_RETRIES则说明当前列表比较稳定了,开始上锁。
            while (!tryLock()) //只要没有获得锁就一直进行循环
                HashEntry<K,V> f; // to recheck first below//用来标记f节点是否是首节点。如果不是则说明出现不一致情况需要重新获取f并重新进行扫描。
                if (retries < 0) //retries为-1,表示当前还没有实现过重试。
                    if (e == null) //e为null说明则说明null已经迭代到尾部了。
                        if (node == null) // speculatively create node //需要创建新的元素。
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;//
                    
                    else if (key.equals(e.key))//找到一个已存在的元素。也将重试次数置为0,开始等待当前列表是否为0。等待上锁。
                        retries = 0;
                    else//找下一个元素
                        e = e.next;
                
                else if (++retries > MAX_SCAN_RETRIES) //在两次重试的过程中链表没有发生变化,说明比较稳定,开始上锁。上锁成功后退出循环。
                    lock();
                    break;
                
                else if ((retries & 1) == 0 &&// 再次查询当前hash值对应的talbe的位置是否还是先前保存的first。如果不是则说明列表被修改,重置参数。
                         (f = entryForHash(this, hash)) != first) 
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                
            
            return node;
        

rehash的文档
在一新的table中重分类节点,因为我们使用2的阶层的拓展,所以每一个元素的位置要么在原先的位置,要么需要后移2的阶乘倍。通过找到就节点可以重用的情况来避免clone的发生。统计意义上来说,在默认的的域值下,只有大约六分之一的节点需要克隆。替换的节点只要没有任何读取者的引用指向题目,则是可以被垃圾回收器回收的。(可能由于并发的遍历操作),Entry的访问使用数组下标因为题目被volatile修饰。

        private void rehash(HashEntry<K,V> node) 

            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];//这里有一个疑问问什么要在这里加上一个强制类型转换。
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i++) 
                HashEntry<K,V> e = oldTable[i];//e代表了旧表中的散列表table下标i位置的头节点,当然如果是空的,则代表table的该位置没有散列项。
                if (e != null) 
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list//对于只有一个散列项的元素只要移动单独的节点就好了。
                        newTable[idx] = e;
                    else  // Reuse consecutive sequence at same slot
                        HashEntry<K,V> lastRun = e;//从这里开始的11行。目的是为了找到链表结尾散列到相同新下标的节点。对于这部分元素进行整体移动,就免去了重新构造新节点的代价。
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) 
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) 
                                lastIdx = k;
                                lastRun = last;
                            
                        
                        newTable[lastIdx] = lastRun;
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) //这部分就是讲除去最后一小段已经集体移动的节点之外,对于之前的所有节点进行复制放入新table的位置中。
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        
                    
                
            
            int nodeIndex = node.hash & sizeMask; // add the new node//把需要添加的新元素添加到原表
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;//对table赋新的表;
        

首先segment再散列或者说膨胀只是对单个Segment进行的。所以如果一个表进行多次变换后Segment中table的大小很可能是不一样的。当然如果散列函数做的比较好应该各个Segment的大小应该是比较接近的。
其次对于原segment的中的原实际上是进行了复制,或者移动的,不过由于只有在put操作中才会进行再散列,而且put又是上了锁的,所以可以保证程序被正确同步,但是对于读操作得到的引用未必是当前的引用了。这一点需要注意。
最后,这中间找链表的最后一段进行复制的操作。这个是从统计意义上说更效率。

remove

final V remove(Object key, int hash, Object value) 
            if (!tryLock())
                scanAndLock(key, hash);//如果上锁识别,则扫描链table中的hash表以保持缓存命中。
            V oldValue = null;
            try 
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;//计算下标
                HashEntry<K,V> e = entryAt(tab, index);//e为tab[index]的链表首节点
                HashEntry<K,V> pred = null;//为单项链表的某节点的前驱节点
                while (e != null) 
                    K k;
                    HashEntry<K,V> next = e.next;
                    if ((k = e.key) == key ||//如果如果找到对映的节点则准备进行设置。
                        (e.hash == hash && key.equals(k))) 
                        V v = e.value;
                        if (value == null || value == v || value.equals(v)) //这里不太明白为什么会有value为空的情况Concurrent并不运行value为null。然后进行删除,需要考虑e是头结点的情况
                            if (pred == null)
                                setEntryAt(tab, index, next);
                            else
                                pred.setNext(next);
                            ++modCount;
                            --count;
                            oldValue = v;
                        
                        break;
                    
                    pred = e;
                    e = next;
                
             finally 
                unlock();
            
            return oldValue;
        

这里比较常见的判断方法是先用key==号判断,也就是两者是同一个节点,或者使用key.equals和hashcode来判断。这样比较全面,效率兼顾安全。大部分的hash方面的问题都是这样做的。

此外我其实还有一点不是很明确。就是关于volatile的修饰后内存可见性的问题。当然volatile原理上说是内存缓存禁止标志位。但是从这里来看似乎并不完全是这样,或者我原先的理解有一定问题。我本来理解就是既然没有缓存,就每次读取直接从内存中读取数据了。
但是从这里来看scanAndLock没有任何影响,主要的目的就是在于使缓存刷新,也就是说不停的从内存中读取。但是上锁之后实际上还是进行了读操作的,那上锁之后的读操作到底是从缓存中读取的还是从内存中读取的呢?我理解应该是内存。如果是内存那这样扫描还有什么意义呢。。到目前为止我还是不是很理解。But this code come from Doug Lea。所以大概还是我哪里理解的有问题。。

        private void scanAndLock(Object key, int hash) 
            // similar to but simpler than scanAndLockForPut
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            int retries = -1;
            while (!tryLock()) 
                HashEntry<K,V> f;
                if (retries < 0) 
                    if (e == null || key.equals(e.key))
                        retries = 0;
                    else
                        e = e.next;
                
                else if (++retries > MAX_SCAN_RETRIES) 
                    lock();
                    break;
                
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) 
                    e = first = f;
                    retries = -1;
                
            
        

散列过程

  1. hashSeed的产生过程:

hashSeed的产生过程基本上和HashMap中的是类似的是需要根据sun.misc.Hashing.randomHashSeed(instance)产生的,这需要虚拟机产生启动后才能使用,所以使用一个静态类来初始化。此外这里还有一个是可以通过jdk.map.althashing.threshold来控制的域值需要使用一个静态的Holder类来进行处理。之前在分析HashMap的时候没有详细说,这里尽我所能说明白怎么去控制这些参数以及他们发挥作用的原理。对于String需要单独产生散列种子。

我们从后往前说,首先产生散列种子需要的是一个随机数。这个随机数是由randomHashSeed针对每个ConcurrentHashMap的实例产生的。这里分两种情况,默认情况下返回0,如果设置了ALTERNATIVE_HASHING(并且虚拟机已经启动,则会使用sun.misc.Hashing.randomHashSeed来产生一个随机数)。这里为什么不直接产生,是为了给使用者提供一些自定义的功能,在虚拟机启动的时候可以通过设定一些参数来控制散列种子的产生。这是java的惯用手法凡是根hash产生有关的部分都使用了类似的方式。

    private transient final int hashSeed = randomHashSeed(this);

    private static int randomHashSeed(ConcurrentHashMap instance) 
        if (sun.misc.VM.isBooted() && Holder.ALTERNATIVE_HASHING) 
            return sun.misc.Hashing.randomHashSeed(instance);
        

        return 0;
    

其次这里的ALTERNATIVE_HASHING是根据什么产生的。Holder静态内部了用于产生这个ALTERNATIVE_HASHING。

不同的HashMap中使用的ALTERNATIVE_HASHING的方法都有些不同,这里主要是控制设定的如果阈值,如果阈值过小则需要随机产生散列种子。所以通常情况下都会对String 做单独处理。如果在启动jvm的时候设置了jdk.map.althashing.threshold,Holder会比较jdk.map.althashing.threshold与MAXIMUM_CAPACITY的大小。如果他小于ConcurrentHashMap的最大值MAXIMUM_CAPACITY,则为ALTERNATIVE_HASHINGtrue。至于为什么这样做我现在还没法回答。如果不设置则默认为Integer.MAX_VALUE,也就是不需要设置。换句话说,如果设定的域值小于java的ConcurrentHashMap的最大值MAXIMUM_CAPACITY则需要重新计算散列种子。

    private static class Holder 

        /**
        * Enable alternative hashing of String keys?
        *
        * <p>Unlike the other hash map implementations we do not implement a
        * threshold for regulating whether alternative hashing is used for
        * String keys. Alternative hashing is either enabled for all instances
        * or disabled for all instances.
        */
        static final boolean ALTERNATIVE_HASHING;

        static 
            // Use the "threshold" system property even though our threshold
            // behaviour is "ON" or "OFF".
            String altThreshold = java.security.AccessController.doPrivileged(
                new sun.security.action.GetPropertyAction(
                    "jdk.map.althashing.threshold"));

            int threshold;
            try 
                threshold = (null != altThreshold)
                        ? Integer.parseInt(altThreshold)
                        : Integer.MAX_VALUE;

                // disable alternative hashing if -1
                if (threshold == -1) 
                    threshold = Integer.MAX_VALUE;
                

                if (threshold < 0) 
                    throw new IllegalArgumentException("value must be positive integer.");
                
             catch(IllegalArgumentException failed) 
                throw new Error("Illegal value for 'jdk.map.althashing.threshold'", failed);
            
            ALTERNATIVE_HASHING = threshold <= MAXIMUM_CAPACITY;
        
    
  1. hashCode的产生过程:

ConcurrentHashMap的产生过程和HashMap中是有所不同的,因为ConcurrentHashM实际上是需要根据hashCode的位来计算其属于哪个Segment的。所以对每一位的散列都需要进行精细的控制。中间这一系列的位移操作主要目的就是使得散列效果被平均的分配到各个位上。因为如果是直接用散列的话也许低位不一样,但是高位是相似的,这样使用高位去定位Segment的时候回把大量的Entry定位到一个Segment中,并发性能降低。

此外对于String需要单独的产生随机数种子,这是String的特性决定的。

    private int hash(Object k) 
        int h = hashSeed;

        if ((0 != h) && (k instanceof String)) 
            return sun.misc.Hashing.stringHash32((String) k);
        

        h ^= k.hashCode();

        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    
  1. 定位到Segment

Segment是不同的默认情况下Segment通过下面的式子计算出来的,默认情况下segmentShift为28,segmentMask为15,也就是说仅仅使用高四位来进行散列到16个Segment中。

int j = (hash >>> segmentShift) & segmentMask;

方法

put

    public V put(K key, V value) 
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;// 计算散列到那个Segment。
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck//获得Segment,但是这里不太明白工作原理。为何要多出这三行,因为是final类的程序所以这里应该是没有可见性问题才对。而且很奇怪的是在在remove中就没有这部分,而是直接使用SegmentForHash
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);//确定了segment之后会把任务委托给Segment的put来做。这部分之前说过这里不详细说明了。
    

不过这里UNSAFE的问题还是有一些需要说明。UNSAFE理解的不太透彻尝试解释一下,有问题以后再修改。
首先,UNSAFE获得的偏移量、首地址、或者增量地址都是针对一个具体的*.class来说的。这些变量都是固定的,可能对于不同的虚拟机会有所不同,但是一旦获得之后就是常量了。但是如果在使用这些常量进行获得、交换或者替换等原子性操作的时的时候需要绑定具体的实例。

首先看下这几个UNSAFE获得常量:

    static 
        int ss, ts;
        try 
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class tc = HashEntry[].class;
            Class sc = Segment[].class;
            TBASE = UNSAFE.arrayBaseOffset(tc);//获得是该类的首地址
            SBASE = UNSAFE.arrayBaseOffset(sc);
            ts = UNSAFE.arrayIndexScale(tc);//获得是该类的增量地址
            ss = UNSAFE.arrayIndexScale(sc);
            HASHSEED_OFFSET = UNSAFE.objectFieldOffset(//获得的是域的偏移地址
                ConcurrentHashMap.class.getDeclaredField("hashSeed"));
            SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segmentShift"));
            SEGMASK_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segmentMask"));
            SEGMENTS_OFFSET = UNSAFE.objectFieldOffset(
                ConcurrentHashMap.class.getDeclaredField("segments"));
         catch (Exception e) 
            throw new Error(e);
        
        if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)//这里主要是判断Segments和table是不是2的指数。
            throw new Error("data type scale not a power of two");
        SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);//这个计算的是偏移量。实际上在后续的计算中使用SSHIFT而不使用ss,因为位操作速度更快。这样就需要保证但是这里我不明的一点是既然是根据单独的类生成了那应该是保证是保证是固定的值为何还要判断是否是2的阶乘。此外这里为何一定要求是2的阶乘。总之问题多多。主要还是java内存怎么设置的不太理解。ConcurrentHashMap中大量运用UNSAFE来设置和获取,不太明白这样做和直接读取有什么区别。
        TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
    

remove、replace

两者方法类似,就是定位Segment然后委托给Segment来操作就好。

    public V remove(Object key) 
        int hash = hash(key);
        Segment<K,V> s = segmentForHash(hash);
        return s == null ? null : s.remove(key, hash, null);
    

迭代器

迭代器的内容都差不多。因为是map所以有很多衍生的类,这里只看下最主要的,因为实现相对比较简单,我也没有看到非常规的问题。

    abstract class HashIterator 
        int nextSegmentIndex;
        int nextTableIndex;
        HashEntry<K,V>[] currentTable;
        HashEntry<K, V> nextEntry;
        HashEntry<K, V> lastReturned;

        HashIterator() 
            nextSegmentIndex = segments.length - 1;
            nextTableIndex = -1;
            advance();
        

        /**
         * Set nextEntry to first node of next non-empty table
         * (in backwards order, to simplify checks).
         */
        final void advance() //advance的任务是将节点设置到下一个链表的头结点,这里有两种情况一种是当前table[]中的某个链表遍历完,也有可能是Segment中的所有链表遍历完毕,所以这里要两种情况讨论。Map总的来说是从后往前遍历的。每次切换Segment都要切换nextTableIndex 为下一个talbe的长度。当所有Segment遍历完毕也就结束遍历了。
            for (;;) 
                if (nextTableIndex >= 0) 
                    if ((nextEntry = entryAt(currentTable,
                                             nextTableIndex--)) != null)
                        break;
                
                else if (nextSegmentIndex >= 0) 
                    Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--);
                    if (seg != null && (currentTable = seg.table) != null)
                        nextTableIndex = currentTable.length - 1;
                
                else
                    break;
            
        

        final HashEntry<K,V> nextEntry() //nextEntry会依照当前节点向后移动,直到移动到null说明当前的链表遍历完成需要移动到下一个链表的头结点这个任务由advance完成。
            HashEntry<K,V> e = nextEntry;
            if (e == null)
                throw new NoSuchElementException();
            lastReturned = e; // cannot assign until after null check
            if ((nextEntry = e.next) == null)
                advance();
            return e;
        

其他

视图比较麻烦也没有特别要说的这里不详细说了。
序列化和之前的兼容性有关也比较麻烦。以后有机会再说吧。

以上是关于源码分析-ConcurrentHashMap的主要内容,如果未能解决你的问题,请参考以下文章

ConcurrentHashMap实现原理及源码分析

ConcurrentHashMap源码分析(1.8)

JDK源码ConcurrentHashMap源码分析

ConcurrentHashMap源码分析

ConcurrentHashMap 扩容分析拾遗

Java小白进阶系列——ConcurrentHashMap源码解析文章总目录