ConcurrentHashMap源码解析-Java7

Posted 寻觅beyond

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ConcurrentHashMap源码解析-Java7相关的知识,希望对你有一定的参考价值。

目录

一.ConcurrentHashMap的模型图

二.源码分析-类定义

  2.1 极简ConcurrentHashMap定义

  2.2 Segment内部类

  2.3 HashEntry内部类

  2.4 ConcurrentHashMap的重要常量

三.常用接口源码分析

  3.1 ConcurrentHashMap构造方法

  3.2 map.put操作

  3.3 创建新segment

  3.4 segment.put操作

  3.5 segment.rehash扩容

  3.6 map.get操作

  3.7 map.remove操作

  3.8 map.size操作

 

  原文地址:https://www.cnblogs.com/-beyond/p/13157083.html

一.ConcurrentHashMap的模型图

  本文所有的介绍都是针对Java7而言!!!!!

  下面是ConcurrentHashMap的结构图,ConcurrentHashMap有一个segments数组,每个segment中又有一个table数组,该数组的每个元素时HashEntry类型。

   

  可以简单的理解为ConcurrentHashMap是多个HashMap组成,每一个HashMap是一个segment,就如传说中一样,ConcurrentHashMap使用分段锁的方式,这个“段”就是segment。

  ConcurrentHashMap之所以能够保证并发安全,是因为支持对不同segment的并发修改操作,比如两个线程同时修改ConcurrentHashMap,一个线程修改第一个segment的数据,另一个线程修改第二个segment的数据,两个线程可以并发修改,不会出现并发问题;但是多个线程同一个segment进行并发修改,则需要先获取该segment的锁后再修改,修改完后释放锁,然后其他要修改的线程再进行修改。

  那么,ConcurrentHashMap可以支持多少并发量(写)呢?这个也就是问,ConcurrentHashMap最多能支持多少线程并发修改,其实也就是segment的数量,只要修改segment的这些线程不是修改同一个segment,那么这些线程就可以并行执行,这也就是ConcurrentHashMap的并发量(segment的数量)。

  注意,ConcurrentHashMap创建完成后,也就是segment的数量、并发级别已经确定,则segment的数量和并发级别都不能再改变了,即使发生扩容,也是segment中的table进行扩容,segment的数量保持不变。

 

二.源码分析-类定义

2.1 极简ConcurrentHashMap定义

  下面是省略了大部分代码的ConcurrentHashMap定义:

package java.util.concurrent;

import java.util.AbstractMap;
import java.util.concurrent.locks.ReentrantLock;

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {

    final Segment<K, V>[] segments;

    /**
     * segment分段的定义
     */
    static final class Segment<K, V> extends ReentrantLock implements Serializable {

        transient volatile HashEntry<K, V>[] table;
    }

    /**
     * 存放的元素节点
     */
    static final class HashEntry<K, V> {

    }
}

 

2.2 Segment内部类

  ConcurrentHashMap内部有一个segments属性,是Segment类型的数组,Segment类和HashMap很相似(Java7的HashMap),维持一个数组,数组的每个元素是HashEntry类型(可以理解为HashMap的节点),当发生hash冲突后,则使用拉链法(头插法)来解决冲突。

  而Segment数组的定义如下,省略了方法:

static final class Segment<K, V> extends ReentrantLock implements Serializable {
    static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
    private static final long serialVersionUID = 2249069246763182397L;
    
    // segment的负载因子(segments数组中的所有segment负载因子都相同)
    final float loadFactor;
    
    // segment中保存元素的数组
    transient volatile HashEntry<K, V>[] table;
   
    // 该segment中的元素个数
    transient int count;
    
    // modify count,该segment被修改的次数
    transient int modCount;
    
    // segment的扩容阈值
    transient int threshold;
}

  注意每个Segment都有负载因子、以及扩容阈值,但是后面可以看到,其实segments数组中的每一个segment,负载因子和扩容阈值都相同,因为创建的时候,都是使用0号segment的负载因子和扩容阈值。

 

2.3 HashEntry内部类

  Segment类中有一个table数组,table数组的每个元素都是HashEntry类型,和HashMap的Entry类似,源码如下:(省略了方法)

/**
 * map中每个元素的类型
 */
static final class HashEntry<K, V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K, V> next;
}

 

2.4 ConcurrentHashMap的一些常量

  ConcurrentHashMap中有很多常量,

// 默认初始容量
static final int DEFAULT_INITIAL_CAPACITY = 16;

// 默认的负载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;

// 默认的并发级别(同时支持多少线程并发修改)
// 因为JDK7中ConcurrentHashMap中是用分段锁实现并发,不同分段的数据可以进行并发操作,同一个段的数据不能同时修改
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

// 最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;

// 每一个分段的数组容量初始值
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

// 最多能有多少个segment
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

// 尝试对整个map进行操作(比如说统计map的元素数量),可能需要锁定全部segment;
// 这个常量表示锁定所有segment前,尝试的次数
static final int RETRIES_BEFORE_LOCK = 2;

  

三.常用接口源码分析

3.1 ConcurrentHashMap构造方法

  ConcurrentHashMap有多个构造方法,但是内部其实都是调用同一个进行创建:

public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity) {
    this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}

/**
 * 统一调用的构造方法
 *
 * @param initialCapacity  初始容量
 * @param loadFactor       负载因子
 * @param concurrencyLevel 并发级别
 */
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    // 校验参数合法性
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) {
        throw new IllegalArgumentException();
    }

    // 对并发级别进行调整,不允许超过segment的数量(超过segment其实是没有意义的)
    if (concurrencyLevel > MAX_SEGMENTS) {
        concurrencyLevel = MAX_SEGMENTS;
    }

    // 根据concurrencyLevel确定sshift和ssize的值
    int ssize = 1; // ssize是表示segment的数量,ssize是不小于concurrencyLevel的数,并且是2的n次方
    int sshift = 0;// sshift是ssize转换为2进制后的位数,比如ssize为16(1000),则sshift为4
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 比如concurrencyLevel默认为16,走完循环,sshift为4,ssize为16
    // 如果concurrentLevel为8,则sshift为3,ssize为8

    // segment的shift偏移量
    this.segmentShift = 32 - sshift;
    // segment掩码
    this.segmentMask = ssize - 1;

    // 对传入的初始容量进行调整(不允许大于最大容量)
    if (initialCapacity > MAXIMUM_CAPACITY) {
        initialCapacity = MAXIMUM_CAPACITY;
    }

    // 假设传入的容量为128,并发级别为16,则initialCapacity为128,ssize为16
    int c = initialCapacity / ssize;
    // c可以理解为根据传入的初始容量,计算出每个segment中的数组容量
    if (c * ssize < initialCapacity) {
        ++c;
    }

    // cap表示的是每个segment中的数组容量
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    // 如果默认的每个segment中的数组长度小于上面计算出来的每个segment的数组长度,则将容量翻倍
    while (cap < c) {
        cap <<= 1;
    }

    // 创建一个segment,作为segments数组的第一个segment
    Segment<K, V> s0 = new Segment<K, V>(loadFactor, (int) (cap * loadFactor), new HashEntry[cap]);

    // 创建segments数组
    Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize];

    // 将s0赋值给segments数组的第一个元素(偏移量为0)
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]

    // 复制segment数组
    this.segments = ss;
}

/**
 * 传入map,将map中的元素加入到ConcurrentHashMap中
 * 注意使用默认的负载因子(0.75)和默认的并发级别(16)
 * 初始容量取map容量和默认容量的较大值
 */
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, DEFAULT_INITIAL_CAPACITY),
            DEFAULT_LOAD_FACTOR,
            DEFAULT_CONCURRENCY_LEVEL);
    putAll(m);
}

  

3.2 map.put操作

  map.put,map就是指ConcurrentHashMap,其实就是确定HashEntry应该放入哪个segment中的哪个位置,所以可分为3步:

  1.首先需要确定放入哪个segment;

  2.确定segment后,再确定HashEntry应该放入segment的哪个位置;

  3.进行覆盖覆盖或者插入。

/**
 * put操作,注意key或者value为null时,会抛出NPE
 */
@SuppressWarnings("unchecked")
public V put(K key, V value) {
    Segment<K, V> s;
    if (value == null) {
        throw new NullPointerException();
    }

    // 计算key的hash
    int hash = hash(key);

    // hash值右移shift位后 与 mask掩码进行取与,确定数据应该放到哪个segments数组的哪一个segment中
    int j = (hash >>> segmentShift) & segmentMask;

    // 判断计算出的segment数组位置上的segment是否为null,如果为null,则进行创建segment
    if ((s = (Segment<K, V>) UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) {
        s = ensureSegment(j);
    }

    // 创建segment后,将数据put到segment中,调用的segment.put()
    return s.put(key, hash, value, false);
}

  

3.3 创建新segment

  上面put的时候,如果发现segment为null,则会进行调用ensureSegment进行创建segment,代码如下:

/**
 * 扩容(创建)segment
 *
 * @param k 需要扩容或者需要创建的segment位置
 * @return 返回扩容后的segment
 */
@SuppressWarnings("unchecked")
private Segment<K, V> ensureSegment(int k) {
    final Segment<K, V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // 传入的index,对应的偏移量
    Segment<K, V> seg;

    // 判断是否需要扩容或者创建segment,如果获取到segment不为null,则返回segment
    if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) {
        Segment<K, V> proto = ss[0]; // “原型设计模式”,使用segments数组的0号位置segment
        int cap = proto.table.length;// 0号segment的table长度
        float lf = proto.loadFactor; // 0号segment的负载因子
        int threshold = (int) (cap * lf); // 0号segment的扩容阈值

        // 创建一个HashEntry的数组,数组容量和0号segment相同
        HashEntry<K, V>[] tab = (HashEntry<K, V>[]) new HashEntry[cap];

        // 再次检测,指定的segment是不是为null,如果为null才进行下一步操作
        if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
            // 创建segment
            Segment<K, V> s = new Segment<K, V>(lf, threshold, tab);

            // 使用CAS将新创建的segment填入指定的位置
            while ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) {
                    break;
                }
            }
        }
    }

    // 返回新增的segment
    return seg;
}

  上面需要注意的是:

  1.创建segment中的table数组时,是使用0号segment的table属性(长度、负载因子、阈值);

  2.创建segment前会进行再check,check发现segment的确为null时,再进行创建segment;

  3.利用CAS来将创建的segment填入segments数组中;

 

3.4 segment.put操作

  当确定HashEntry应该放到哪个segment后,就开始调用segment的put方法,如下:

/**
 * 在确定应该存放到那个segment后,就segment.put()将k-v存入segment中
 *
 * @param key          put的key
 * @param hash         hash(key)的值
 * @param value        put的value
 * @param onlyIfAbsent true:key对应的Entry不进行覆盖,false:key对应的entry存在与否,都进行put覆盖
 * @return
 */
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 先获取锁(ReentrantLock,内部使用非公平锁)
    HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K, V>[] tab = table;

        // 根据hash值计算出在segment的table中的位置
        int index = (tab.length - 1) & hash;

        // 定位到segment的table的index位置(第一个节点)
        HashEntry<K, V> first = entryAt(tab, index);

        // 从第一个节点开始遍历
        for (HashEntry<K, V> e = first; ; ) {
            // 节点不为空,则判断是否key是否相同(相同HashEntry)
            if (e != null) {
                K k;
                // 比较是否key是否相等(判断put的key是否已经存在)
                if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                    // 如果key已经存在,则进行覆盖,如果onlyIsAbsent为false(不关心key对应的Entry是否存在)
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆盖旧值,修改计数加1
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            } else {
                // 头插法,将put的k-v创建新HashEntry,放到first的前面
                if (node != null) {
                    node.setNext(first);
                } else {
                    node = new HashEntry<K, V>(hash, key, value, first);
                }

                // segment中table元素数量加1
                int c = count + 1;

                // 加1后的size大于扩容阈值,且数组的长度小于最大容量,则进行rehash
                if (c > threshold && tab.length < MAXIMUM_CAPACITY) {
                    // 扩容,并进行rehash
                    rehash(node);
                } else {
                    // 将节点放入数组中的指定位置
                    setEntryAt(tab, index, node);
                }

                // 修改次数加一,修改segment的table元素个数
                ++modCount;
                count = c;

                // 旧值为null
                oldValue = null;
                break;
            }
        }
    } finally {
        // 释放锁
        unlock();
    }
    return oldValue;
}

  梳理一下,大致在做下面几件事:

  1.先获取锁(ReetrantLock,内部使用非公平锁NonFairSync);

  2.获取到锁后根据hash计算出在table的位置;

  3.遍历table的HashEntry的链表,如果有相同key的,则进行覆盖,如果没有,在进行头插法;

  4.插入链表后,确定是否需要扩容,需要则执行rehash;

  5.修改计数(count、modCount),并且释放锁。

 

3.5 segment.rehash扩容

  segment扩容时,会将该segment的容量扩容为之前的2倍,并且将各位置的链表节点元素进行rehash。

/**
 * 将segment的table容量扩容一倍(newCap=oldCap*2),注意只会扩容该node所在的segment
 *
 * @param node segment[i]->链表的头结点
 */
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K, V> node) {
    HashEntry<K, V>[] oldTable = table;
    int oldCapacity = oldTable.length;

    // 新容量为旧容量的2倍
    int newCapacity = oldCapacity << 1;

    // 设置新的扩容阈值
    threshold = (int) (newCapacity * loadFactor);

    // 申请新数组,数组长度为新容量值
    HashEntry<K, V>[] newTable = (HashEntry<K, V>[]) new HashEntry[newCapacity];

    // 循环遍历segment的数组(旧数组)
    int sizeMask = newCapacity - 1; // 新的掩码
    for (int i = 0; i < oldCapacity; i++) {
        // 获取第i个位置的HashEntry节点,如果该节点为null,则该位置为空,不作处理
        HashEntry<K, V> e = oldTable[i];
        if (e != null) {
            HashEntry<K, V> next = e.next;

            // 计算新位置
            int idx = e.hash & sizeMask;

            // next为null,表示该位置只有一个节点,直接将节点复制到新位置
            if (next == null) {   //  Single node on list
                newTable[idx] = e;
            } else { // Reuse consecutive sequence at same slot
                HashEntry<K, V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K, V> last = next; last != null; last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // 从头结点开始,开始将节点拷贝到新数组中
                for (HashEntry<K, V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K, V> n = newTable[k];
                    newTable[k] = new HashEntry<K, V>(h, p.key, v, n);
                }
            }
        }
    }

    // 将头结点添加到segment的table中
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

  为啥扩容的时候没有加锁呀?

  其实已经加锁了,只不过不是在rehash中加锁!!!因为rehash是在map.put中调用,put的时候已经加锁了,所以rehash中不用加锁。

  

3.6 map.get操作

  get操作,先定位到segment,然后定位到segment的具体位置,进行获取

/**
 * 从ConcurrentHashMap中获取key对应的value,不需要加锁
 */
public V get(Object key) {
    Segment<K, V> s;
    HashEntry<K, V>[] tab;

    // 计算key的hash
    int h = hash(key);

    // 计算key处于哪一个segment中(index)
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;

    // 获取数组中该位置的segment,如果该segment的table不为空,那么就继续在segment中查找,否则返回null,因为未找到
    if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {

        // tab指向segment的table数组,通过hash计算定位到table数组的位置(然后开始遍历链表)
        HashEntry<K, V> e;
        for (e = (HashEntry<K, V>) UNSAFE.getObjectVolatile(tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            // 判断key和hash是否匹配,匹配则证明找到要查找的数据,将数据返回
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    
    return null;
}

  

3.7 map.remove操作

   删除节点,和get的流程差不多,先定位到segment,然后确定segment的哪个位置(哪条链表),遍历链表,找到后进行删除。

/**
 * 删除map中key对应的元素
 */
public V remove(Object key) {
    // 计算key的hash
    int hash = hash(key);

    // 根据hash确定segment
    Segment<K, V> s = segmentForHash(hash);

    // 调用segment.remove进行删除
    return s == null ? null : s.remove(key, hash, null);
}

/**
 * 删除segment中key对应的hashEntry
 * 如果传入的value不为空,则会在value匹配的时候进行删除,否则不操作
 */
final V segmeng.remove(Object key, int hash, Object value) {
    // 获取锁失败,则不断自旋尝试获取锁
    if (!tryLock()) {
        scanAndLock(key, hash);
    }

    V oldValue = null;
    try {
        HashEntry<K, V>[] tab = table;
        // 定位到segment中table的哪个位置
        int index = (tab.length - 1) & hash;
        HashEntry<K, V> e = entryAt(tab, index);
        HashEntry<K, V> pred = null;

        // 遍历链表
        while (e != null) {
            K k;
            HashEntry<K, V> next = e.next;
            // 如果key和hash都匹配
            if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                V v = e.value;
                // 如果没有传入value,则直接删除该节点
                // 如果传入了value,比如调用的map.remove(key,value),则要value匹配才会删除,否则不操作
                if (value == null || value == v || value.equals(v)) {
                    if (pred == null) { // 头结点就是要找删除的元素,next为null,则将null赋值数组的该位置
                        setEntryAt(tab, index, next);
                    } else { // 
                        pred.setNext(next);
                    }

                    // 修改次数加一,map数量减一
                    ++modCount;
                    --count;
                    oldValue = v;
                }
                break;
            }

            // 不匹配时,pred保存当前一次检测的节点,e指向下一个节点
            pred = e;
            e = next;
        }
    } finally {
        unlock();// 释放锁
    }
    return oldValue;
}

  

3.8 map.size操作

  ConcurrentHashMap的size(),需要统计每一个segment中的元素数量(也就是count值),因为不同的segment允许并发修改,统计过程中可能会出现修改操作,导致统计不准确,所以要想准确统计整个map的元素数量,可以这样做:通过加锁的方式来解决(将所有的segment都加锁),这样就能保证元素不会变化了,这是我们的想法。

  而ConcurrentHashMap是这样解决的,先尝试不加锁进行统计两次,这两次统计,不止会统计每个segment的元素数量,还会统计每个segment的modCount(修改次数),进行汇总;如果两次统计的modCount总量相同,也就说明两次统计期间没有修改操作,证明统计的元素总量正确;如果两次modCount总量不相同,表示有修改操作,则进行重试,如果重试后,发现还是不准确(modCount不匹配),那么就尝试为所有的segment加锁,再进行统计。

  源码如下:

/**
 * 获取ConcurrentHashMap中的元素个数,如果元素个数超过Integer.MAX_VALUE,则返回Integer.MAX_VALUE
 */
public int size() {
    final Segment<K, V>[] segments = this.segments;
    int size;           // 返回元素数量(统计结果->元素的总量)
    boolean overflow;   // 标志是否溢出(是否超过Integer.MAX_VALUE)
    long sum;           // 所有segment的modCount总量次数(整个map的修改次数)
    long last = 0L;     // previous sum,上一轮统计的modCount总量
    int retries = -1;   // 记录重试的次数

    try {
        // 此处for循环主要用于控制重试
        for (; ; ) {
            // 重试的次数如果达到RETRIES_BEFORE_LOCK,则强制获取所有segment的锁
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j) {
                    ensureSegment(j).lock();
                    // 强制获取segment的table第i个位置,并加锁
                }
            }

            sum = 0L;
            size = 0;
            overflow = false;
            // 开始对segments中的每一个segment中进行统计
            for (int j = 0; j < segments.length; ++j) {
                // 获取第j个segment
                Segment<K, V> seg = segmentAt(segments, j);
                // 如果segment不为空,则进行统计
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    // size累加
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }

            // 如果本次统计的modCount总量和上次一样,则表示在这两次统计之间没有进行过修改,则不再重试
            if (sum == last) {
                break;
            }
            // 记录本次统计的modCount总量
            last = sum;
        }
    } finally {
        // 判断是否加了锁(如果retries大于RETRIES_BEFORE_LOCK),则证明加了锁,于是进行释放锁
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

  

 

以上是关于ConcurrentHashMap源码解析-Java7的主要内容,如果未能解决你的问题,请参考以下文章

ConcurrentHashMap源码解析-Java7

ConcurrentHashMap源码解析

concurrentHashMap源码解析

concurrentHashMap源码解析

concurrentHashMap源码解析

ConcurrentHashMap -1.8 源码解析