1.8ConcurrentHashMap源码分析
Posted xingfeng_coder
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1.8ConcurrentHashMap源码分析相关的知识,希望对你有一定的参考价值。
ConcurrentHashMap是一个线程安全的HashMap,
ConcurrentHashMap和HashMap的底层数据结构相同,都是数组+链表+红黑树;但是hash方法、键、值不允许为null和Hashtable一样,并且都是线程安全的,只不过实现同步的细节上有所差别。尽管支持高并发的读写,但是ConcurrentHashMap在读操作时不会加锁。
关于HashMap和Hashtable可以参考下面三篇文章:
关于ConcurrentHashMap的变化,1.7和1.8有所变化,1.7可以参考:
源码分析
构造方法
重要字段
ConcurrentHashMap的底层数据结构是数组+链表+红黑树,所以有一个节点的数组table字段,并且和HashMap一样,这个数组的长度只能是2的指数倍,即16、32…
ConcurrentHashMap中的一些默认参数和HashMap中相同,比如默认容量为16,加载因子为0.75,将链表转为红黑树的阈值为8,将红黑树转换为链表的阈值为6等等。
除了和HashMap中相同的参数外,ConcurrentHashMap中还有一系列volatile的变量,如下:
/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
*/
transient volatile Node<K,V>[] table;
/**
* The next table to use; non-null only while resizing.
*/
private transient volatile Node<K,V>[] nextTable;
/**
* Base counter value, used mainly when there is no contention,
* but also as a fallback during table initialization
* races. Updated via CAS.
*/
private transient volatile long baseCount;
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
hash表初始化或扩容时的一个控制位标识量。
负数代表正在进行初始化或扩容操作
-1代表正在初始化
-N 表示有N-1个线程正在进行扩容操作
正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小
*/
private transient volatile int sizeCtl;
/**
* The next table index (plus one) to split while resizing.
*/
private transient volatile int transferIndex;
/**
* Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
*/
private transient volatile int cellsBusy;
/**
* Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;
上面的几个变量中,sizeCtl为下一次resize到达的阈值,counterCells是用于记录table数组中每个桶中元素的个数。
构造方法
ConcurrentHashMap和HashMap一样,对底层数组采用懒加载的方式,只有在第一次插入元素的时候才会创建数组。构造方法如下:
public ConcurrentHashMap()
public ConcurrentHashMap(int initialCapacity)
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
public ConcurrentHashMap(Map<? extends K, ? extends V> m)
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
public ConcurrentHashMap(int initialCapacity, float loadFactor)
this(initialCapacity, loadFactor, 1);
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel)
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
从构造方法可以看到,如果传入了容量的参数,那么会调用tableSizeFor将参数转换为2的指数倍数,然后赋值给sizeCtl。在table为null时,sizeCtl代表创建数组的长度,当table不为null且为正值时,表示下一次resize的阈值。
主要操作
put(K k,V v)方法
Hashtable中的put方法使用了synchronized关键字,所以是对Hashtable整个对象加锁的,而ConcurrentHashMap的put方法是对单个桶加锁的,代码如下:
public V put(K key, V value)
return putVal(key, value, false);
final V putVal(K key, V value, boolean onlyIfAbsent)
//键、值均不允许为null
if (key == null || value == null) throw new NullPointerException();
//计算hash值
int hash = spread(key.hashCode());
//记录单个桶中元素的个数
int binCount = 0;
for (Node<K,V>[] tab = table;;)
Node<K,V> f; int n, i, fh;
//如果表为空,创建表
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//表不为空,计算得到索引出的头结点
//如果头节点为空
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null)
//成功该节点作为表中该桶处的头结点,则跳出,否则计算。失败的原因是有并发
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
//哈希表正在扩容,帮助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else
V oldVal = null;
//对桶的头节点加锁
synchronized (f)
//如果table表中该桶中头节点仍然是f
if (tabAt(tab, i) == f)
//如果头节点的hash值正常
if (fh >= 0)
binCount = 1;
//从头开始遍历
for (Node<K,V> e = f;; ++binCount)
K ek;
//如果找到了匹配的值,那么更新值,跳出循环
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek))))
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
//不匹配,判断是否到了最后一个节点
Node<K,V> pred = e;
//到了最后一个节点,那么插入一个节点,跳出循环
if ((e = e.next) == null)
pred.next = new Node<K,V>(hash, key,
value, null);
break;
//如果头几点是TreeBin,即该桶处于红黑树结构
else if (f instanceof TreeBin)
Node<K,V> p;
binCount = 2;
//使用红黑树的插入节点方式
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null)
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
//此处无锁,在之前加锁的部分已经成功插入了节点,不管是链表结构还是红黑树结构
//如果bindCount不为0,只要tabAt(tab,i)==f为真,那么binCount就不会等于0
if (binCount != 0)
//如果binCount超出了8,那么将该桶处的链表转为红黑树结构
//在红黑树结构中,binCount=2,不会超过8;在链表结构中,会遍历链表会记录个数,如果插入在最后节点并且超过了8,那么将会将链表转为红黑树结构
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
//整个size增加1
addCount(1L, binCount);
return null;
上面的代码中已经将流程注释清清楚了,下面将分别介绍其中调用的几个方法。
首先是哈希表为空时,调用的initTable创建哈希表的方法,其实现如下:
private final Node<K,V>[] initTable()
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0)
//如果小于0,让出线程
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//如果CAS将sizeCtl变为-1成功
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1))
try
//如果table为null
if ((tab = table) == null || tab.length == 0)
//获取创建容量,
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//创建表
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
//更改引用指向
table = tab = nt;
//相当于乘以0.75 n-n/4=0.75*n
sc = n - (n >>> 2);
finally
//将sc赋值给sizeCtl
sizeCtl = sc;
break;
return tab;
可以看到initTable()方法中没有加入synchronized关键字,那么如果有两个线程A和B同时在哈希表为空时,都执行put方法,都在table为null的时候进入了initTable方法,该段代码是如何保证只有一个线程执行了初始化的呢?
首先,构造方法中对sizeCtl变量赋值了,为0或者一个整数(无参构造方法中sizeCtl为0)。比如说线程A和B同时进入了while循环中,因为sizeCtl>=0,所以都经过了第一个if语句块,进入到第二个if语句块时,由于只有一个线程可以CAS将sizeCtl变为-1,这里假设线程A成功做到了,那么线程B再执行这句话时就失败了,就会继续循环,又会执行第一个if语句块,这时由于sc=sizeCtl=-1<0,所以让出了线程,如果之后线程B再次执行但是由于线程A还未将table创建好的话,依然会进入循环,会在第一个if语句块处让出执行。下面再来分析成功将sizeCtl变为-1的线程A,进入了if语句块后,就是根据sc是否大于0得到数组容量,如果使用无参构造方法,因为sc=0,所以使用默认容量16,;如果构造方法中传入了容量,那么将使用sc,之后就是创建数组,更改引用,最后将sc变成一个负值后赋给sizeCtl。这样线程A就返回了tab变量。而线程B如果再次执行时,由于while循环中tab不为null了,所以也跳出了循环,返回了tab。
从上面的分析可以看到,initTable也是一个线程安全的方法,确保了底层数组初始的唯一性。
当哈希表创建后,将根据hash值计算得到索引,然后调用tabAt得到头节点,tabAt方法如下:
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
由于使用Unsafe类,所以该方法是一个原子性的,如果头节点为null,那么调用casTabAt方法添加头节点,其实现如下:
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v)
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
该方法使用CAS更改头节点,如果失败了,那么说明有并发。putVal()方法中如果添加头节点成功了,那么直接跳出了循环,执行addCount()方法,如果失败了,会进入下一次循环。下面分析一下什么情况下会出现并发失败。
假设两个线程A和B均执行到了tabAt()这个方法,并且由于该桶中无节点,所以,线程A和线程B均执行casTabAt()方法,这时如果线程A比较幸运地执行成功了,那么就跳出循环了;而线程B由于执行失败了,就需要继续进入下一次循环,当进入下一次循环时,得到的头节点就不会再为null了,就不会进入到这个if语句块了。
接下来线程B在下一个循环中时,如果此时没有其他线程操作哈希表的话,那么将进入到最后一个else语句块,暂时先不考虑头节点的hash值为MOVED的情况,后面会讨论。现在分析一下最后一个else语句块。
由于前面已经得到了头节点f,然后使用synchronized关键字对头节点加锁,由于put、remove方法中都需要得到头节点后做遍历,所以该操作就是对哈希表的这一个桶加锁了,但是其他的桶依然可以被其他线程操作。当获取到f的锁后,首先调用tabAt方法判定此时的头节点是否没有变化,如果没有变化则继续操作,否则就释放锁了。为什么需要这么做呢?
因为线程阻塞在获取头节点的锁时,有可能别的线程在获取到头节点后删除了,那么这时头节点就不存在了,自然需要进入下一个循环重新获取头节点,如果是出现头节点更改的这种情况,那么binCount将会等于0。
如果线程获取到头节点的锁后并且头节点没有更改的话,那么如果头节点的hash值大于等于0,执行链表的插入;如果hash值小于0,并且头节点如果是TreeBin,那么执行红黑树的插入。具体插入过程就不详细讲解了,但是需要注意的是在链表的插入中,binCount代表的是遍历的节点个数,在红黑树的插入中,binCount始终为2。不过这里有一点可能会很奇怪,那就是节点的hash值怎么会为负呢?
ConcurrentHashMap中有一个常量值,表示了头节点的hash值的各种含义,如下:
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
可以看到其中有我们上面碰到的MOVED,RESERVED表示预留的值,而TREEBIN的-2表示该节点是红黑树的根结点。由于putVal在头节点的hash值为MOVED的情况下会调用helpTransfer方法,那么当进入到同步语句块时,hash值小于0就只剩下了-2的情况,即头节点为红黑树的节点。这个在下面将链表转为红黑树的结构中可以说明。
当执行完插入节点或更新节点的操作后,进入判断binCount语句块,上面解释了,在头节点更改的情况下,binCount为0,在链表插入的情况下表示遍历的节点个数,在红黑树的插入情况下始终为2,所以一旦不等于0,即表示执行了插入或更新操作,如果大于8,则需要调用treeifyBin(tab,i)将哈希表中该桶处的结构从链表转换为红黑树,然后如果是更新操作,则没有添加新节点,直接返回值跳出方法;否则表示执行了插入操作,需要调用addCount增加一个节点数。
下面首先看一下treeifyBin()方法,该方法用于将链表转换为红黑树,如下:
private final void treeifyBin(Node<K,V>[] tab, int index)
Node<K,V> b; int n, sc;
if (tab != null)
//如果哈希表的长度小于64,那么执行扩容
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
//如果长度足够,并且头节点不为null
else if ((b = tabAt(tab, index)) != null && b.hash >= 0)
//对头节点加锁
synchronized (b)
//如果头节点没有更改
if (tabAt(tab, index) == b)
//记录新的链表的头节点和尾节点
TreeNode<K,V> hd = null, tl = null;
//复制链表
for (Node<K,V> e = b; e != null; e = e.next)
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
//将链表转换为红黑树,并调用setTabAt更改桶处的结构
setTabAt(tab, index, new TreeBin<K,V>(hd));
从上面的代码可以看到,当哈希表的长度没有达到64之前,调用treeifyBin()方法总是会调用tryPresize()方法将容量扩大1倍;如果长度达到了64,那么才会执行转换,转换过程中依然是首先得到头节点,然后对头节点加锁,如果头节点没有更改,那么复制链表,最后调用setTabAt()方法更改节点。其中可以看到几个类,一个是TreeNode,一个是TreeBin,其中TreeNode代表红黑树的节点,而TreeBin用作桶中红黑树的头节点,该节点不保存键和值,但是会有一个指向红黑树根结点的指针。并且内部维持一个读写锁,用于在树的重建操作完成前,强制写线程(拥有锁)等待读写成(没有锁)。
TreeBin类
TreeBin的部分代码如下所示:
static final class TreeBin<K,V> extends Node<K,V>
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
/**
* Creates bin with initial set of nodes headed by b.
*/
TreeBin(TreeNode<K,V> b)
//hash值为-1、key、value以及next均为null
super(TREEBIN, null, null, null);
this.first = b;
//将链表转换为红黑树
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next)
//保存下一个节点
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null)
x.parent = null;
x.red = false;
r = x;
else
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;)
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null)
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
this.root = r;
assert checkInvariants(root);
可以看到上述代码主要包括TreeBin类的字段以及构造方法,TreeBin继承自Node类,所以也是一个节点。构造方法中的参数是一个单链表的头节点,不过其中每个节点都是TreeNode节点。
可以看到TreeBin的构造方法中主要完成两步操作:
- 使hash值为-2,first指向第一个节点,root指向红黑树根结点
- 完成TreeNode单链表到红黑树的转换。
下面再看一下setTabAt()方法,如下:
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
在了解了TreeBin的结构后,再看一下putVal中,当头节点是TreeBin的时候,会调用TreeBin的putVal()方法,该方法如下:
//该方法会搜索红黑树,如果是新插入的值,那么将返回null,否则返回旧节点
final TreeNode<K,V> putTreeVal(int h, K k, V v)
Class<?> kc = null;
boolean searched = false;
for (TreeNode<K,V> p = root;;)
int dir, ph; K pk;
if (p == null)
first = root = new TreeNode<K,V>(h, k, v, null, null);
break;
else if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
if (!searched)
TreeNode<K,V> q, ch;
searched = true;
if (((ch = p.left) != null &&
(q = ch.findTreeNode(h, k, kc)) != null) ||
((ch = p.right) != null &&
(q = ch.findTreeNode(h, k, kc)) != null))
return q;
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null)
TreeNode<K,V> x, f = first;
first = x = new TreeNode<K,V>(h, k, v, f, xp);
if (f != null)
f.prev = x;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
if (!xp.red)
x.red = true;
else
lockRoot();
try
root = balanceInsertion(root, x);
finally
unlockRoot();
break;
assert checkInvariants(root);
return null;
putVal方法中在执行了插入一个元素后,会调用addCount()方法增加一个值。其中第一个参数始终为1,表示增加一个元素,而第二参数是binCount,如果插入的元素是头节点,那么为0;否则大于0。该方法如下:
private final void addCount(long x, int check)
CounterCell[] as; long b, s;
//利用CAS更新baseCount值
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x))
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)))
fullAddCount(x, uncontended);
return;
if (check <= 1)
return;
s = sumCount();
//如果check值大于0,则需要检查是否需要进行扩容操作
if (check >= 0)
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY)
int rs = resizeStamp(n);
if (sc < 0)
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//如果已经有其他线程在执行扩容方法
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
//当前线程是唯一的或是第一个发起扩容的线程,此时nextTable为null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
可以看到addCount()方法中没有有关同步的操作,那么addCount()方法是如何实现线程安全的呢,关键在于transfer()方法。
transfer方法
transfer方法用于将哈希表中的元素从一个表中转移到另一个表中,第一个参数为原表,后一个参数为新表。
ConcurrentHashMap中有一个字段叫做nextTable,如下:
/**
* The next table to use; non-null only while resizing.
*/
private transient volatile Node<K,V>[] nextTable;
注释中说明了,只有在扩容的时候才不会为null。
下面是transfer()方法的实现:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//如果新表为null
if (nextTab == null) // initiating
try
@SuppressWarnings("unchecked")
//两倍容量创建新表
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
catch (Throwable ex) // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
//将nextTable指向新表
nextTable = nextTab;
transferIndex = n;
int nextn = nextTab.length;
//新建ForwardingNode节点,指向新表
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;//并发扩容的关键,如果等于true,说明这个节点已经处理过了
boolean finishing = false; // to ensure sweep before committing nextTab
//死循环
for (int i = 0, bound = 0;;)
Node<K,V> f; int fh;
//这个while循环主要控制i-- 通过i--可以依次遍历原hash表中的节点
while (advance)
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0)
i = -1;
advance = false;
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0)))
bound = nextBound;
i = nextIndex - 1;
advance = false;
if (i < 0 || i >= n || i + n >= nextn)
int sc;
//如果所有的节点都已经完成复制工作 就把nextTable赋值给table 清空临时对象nextTable
if (finishing)
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);//扩容阈值设置为原来容量的1.5倍 依然相当于现在容量的0.75倍
//跳出循环
return;
//利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1))
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
//如果遍历到节点为null,则加入ForwardingNode节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果遍历到ForwardingNode节点,说明该节点已经被处理了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else
//处理头节点
synchronized (f)
//如果头节点没有更改
if (tabAt(tab, i) == f)
Node<K,V> ln, hn;
//链表结构
if (fh >= 0)
int runBit = fh & n;
Node<K,V> lastRun = f;
//将旧表中的链表拆成两部分
for (Node<K,V> p = f.next; p != null; p = p.next)
int b = p.hash & n;
if (b != runBit)
runBit = b;
lastRun = p;
if (runBit == 0)
ln = lastRun;
hn = null;
else
hn = lastRun;
ln = null;
for (Node<K,V> p = f; p != lastRun; p = p.next)
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
//更新新表中的节点
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
//将旧表中该桶处设为ForwardNode节点
setTabAt(tab, i, fwd);
//处理成功
advance = true;
//红黑树结构
else if (f instanceof TreeBin)
//将红黑树结构转换为两个TreeNode的单链表
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next)
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0)
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
else
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
//如果低端或高端节点个数小于6,那么将TreeNode单链表转换为Node单链表,否则将TreeNode单链表转换为红黑树结构
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//更新新表结构
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
//将旧表该桶处设为ForwadingNode,表示处理过了
setTabAt(tab, i, fwd);
advance = true;
单线程情况下,该方法遵循以下步骤:
- 如果这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点;
- 如果这个位置是Node节点(fh>=0),如果它是一个链表的头节点,就构造一个反序链表,把他们分别放在nextTable的i和i+n的位置上
- 如果这个位置是TreeBin节点(fh<0),也做一个反序处理,并且判断是否需要untreefi,把处理的结果分别放在nextTable的i和i+n的位置上
- 遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容
再来看一下多线程情况下是如何工作的:在代码的69行有一个判断,如果遍历到的节点是forward节点,就向后继续遍历,再加上给节点上锁的机制,就完成了多线程的控制。多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另一个线程看到forward,就向后遍历。这样交叉就完成了复制工作。而且还很好的解决了线程安全的问题。
示意图如下:
ForwardingNode
下面看一下ForwardingNode类的定义:
static final class ForwardingNode<K,V> extends Node<K,V>
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab)
super(MOVED, null, null, null);
this.nextTable = tab;
可以看到ForwardingNode节点继承自Node,并添加了一个nextTable字段指向新表,并且hash值为MOEVD,即-1。
在putVal方法中,如果待插入的桶中头节点的哈希值已经是MOVED,说明当前正在进行扩容的方法,那么会调用helpTransfer帮助扩容,该方法的实现如下:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f)
Node<K,V>[] nextTab; int sc;
//如果表不为null并且头节点是ForwardingNode并且nextTable不为null,说明仍在扩容中
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null)
int rs = resizeStamp(tab.length);//计算一个操作校验码
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0)
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//帮助转移,nextTab不为null
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nextTab);
break;
//返回新表
return nextTab;
return table;
经过上面的整个分析,可以看到putVal()方法是相当复杂的,虽然putVal()方法中加锁的地方看似很少,但是每一个分支中调用的方法,都实现了各自的加锁机制,而相关的加锁机制都是使用的CAS算法,通过控制变量来控制线程让步或者重新进入循环。
get(K k)方法
get()方法用于根据键得到值,其实现如下:
public V get(Object key)
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//计算hash值
int h = spread(key.hashCode());
//如果表不为空,并且桶中头节点不为null
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null)
//比较头节点,如果hash值相等,那么比较键是否相等,如果相等,则返回
if ((eh = e.hash) == h)
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
//如果hash值小于0,-1或者-2
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//遍历链表
while ((e = e.next) != null)
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
return null;
首先可以看到,get()方法中没有对链表加锁。从get()方法可以看到分为几种情况:
- 如果表为null,返回null
- 如果表不为null,根据hash值得到桶的头节点,如果头节点为null,返回null
- 如果头节点不为null,比较头节点,如果头节点匹配,直接返回
- 如果头节点不匹配,并且hash值<0,那么调用节点的find方法,这时会有两种情况,第一是hash值为-1,表示当前节点为ForwardingNode,第二是-2,表示是一个红黑树的节点
- 如果头节点不匹配,并且hash值大于等于0,那么遍历链表,没有找到返回null,找到了则返回
下面分析一下hash值为负数的情况,那么该节点可能为ForwardingNode或者为TreeBin,如果是红黑树结构,那么会调用TreeBin的find方法,主要是红黑树的查找,这儿就不过多介绍了。主要分析一下节点为ForwardingNode的情况,说明此时有另外的线程在执行扩容。ForwardingNode的find方法如下:
Node<K,V> find(int h, Object k)
// loop to avoid arbitrarily deep recursion on forwarding nodes
//外循环是一个死循环。tab指向nextTable,因为正在扩容,nextTable不为null
outer: for (Node<K,V>[] tab = nextTable;;)
Node<K,V> e; int n;
//几种情况下,返回null
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
//死循环,主要控制链表的遍历
for (;;)
int eh; K ek;
//如果头节点匹配,直接返回
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
//头节点的hash值<0
if (eh < 0)
//如果是ForwardingNode,那么在新表中执行查询,继续外部循环
if (e instanceof ForwardingNode)
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
//调用TreeBin的find方法
else
return e.find(h, k);
//如果链表到尾了,返回null
if ((e = e.next) == null)
return null;
从上面的方法可以看到,该方法主要是在newTable中执行查找,因为此时正在进行扩容,所以nextTable不为null。
size()方法
对于ConcurrentHashMap而言,这个哈希表中干到底装了多少元素是不确定的,因为不可能在调用size()方法的时候像GC的“stop the world”一样让其他线程都停下来让你去统计,因此只能说这个数量是个估计值。对于这个估计值,ConcurrentHashMap也是大费周章才计算出来的。
辅助定义
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/
@sun.misc.Contended static final class CounterCell
volatile long value;
CounterCell(long x) value = x;
/******************************************/
/**
* 实际上保存的是hashmap中的元素个数 利用CAS锁进行更新
但它并不用返回当前hashmap的元素个数
*/
private transient volatile long baseCount;
/**
* Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
*/
private transient volatile int cellsBusy;
/**
* Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;
mapingCount方法
mapingCount方法的实现如下所示:
/**
* Returns the number of mappings. This method should be used
* instead of @link #size because a ConcurrentHashMap may
* contain more mappings than can be represented as an int. The
* value returned is an estimate; the actual count may differ if
* there are concurrent insertions or removals.
*
* @return the number of mappings
* @since 1.8
*/
public long mappingCount()
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
final long sumCount()
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null)
for (int i = 0; i < as.length; ++i)
if ((a = as[i]) != null)
sum += a.value;
return sum;
可以看到sumCount方法中基数为baseCount,然后遍历CounterCell数组。Java工程师建议使用mappingCount()方法代替size()方法,size()方法如下:
public int size()
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
可以看到size()和mapingCount()方法类似,都是调用了sumCount()方法。
总结
经过上面put、get以及size方法的分析,可以得出ConcurrentHashMap的同步设计思路:
- 使用CAS加锁机制控制变量
- 读写分离,读的时候不加锁,写的时候加锁
- 分段加锁,对哈希表中每一个桶加锁
- 多线程共同协作完成旧表到新表的迁移
- 在HashMap的基础上添加了TreeBin和ForwardingNode标识两种状态
以上是关于1.8ConcurrentHashMap源码分析的主要内容,如果未能解决你的问题,请参考以下文章