ConcurrentHashMap源码分析

Posted JeffCoding

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ConcurrentHashMap源码分析相关的知识,希望对你有一定的参考价值。

一、ConcurrentHashMap出现的原因

  我们之前学过HashMap,也知道HashMap不是线程安全的,在多线程环境下,HashMap的put方法有可能引起死循环。于是HashTable这个类出现,它在大量的方法前都加了内置锁Synchronized,这就保证了它的线程安全性,但是这种方法太极端,导致效率低下。当一个线程访问了HashTable的同步方法时,其它线程就只能等待该线程释放锁。在这种情况,针对多线程的情况,ConcurrentHashMap应运而生。

二、ConcurrentHashMap的数据结构

  ConcurrentHashMap使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问
这里写图片描述
(图片来自:http://www.cnblogs.com/dolphin0520/p/3920373.html
  和HashMap不同之处是,HashMap是使用一个数组来连接各个Entry链,而ConcurrentHashMap则是使用了Segment数组(继承ReentrantLock)来链接各个HashEntry数组,然后各个HashEntry数组中连接各自的HashEntry链。(这个要很注意)每一个Segment都有一个锁,所以这可以达到并发得访问各个Segment的Entry。
  这里写图片描述
  

三、ConcurrentHashMap的定义

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable {

  ConcurrentHashMap继承自AbstractMap,实现了ConcurrentMap接口,使得它具有Map的属性,同时又有多线程相关的属性

ConcurrentHashMap的成员变量:

    //初始的容量
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    //初始的加载因子
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    //初始的并发等级(下面会叙述作用)
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    //最大容量
    static final int MAXIMUM_CAPACITY = 1 << 30;
    //最小的segment数量
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
    //最大的segment数量
    static final int MAX_SEGMENTS = 1 << 16; 
    //
    static final int RETRIES_BEFORE_LOCK = 2;

四、ConcurrentHashMap的构造函数

   //通过指定的容量,加载因子和并发等级创建一个新的ConcurrentHashMap
   public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
         //对容量,加载因子和并发等级做限制
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        //限制并发等级不可以大于最大等级
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // 下面即通过并发等级来确定Segment的大小
        //sshift用来记录向左按位移动的次数
        int sshift = 0;
        //ssize用来记录Segment数组的大小
        int ssize = 1;
        //Segment的大小为大于等于concurrencyLevel的第一个2的n次方的数
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }

        this.segmentShift = 32 - sshift;
        //segmentMask的值等于ssize - 1(这个值很重要)
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        //c记录每个Segment上要放置多少个元素
        int c = initialCapacity / ssize;
        //假如有余数,则Segment数量加1
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;

        //创建第一个Segment,并放入Segment[]数组中,作为第一个Segment
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

  对于容量和加载因子:我在HashMap那篇文章已经讲解得很清楚: Java容器(四):HashMap(Java 7)的实现原理
  并发等级(concurrencyLevel):用来确定Segment的数量,Segment的个数为大于等于concurrencyLevel的 第一个 2的n次方的数,例如当concurrencyLevel为12,13,14,15,16时,此时的Segment的数量为16
  segmentMask:这个为什么要为Segment数组的长度 -1, 这个也在HashMap中讲过了,主要是为了让低位为1,这样在做&运算确定Segment的索引时能够更加分散

五、ConcurrentHashMap的put操作

    public V put(K key, V value) {
        Segment<K,V> s;
        //ConcurrentHashMap的key和value都不能为null
        if (value == null)
            throw new NullPointerException();

        //这里对key求hash值,并确定应该放到segment数组的索引位置
        int hash = hash(key);
        //j为索引位置,思路和HashMap的思路一样,这里不再多说
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        //这里很关键,找到了对应的Segment,则把元素放到Segment中去
        return s.put(key, hash, value, false);
    }

  得到hash值向右按位移动segmentShift位,然后再与segmentMask做&运算得到segment的索引j。例如concurrencyLevel等于16,则sshift等于4,则segmentShift为28。hash值是一个32位的整数,将其向右移动28位就变成这个样子:
  0000 0000 0000 0000 0000 0000 0000 xxxx,然后再用这个值与segmentMask做&运算,也就是取最后四位的值。这个值确定Segment的索引。
  其实大体和HashMap相似

  下面看看具体如何插入到Segment中的

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            //这里是并发的关键,每一个Segment进行put时,都会加锁
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                //tab是当前segment所连接的HashEntry数组
                HashEntry<K,V>[] tab = table;
                //确定key的hash值所在HashEntry数组的索引位置
                int index = (tab.length - 1) & hash;
                //取得要放入的HashEntry链的链头
                HashEntry<K,V> first = entryAt(tab, index);
                //遍历当前HashEntry链
                for (HashEntry<K,V> e = first;;) {
                    //如果链头不为null
                    if (e != null) {
                        K k;
                        //如果在该链中找到相同的key,则用新值替换旧值,并退出循环
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        //如果没有和key相同的,一直遍历到链尾,链尾的next为null,进入到else
                        e = e.next;
                    }
                    else {//如果没有找到key相同的,则把当前Entry插入到链头

                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        //此时数量+1
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            //如果超出了限制,要进行扩容
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                //最后释放锁
                unlock();
            }
            return oldValue;
        }

  我们来重新理一理思路:
  1. 首先对key进行第1次hash,通过hash值确定segment的位置
  2. 然后在segment内进行操作,获取锁
  3. 接着获取当前segment的HashEntry数组,然后对key进行第2次hash,通过hash值确定在HashEntry数组的索引位置
  4. 然后对当前索引的HashEntry链进行遍历,如果有重复的key,则替换;如果没有重复的,则插入到链头
  5. 关闭锁

  可见,在整个put过程中,进行了2次hash操作,才最终确定key的位置。

五、ConcurrentHashMap的remove操作

    public V remove(Object key) {
        //求key的hash
        int hash = hash(key);
        //求得hash对应的Segment
        Segment<K,V> s = segmentForHash(hash);
        //在segment内进行删除
        return s == null ? null : s.remove(key, hash, null);
    }
     final V remove(Object key, int hash, Object value) {
            //获取锁
            if (!tryLock())
                scanAndLock(key, hash);
            V oldValue = null;
            try {
                //tab是当前segment所连接的HashEntry数组
                HashEntry<K,V>[] tab = table;
                //确定key的hash值所在HashEntry数组的索引位置
                int index = (tab.length - 1) & hash;
                //取得要放入的HashEntry链的链头
                HashEntry<K,V> e = entryAt(tab, index);
                //pred用来记录待删除节点的前一个节点
                HashEntry<K,V> pred = null;
                while (e != null) {
                    K k;
                    HashEntry<K,V> next = e.next;
                    //当找到了待删除及节点的位置
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        V v = e.value;
                        if (value == null || value == v || value.equals(v)) {
                            //如果待删除节点的前节点为null,即待删除节点时链头节点,此时把该位置指向第2个结点就行了
                            if (pred == null)
                                setEntryAt(tab, index, next);
                            //如果有前节点,则待删除节点的前节点的next指向待删除节点的的下一个节点,删除成功
                            else
                                pred.setNext(next);
                            ++modCount;
                            --count;
                            oldValue = v;
                        }
                        break;
                    }
                    pred = e;
                    e = next;
                }
            } finally {
                //最后关闭锁
                unlock();
            }
            return oldValue;
        }

  我们来理一理思路:
  1. 首先对key进行第1次hash,通过hash值确定segment的位置
  2. 然后在segment内进行操作,获取锁
  3. 接着获取当前segment的HashEntry数组,然后对key进行第2次hash,通过hash值确定在HashEntry数组的索引位置
  4. 用一个HashEntry引用来记录待删除节点的前一个节点,然后删除待删除节点
  5. 关闭锁

  更多关于ConcurrentHashMap的文章:
  Java并发编程:并发容器之ConcurrentHashMap
  [Java并发包学习八]深度剖析ConcurrentHashMap

以上是关于ConcurrentHashMap源码分析的主要内容,如果未能解决你的问题,请参考以下文章

ConcurrentHashmap核心源码分析

两万五千字的ConcurrentHashMap底层原理和源码分析

ConcurrentHashMap源码解析_04 transfer方法源码分析(难点)

ConcurrentHashMap源码解析_04 transfer方法源码分析(难点)

ConcurrentHashMap源码解析文章总目录

ConcurrentHashMap实现原理及源码分析