ConcurrentHashmap核心源码分析
Posted 可持续化发展
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ConcurrentHashmap核心源码分析相关的知识,希望对你有一定的参考价值。
以下内容为本人学习小刘老师的源码课程后(哔哩哔哩搜索,小刘讲源码),自己整理的笔记。仅供自己学习之用。
concurrenthashmap的扩容,数据迁移的动作是从下往上走的(以数组来看)。这样的好处是为了避免和迭代操作发生冲突。比如说,我在迭代访问concurrenthashmap的数据。现在从下往上迁移到数组的第11个节点了,我读到第10个桶位了。我要读第11个节点,就发现拿到了一个FWD节点,接着就到新表上去读。如果迁移是从0号桶位开始往下走的话,可能会和迭代操作产生冲突。
concurrenthashmap是一个高并发的容器。如果concurrenthashmap的统计采用AtomicLong的话,会有CAS竞争,性能会很低。所以采用了LongAdder,有化整为零的思想。
常量Constants
-
散列表数组最大限制
private static final int MAXIMUM_CAPACITY = 1 << 30; -
散列表默认值
private static final int DEFAULT_CAPACITY = 16; -
负载因子,JDK1.8中 ConcurrentHashMap 是固定值
private static final float LOAD_FACTOR = 0.75f; -
树化阈值,指定桶位 链表长度达到8的话,有可能发生树化操作。
static final int TREEIFY_THRESHOLD = 8; -
红黑树转化为链表的阈值
static final int UNTREEIFY_THRESHOLD = 6; -
联合TREEIFY_THRESHOLD控制桶位是否树化,只有当table数组长度达到64且 某个桶位 中的链表长度达到8,才会真正树化
static final int MIN_TREEIFY_CAPACITY = 64; -
线程迁移数据最小步长,控制线程迁移任务最小区间的一个值
多个线程同时去扩容时,给每个线程分配的每次扩容任务最小的区间参数,桶位的跨度
private static final int MIN_TRANSFER_STRIDE = 16; -
扩容相关,计算扩容时 生成的一个 标识戳
private static int RESIZE_STAMP_BITS = 16; -
65535 表示参与并发扩容时最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
-
当node节点的hash值为 -1 时,表示当前节点为FWD节点
static final int MOVED = -1; // hash for forwarding nodes -
当node节点的hash值为 -2 时,表示当前节点已经树化,且当前节点为TreeBin对象,TreeBin对象代理操作红黑树
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations -
0x7fffffff -> 0111 1111 1111 1111 1111 1111 1111 1111 它可以将一个负数,通过位与运算后得到一个正数,但不是取绝对值
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash -
当前系统的CPU数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
成员属性Fields
* 散列表,长度一定是2次方数
*/
transient volatile Node<K,V>[] table;
* The next table to use; non-null only while resizing.
* 扩容过程中,会将第一个触发扩容的线程新创建的更大的table 赋值给nextTable 来保持引用
* (因为是并发扩容的,得让其他线程知道将数据迁移到哪里),扩容结束之后,会把新表赋给table,并把nextTable被设置为Null
* 将老数据迁移到新表中
*/
private transient volatile Node<K,V>[] nextTable;
* LongAdder 中的 baseCount 未发生竞争时 或者 当前LongAdder处于加锁状态时,增量累到到baseCount中
*/
private transient volatile long baseCount;
* sizeCtl < 0
* 1. -1 表示当前table正在初始化(有线程在创建table数组),当前线程需要自旋等待..
* 2. 表示当前table数组正在进行扩容 ,高16位表示:扩容的标识戳 低16位表示:(1 + nThread)
* 其中,nThread为当前参与并发扩容的线程数量
* sizeCtl = 0,表示创建table数组时 使用DEFAULT_CAPACITY为大小
* sizeCtl > 0
* 1. 如果table未初始化,表示初始化大小
* 2. 如果table已经初始化,表示下次扩容时的 触发条件(阈值),即下次扩容的扩容阈值。0.75*数组长度
*/
private transient volatile int sizeCtl;
* 迁移的当前下标。并发扩容过程中,迁移数据,为了协调多个线程之间扩容工作,在对象级别用这个属性来记录当前数据迁移工作的进度。
* 并发扩容过程中,记录当前进度。所有线程都需要从transferIndex中分配区间任务,去执行自己的任务。
*/
private transient volatile int transferIndex;
* LongAdder中的cellsBuzy 0表示当前LongAdder对象无锁状态,1表示当前LongAdder对象加锁状态
*/
private transient volatile int cellsBusy;
* LongAdder中的cells数组,当baseCount发生竞争后,会创建cells数组,
* 线程会通过计算hash值 取到 自己的cell ,将增量累加到指定cell中
* 获取当前散列表中元素个数,这是一个期望值 = sum(cells) + baseCount
*/
private transient volatile CounterCell[] counterCells;
静态代码块
// Unsafe mechanics
private static final sun.misc.Unsafe U;
/**表示sizeCtl属性在ConcurrentHashMap中内存偏移地址,知道concurrenthashmap对象的基础地址后,加上这个偏移量,就可以访问sizectl的内存了*/
private static final long SIZECTL;
/**表示transferIndex属性在ConcurrentHashMap中内存偏移地址*/
private static final long TRANSFERINDEX;
/**表示baseCount属性在ConcurrentHashMap中内存偏移地址*/
private static final long BASECOUNT;
/**表示cellsBusy属性在ConcurrentHashMap中内存偏移地址*/
private static final long CELLSBUSY;
/**表示cellValue属性在CounterCell中内存偏移地址*/
private static final long CELLVALUE;
/**表示数组第一个元素的偏移地址, 一个数组内存块拿到后,它内存前部分有一个信息头部,这个头部之后,才是第一个数组元素的内存。abase为信息头部的偏移量*/
private static final long ABASE;
private static final int ASHIFT;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
//表示数组单元所占用空间大小,scale 表示Node[]数组中每一个单元所占用空间大小
int scale = U.arrayIndexScale(ak);
//scale 必须是2的次方数
//1 0000 & 0 1111 = 0
//用于判断scale 是不是2的次方数
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
//numberOfLeadingZeros() 这个方法是返回当前数值转换为二进制后,从高位到低位开始统计,看有多少个0连续在一块。
//8 => 1000 numberOfLeadingZeros(8) = 28
//4 => 100 numberOfLeadingZeros(4) = 29
//ASHIFT = 31 - 29 = 2 ??
//ASHIFT 是为了寻找Node[]数组的元素地址的。
//ABASE + (5 << ASHIFT) 就可以访问数组中第5个元素了,之前是 ABASE + 5 * scale
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
内部类
Node类部分分析
static class Node<K,V> implements Map.Entry<K,V> {
//hash、key设置为final,一、是为了设置成功后,不能再改值。二是为了线程安全性,因为设为final就不能改了,只能读,只读不会出现线程安全性问题。
final int hash;//当前节点的hash值
final K key;
//val、next用volatile修饰是 为了保证内存的可见性,
volatile V val;
volatile Node<K,V> next;//链表中指向下一个节点的引用
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
........
}
TreeNode类部分分析
/**
* Nodes for use in TreeBins
* 可以构成双向链表
*/
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
//双向链表的当前节点的前置引用
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
.......
}
ForwardingNode类部分分析
/**
* A node inserted at head of bins during transfer operations.
* 拿到这个节点后,说明当前table数组正在扩容,数据正在迁移中。
* 读线程拿到这个节点,就调用find方法到新表中去查询
* 写线程拿到这个节点,就要去帮concurrenthashmap做并发扩容
*/
static final class ForwardingNode<K,V> extends Node<K,V> {
//final 修饰 赋值之后就,不能再改了
//指向更大的新创建的table
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);//MOVED=-1
this.nextTable = tab;
}
Node<K,V> find(int h, Object k) {
在nextTable上查找数据
outer: for (;;) {
//条件一:永远不成立
//条件二:永远不成立
//条件三:永远不成立
//条件四:在新扩容表中 重新定位 hash 对应的头结点
// 我在查询时发现老table的桶位中是 FWD节点。FWD节点指向一个更大的新table。我就去新table数组中定位桶位,发现新桶位是null。
// 有可能数据迁移完后,有其他线程把数据删了。也有可能老表的桶位本来就是null。
//true -> 1.在oldTable中 对应的桶位在迁移之前就是null
// 2.扩容完成后,有其它写线程,将此桶位设置为了null
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
//前置条件:扩容后的新表 对应hash的桶位一定不是null,e为此桶位的头结点
//e可能为哪些node类型?
//1.node 类型
//2.TreeBin 类型
//3.FWD 类型
for (;;) {
//eh 新扩容后表指定桶位的当前节点的hash
//ek 新扩容后表指定桶位的当前节点的key
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
//扩容后的新表的命中桶位的节点 和 查询的key、hash都一致,返回当前节点
//不一定是桶位头节点,因为下面有e = e.next,e引用会指向下一个节点
return e;
//eh<0 ,表示当前节点可能为
//1.TreeBin 类型 2.FWD类型(新扩容的表,在并发很大的情况下,可能在此方法 再次拿到FWD类型..)
if (eh < 0) {
if (当前节点为ForwardingNode){
//如果在老表a中拿到FWD节点,就会去大的新表b查找数据。
//如果在新表b中,再次拿到FWD节点,说明并发量比较大,就会去更大的新表c中查找数据。
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else{
//此桶位 为 TreeBin 节点,使用TreeBin.find 查找红黑树中相应节点。
return e.find(h, k);
}
}
//前置条件:当前节点的key、hash 和查询的key、hash不一致,并且此桶位是 链表桶位
//1.将当前节点的引用e 指向链表中的下一个元素
//2.判断此时的引用e 是否为空.
//如果为空,则说明遍历完链表后,我都没有找到和查询key、hash都一致的节点,return null
//如果不空,则自旋,再次比较key和hash
if ((e = e.next) == null)
return null;
}
}
}
}
内部小方法源码分析
static final int spread(int h)
* 1100 0011 1010 0101 0001 1100 0001 1110
* 0000 0000 0000 0000 1100 0011 1010 0101
* 1100 0011 1010 0101 1101 1111 1011 1011
* ---------------------------------------
* 1100 0011 1010 0101 1101 1111 1011 1011
* 0111 1111 1111 1111 1111 1111 1111 1111
* 0100 0011 1010 0101 1101 1111 1011 1011
*/
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
让hashcode的高16位 异或 hashcode,再位与 HASH_BITS。
tabAt方法
//通过 ((long)i << ASHIFT) + ABASE ,计算出指定元素的偏移量。这个方法是获取指定Node数组的指定索引的元素。到主存中获取数据
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
casTabAt方法
//该方法的作用是通过CAS的方式,给Node数组的指定元素设置值。Node<K,V> c 期望值,Node<K,V> v 新值
//如果期望值和内存中的值一致,则将内存中的值修改为新值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);//偏移量
}
setTabAt方法
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
tableSizeFor方法
* 返回>=c的最小的2的次方数
* c=28
* n=27 => 0b 11011
* 11011 | 01101 => 11111
* 11111 | 00111 => 11111
* ....
* => 11111 + 1 =100000 = 32
*/
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
resizeStamp方法
* 扩容时计算出一个扩容唯一标识戳,用来标识table数组是从多大扩容到多大的。
* 多线程并发扩容,线程要拿到这个标识戳,且标识戳一致,才能参与到这个扩容中。
* 16 -> 32
* numberOfLeadingZeros(16) => 1 0000 =>27 =>0000 0000 0001 1011
* |
* (1 << (RESIZE_STAMP_BITS - 1)) => 1000 0000 0000 0000 => 32768
* ---------------------------------------------------------------
* 以上是关于ConcurrentHashmap核心源码分析的主要内容,如果未能解决你的问题,请参考以下文章
Java并发集合类ConcurrentHashMap底层核心源码解析