JUCConcurrentHashMap源码分析
Posted remo0x
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUCConcurrentHashMap源码分析相关的知识,希望对你有一定的参考价值。
类注释
- 取数方法没有加锁,所以会被存数方法影响
- 聚合方法:size/isEmpty/containsValue,在没有被并发更新的情况下是准确的,但是存在并发更新时,上述聚合方法只是反映了map的一个瞬时状态,这种瞬时状态只能用于监测或估算,而不能用于程序控制
- 和Hashtable一样,和HashMap相反,不允许使用null作为key或value
属性
常量
MAXIMUM_CAPACITY
- map在确定数组下标时,采用的是
(length-1)&hash
的方式,只有当length为2的指数幂的时候才能较均匀的分布元素。所以map规定了其容量必须是2的n次方,使用位运算同时还提高了Java的处理速度 - map内部由
Entry[]
数组构成,Java的数组下标是由int
表示的。所以对于map来说其最大的容量应该是不超过int最大值的一个2的指数幂,而最接近int最大值的2的指数幂用位运算符表示就是1<<30
LOAD_FACTOR
- 在构造函数中指定loadFactor只能影响初次构造的map的capacity,后续不会用到
- 其实LOAD_FACTOR的值不会常用,因为直接用
n-(n>>>2)
即表示n的3/4,即n*LOAD_FACTOR
TREEIFY_THRESHOLD/UNTREEIFY_THRESHOLD
- TREEIFY_THRESHOLD设置为8的原因:TreeNode占用空间是Node的两倍,而且结点数较少时,红黑树的查找效率跟链表相差不大,所以只在结点数量较多时用红黑树才能得到时间和空间上的tradeoff。在随机hash和LOAD_FACTOR的前提下,bins中的结点分布符合泊松分布,在一个bin中节点数达到8的概率是0.00000006,所以使用8作为阈值是很考究的。
- UNTREEIFY_THRESHOLD设置为6的原因:使用6也是一个tradeoff,在TREEIFY_THRESHOLD使用8的前提下,如果UNTREEIFY_THRESHOLD使用7,则可能存在对同一个bin频繁插入和删除的操作就会导致bin频繁的转为红黑树和链表,如果UNTREEIFY_THRESHOLD使用6以下的值,那红黑树在空间和时间效率上并不比链表优良,所以使用6是最合适的
MIN_TRANSFER_STRIDE
- 扩容操作中,transfer这个步骤是允许多线程的,这个常量表示一个线程执行transfer时,最少要对连续的16个hash桶进行transfer(不足16就按16算,多控制下正负号就行)。也就是单线程执行transfer时的最小任务量,单位为一个hash桶,这就是线程的transfer的步进(stride)
- 最小值是DEFAULT_CAPACITY,不使用太小的值,避免太小的值引起transfer时线程竞争过多,如果计算出来的值小于此值,就使用此值。正常步骤中会根据CPU核心数目来算出实际的,一个核心允许8个线程并发执行扩容操作的transfer步骤,这个8是个经验值,不能调整的
- 因为transfer操作不是IO操作,也不是死循环那种100%的CPU计算,CPU计算率中等,1核心允许8个线程并发完成扩容,理想情况下也算是比较合理的值。一段代码的IO操作越多,1核心对应的线程就要相应设置多点,CPU计算越多,1核心对应的线程就要相应设置少一些
- 表明:默认的容量是16,也就是默认构造的实例,第一次扩容实际上是单线程执行的,看上去是可以多线程并发(方法允许多个线程进入),但是实际上其余的线程都会被一些if判断拦截掉,不会真正去执行扩容
MOVED/TREEBIN/RESERVED/HASH_BITS
- MOVED:ForwardingNode的hash值,ForwardingNode是一种临时节点,在扩进行中才会出现,并且它不存储实际的数据。如果旧数组的一个hash桶中全部的节点都迁移到新数组中,旧数组就在这个hash桶中放置一个ForwardingNode。读操作或者迭代读时碰到ForwardingNode时,将操作转发到扩容后的新的table数组上去执行,写操作碰见它时,则尝试帮助扩容
- TREEBIN:TreeBin的hash值,TreeBin是ConcurrentHashMap中用于代理操作TreeNode的特殊节点,持有存储实际数据的红黑树的根节点。因为红黑树进行写入操作,整个树的结构可能会有很大的变化,这个对读线程有很大的影响,所以TreeBin还要维护一个简单读写锁,这是相对HashMap,这个类新引入这种特殊节点的重要原因
- RESERVED:ReservationNode的hash值,ReservationNode是一个保留节点,就是个占位符,不会保存实际的数据,正常情况是不会出现的,在jdk1.8新的函数式有关的两个方法computeIfAbsent和compute中才会出现
- HASH_BITS:用于和负数hash值进行
&
运算,将符号位置为0,将其转化为正数(绝对值不相等),Hashtable中定位hash桶也有使用这种方式来进行负数转正数
NCPU
- CPU的核心数,用于在扩容时计算一个线程一次要干多少活
serialPersistentFields
- 在序列化时使用,这是为了兼容以前的版本
变量
Node<K,V>[] table
- Node数组,用volatile修饰,通过Unsafe方法读写
Node<K,V>[] nextTable
- 扩容后的新的table数组,只有在扩容时才有用
nextTable != null
,说明扩容方法还没有真正退出,一般可以认为是此时还有线程正在进行扩容,极端情况需要考虑此时扩容操作只差最后给几个变量赋值(包括nextTable = null
)的这个大的步骤,这个大步骤执行时,通过sizeCtl经过一些计算得出来的扩容线程的数量是0
long baseCount
- 计数器基本值,主要在没有碰到多线程竞争时使用,需要通过CAS进行更新
int sizeCtl
- 非常重要的一个属性,源码中的英文翻译,直译过来是下面的四行文字的意思
sizeCtl = -1
,表示有线程正在进行真正的初始化操作sizeCtl = -(1 + nThreads)
,表示有nThreads个线程正在进行扩容操作sizeCtl > 0
,表示接下来的真正的初始化操作中使用的容量,或者初始化/扩容完成后的thresholdsizeCtl = 0
,默认值,此时在真正的初始化操作中使用默认容量
- 但是,通过我对源码的理解,这段注释实际上是有问题的,有问题的是第二句,
sizeCtl = -(1 + nThreads)
这个,网上好多都是用第二句的直接翻译去解释代码,这样理解是错误的。默认构造的16个大小的ConcurrentHashMap,只有一个线程执行扩容时,sizeCtl = -2145714174
,但是照这段英文注释的意思,sizeCtl的值应该是-(1 + 1) = -2
,sizeCtl在小于0时的确有记录有多少个线程正在执行扩容任务的功能,但是不是这段英文注释说的那样直接用-(1 + nThreads)
,实际中使用了一种生成戳,根据生成戳算出一个基数,不同轮次的扩容操作的生成戳都是唯一的,来保证多次扩容之间不会交叉重叠,当有n个线程正在执行扩容时,sizeCtl在值变为(基数 + n),1.8.0_111的源码的383-384行写了个说明:A generation stamp in field sizeCtl ensures that resizings do not overlap.
int transferIndex
- 下一个transfer任务的起始下标index加上1之后的值,transfer时下标index从
length - 1
开始往0走。transfer时方向是倒过来的,迭代时是下标从小往大,二者方向相反,尽量减少扩容时transefer和迭代两者同时处理一个hash桶的情况,顺序相反时,二者相遇过后,迭代没处理的都是已经transfer的hash桶,transfer没处理的,都是已经迭代的hash桶,冲突会变少。 - 下标在[nextIndex - 实际的stride (下界要 >= 0), nextIndex - 1]内的hash桶,就是每个transfer的任务区间,每次接受一个transfer任务,都要CAS执行
transferIndex = transferIndex - 实际的stride
,保证一个transfer任务不会被几个线程同时获取(相当于任务队列的size减1),当没有线程正在执行transfer任务时,一定有transferIndex <= 0,这是判断是否需要帮助扩容的重要条件(相当于任务队列为空)
int cellsBusy
- CAS自旋锁标志位,用于初始化,或者counterCells扩容时
CounterCell[] counterCells
- 用于高并发的计数单元,如果初始化了这些计数单元,那么跟table数组一样,长度必须是2^n的形式
KeySetView<K,V> keySet
/ValuesView<K,V> values
/EntrySetView<K,V> entrySet
- 视图变量
方法
静态方法
int spread(int h)
- 将高16位与低16位异或,并将符号位置为0
- 这样做是因为在计算node的index时,是用2的幂作为掩码,所以只用低位进行计算,存在大量的碰撞,比如一些Float的值,所以将高位的影响扩散到低位,可以减少这种碰撞。同时,因为table的容量限制,hash中的高位在计算index时很难被用到
- 处于对速度、效能和bit位扩散的质量的考虑,并且使用红黑树处理大量的碰撞,所以只是简单的将高位和低位进行异或就够了
int tableSizeFor(int c)
- 返回大于输入参数且最近的2的整数次幂的数
先分析有关n位操作部分 假设n的二进制为01xxx...xxx。接着 对n右移1位:001xx...xxx,再位或:011xx...xxx 对n右移2为:00011...xxx,再位或:01111...xxx 此时前面已经有四个1了,再右移4位且位或可得8个1 同理,有8个1,右移8位肯定会让后八位也为1 综上可得,该算法让最高位的1后面的位全变为1 最后再让结果n+1,即得到了2的整数次幂的值了 现在回来看看第一条语句:int n = cap - 1 让cap-1再赋值给n的目的是令找到的目标值大于或等于原值。例如二进制1000,十进制数值为8 如果不对它减1而直接操作,将得到答案10000,即16。显然不是结果。减1后二进制为111,再进行操作则会得到原来的数值1000,即8
table元素访问方法
<K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)
- volatile语义获取table元素i
<K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)
- CAS将位置i的元素c替换为v
<K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)
- volatile语义将位置i的元素设置为v
- 用Unsafe类实现这三个方法的原因是,Java的数组在元素层面上的设计缺失,无法表达元素是final和volatile的语义,所以使用getObjectVolatile补充volatile的语义,使用@Stable补充final的语义。数组元素本身和没有volatile修饰的字段一样,无法保证线程之间的可见性,只有触发happens-before关系的操作,才能保证线程之间的可见性。比如使用
table[0] = new Object()
直接赋值,这个赋值不会触发任何happens-before关系的操作,相当于对一个无volatile变量进行赋值一样
构造函数
ConcurrentHashMap()
ConcurrentHashMap(int initialCapacity)
ConcurrentHashMap(Map<? extends K, ? extends V> m)
ConcurrentHashMap(int initialCapacity, float loadFactor)
ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
,initialCapacity * loadFactor = resizeThreshold
,所以这里的initialCapacity / loadFactor
表示乘以loadFactory后得到的resizeThreshold就是initialCapacity,即size达到initialCapacity后就会进行扩容,因此如果放到map中的元素数量刚好是initialCapacity个,那就避免了扩容操作,而+1.0
相当于是使用Math.ceil
将浮点数向上取整,不过如果initialCapacity/loadFactor是正数,就会多出一个元素,再用tableSizeFor调整size,就得到了最合适的capacity。比如initalCapacity=16,loadFactor=0.75,则size=22
get
源码
public V get(Object key)
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 重hash
int h = spread(key.hashCode());
// 先看bin是否有结点
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null)
// bin的头结点就是要找的结点
if ((eh = e.hash) == h)
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
// hash值小于0,都是特殊节点,调用find方法查询
// 包括已经迁移的结点、树节点、临时节点
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 链表查找
while ((e = e.next) != null)
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
return null;
put
源码
final V putVal(K key, V value, boolean onlyIfAbsent)
if (key == null || value == null) throw new NullPointerException();
// 重hash
int hash = spread(key.hashCode());
// 统计链表长度
int binCount = 0;
// 自旋
for (Node<K,V>[] tab = table;;)
Node<K,V> f; int n, i, fh;
// table为空则初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 如果bin没有结点,则直接CAS放入,失败则自旋重试
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null)
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
// 如果是fwd结点,表示正在迁移,则当前线程参与迁移table
// 迁移结束后,自旋重试
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 说明是链表或者红黑树
else
V oldVal = null;
// 对bin加锁
synchronized (f)
// 检查是否仍然是之前的头结点
if (tabAt(tab, i) == f)
// 链表
if (fh >= 0)
binCount = 1;
for (Node<K,V> e = f;; ++binCount)
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek))))
oldVal = e.val;
// onlyIfAbsent表示只有key不存在时才赋值,否则仍然为旧值
if (!onlyIfAbsent)
e.val = value;
break;
Node<K,V> pred = e;
// 链表遍历完仍然没有key,则放入新结点到链表尾
if ((e = e.next) == null)
pred.next = new Node<K,V>(hash, key,
value, null);
break;
// 红黑树
else if (f instanceof TreeBin)
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null)
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
// 判断链表是否需要转为红黑树
if (binCount != 0)
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
// 如果之前已经存在该key,则直接返回
if (oldVal != null)
return oldVal;
break;
// 如果key之前不存在,则是新增结点,需要自增count
addCount(1L, binCount);
return null;
initTable()
private final Node<K,V>[] initTable()
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0)
// sizeCtl小于0表示已经有线程在初始化table了
// 直接将cpu时间让出去,等他们初始化完,再自旋检查下
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 将sizeCtl设置为-1,表示当前线程将要初始化table
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1))
try
// 再次检查table是否仍然为空
if ((tab = table) == null || tab.length == 0)
// 如果sizeCtl>0,表示创建Map时指定了初始化容量
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 将sizeCtl置为当前容量的0.75,表示触发扩容的阈值
sc = n - (n >>> 2);
finally
sizeCtl = sc;
break;
return tab;
treeifyBin(Node<K,V>[],int)
private final void treeifyBin(Node<K,V>[] tab, int index)
Node<K,V> b; int n, sc;
if (tab != null)
// 如果table容量小于能支持红黑树的阈值,则只扩容
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
// 判断索引位置的bin是链表结点
else if ((b = tabAt(tab, index)) != null && b.hash >= 0)
// 对bin加锁
synchronized (b)
if (tabAt(tab, index) == b)
// 将所有结点转为双向链表
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next)
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
// 创建TreeBin即红黑树,并放置在索引处
setTabAt(tab, index, new TreeBin<K,V>(hd));
tryPresize()
private final void tryPresize(int size)
// 给tableSizeFor传1.5倍size,可以理解为将0.75size是触发扩容的阈值
// 而1.5size是下次触发扩容的阈值,所以tableSizeFor会返回该阈值对应的容量
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
// 自旋
while ((sc = sizeCtl) >= 0)
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0)
// sc>0表示创建Map时指定了capacity,这里用sc和c中最大值
n = (sc > c) ? sc : c;
// sizeCtl置为-1表示正在初始化
if (U.compareAndSwapInt(this, SIZECTL, sc, -1))
try
if (table == tab)
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
finally
sizeCtl = sc;
// 在table已经初始化的情况下,sc表示下次扩容的阈值
// 这里c<sc,表示容量已经够了,不需要扩容,可能是其他线程已经扩容了
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
// 校验是否还是之前的table,是否有其他线程已经初始化了
else if (tab == table)
int rs = resizeStamp(n);
// sc<0,表示正在初始化或者正在扩容
if (sc < 0)
Node<K,VJUCConcurrentHashMap源码分析