并发容器和框架之ConcurrentHashMap
Posted MindMrWang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发容器和框架之ConcurrentHashMap相关的知识,希望对你有一定的参考价值。
了解HashMap的人都知道HashMap是线程不安全的(多线程下的put方法达到一定大小,引发rehash,导致闭链,最终占满CPU),同时线程安全的HashTable效率又令人望而却步(每个方法都进行同步,效率低下),所以在这种情境下为并发而生的ConcurrentHashMap就应运而生!
接下来我们按照以下顺序揭开ConcurrentHashMap的面纱:
- JDK1.6,1.7的ConcurrentHashMap
- JDK1.8的ConcurrentHashMap
1.1 ConcurrentHashMap的大体结构:
如果你去百度一下ConcurrentHashMap你到处可以发现“分段锁”,“提高并发度”等字眼对ConcurrentHashMap的介绍,当前我们使用的ConcurrentHashMap(我现在使用的时jdk8,当然也不是最新的)已经不再是分段锁的那种设计方式了,但是老版本的设计方式同样很值得我们去学习,下面简单介绍下老版本的ConcurrentHashMap(对于那些关于分段锁写的很好的推荐大家看下,这里我只作一个简单的介绍)。
static class Segment<K,V> extends ReentrantLock
implements Serializable {
.
.
.
}
在JDK1.6/1.7的版本ConcurrentHashMap是使用分段锁的理念,ConcurrentHashMap是一个Segment数组,Segment继承自ReentrantLock,实质上是一个锁,这个数组将数据分块,所以在多线程的情况下只要不涉及同一块Segment内的数据,是不会产生竞态条件的;在每个Segment下面是HashEntry数组+链表的数据结构,这个和HashMap的数据存储结构相同。
1.2 ConcurrentHashMap的初始化
我们之前说了ConcurrentHashMap的结构,并给出了结构图,但是ConcurrentHashMap的结构的初始化时根据什么来确定的?
ConcurrentHashMap初始化方法是通过initialCapacity(初始化容量),loadFactor(加载因子),concurrencyLevel(并发等级),等几个参数来初始化,初始化容量和加载因子我们在HashMap里面都有类似的概念,这个并发等级是ConcurrentHashMap独有的,并发等级决定Segment数组Size的初始大小,concurrencyLevel<=Size = 2^N(N取满足条件的最小值),即Size大小应为2的N次幂,和HashMap是同样的原理,这样的目的是为了进行散列算法(取模运算)时能够等价的位运算,因为位运算效率较其他散列算法效率更佳!
万物存在即有它的道理,CurrentHashMap不能够完全替代HashTable因CurrentHashMap是弱一致性的,而HashTable是强一致性的,什么叫强弱一致性?即当数据结构内的数据改变时,是否能够被其它部分所“察觉”,什么意思:即get方法是弱一致的,put操作将一个元素加入到底层数据结构后,get方法可能在一段时间内无法看到这个元素。
下面这篇文章很详细的解释了HashTable的不可替代性!
https://my.oschina.net/hosee/blog/675423
关于老版本的ConcurrentHashMap就介绍到这里了,我们对新版(jdk1.8)ConcurrentHashMap的主要方法作稍加详细的解释。
老版本的ConcurrentHashMap同样很值得我们学习!
2.1新版的ConcurrentHashMap简介
从jdk1.8开始ConcurrentHashMap就放弃了分段锁的概念了,转而使用Synchronized和CAS来实现并发,为什么不用Lock?可能是因为Doug Lea觉得jdk1.8的Syncchronized已经优化的足够好了吧!并且它的底层数据结构也和同版本的HashMap相同,数组+链表/红黑树,同时为了能够做到并发,它添加了TreeBin,Traverser等辅助类。
2.2字段介绍
在正式理解ConcurrentHashMap的主要方法之前(put,get,rehash...),我们先要知道它的字段和基本数据结构。
/**
* 表的最大容量为2^30 为什么不是32,因为高两位被用来控制目的
*/
private static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* 默认容量,最大不能超过2^30
*/
private static final int DEFAULT_CAPACITY = 16;
/**
* 最大的数组大小,需要和toArray方法和相关方法
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
* 表的默认并发级别,“没用”但是为了兼容以前的版本
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* 加载因子,当容量达到阀值时(阀值=容量*加载因子),进行扩容。
*/
private static final float LOAD_FACTOR = 0.75f;
/**
* 链表转为红黑树的阀值,默认为8(当然这个值是经过权衡过得,不建议更改)
*/
static final int TREEIFY_THRESHOLD = 8;
/**
* 树转链表的的阀值(当节点增加或减少时会有变量记录当前链表或者红黑树的节点个数)
*/
static final int UNTREEIFY_THRESHOLD = 6;
/**
* 链表转为红黑树的最小表容量
*/
static final int MIN_TREEIFY_CAPACITY = 64;
/**
* 每次改变步数的重新开始数
*/
private static final int MIN_TRANSFER_STRIDE = 16;
/**
* resize的标记位
*/
private static int RESIZE_STAMP_BITS = 16;
/**
* 帮助resize的最大线程数
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
/**记录size stamp 的位移
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/*
* Encodings for Node hash fields. See above for explanation.
*/
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
/**可用CUP数 **/
static final int NCPU = Runtime.getRuntime().availableProcessors();
细心的朋友就发现了上面的字段都是一些常量,接下来我们看看变量字段:
/**
* 这个是容器数组,也就是我们的数组+链表+红黑树中所提到的数组,节点类下面有介绍
*/
transient volatile Node<K,V>[] table;
/**
* 下一个使用的数组,除了扩容,否则非空
*/
private transient volatile Node<K,V>[] nextTable;
/**
* 计数器,也可以当数组初始化时回滚,通过CAS更新
*/
private transient volatile long baseCount;
/**
* 这个是比较重要的一个字段了,用来数组初始化和扩容控制,
* -1:正在初始化
* -n:表示正在由n-1个线程进行扩容操作
* +n和0:表示还没有被初始化,这个值表示下一次进行扩容的大小
*/
private transient volatile int sizeCtl;
/**
*当扩容时,下个用来拆分的数组索引
* The next table index (plus one) to split while resizing.
*/
private transient volatile int transferIndex;
/**
*当进行扩容或创建计数单元时容器锁使用
*/
private transient volatile int cellsBusy;
/**
* Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;
// views
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;
2.3主要的类:
2.3.1 Node节点类(作为ConcurrentHashMap的存储最小单元):
static class Node<K,V> implements Map.Entry<K,V> {
//这里的Hash值和key都为final类型,不可变
final int hash;
final K key;
//value和下一个节点的引用都为volatile,保持可见性
volatile V val;
volatile Node<K,V> next;
//简单的构造
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
//两个get方法,分别返回key和value
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
//调用setValue方法会抛出错误,表示这个value不可改变
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
//重写equals方法
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
以上的Node节点是一个单向的线性链表,只能查询不能更新。
2.3.2 TreeNode:
当链表的数量超过8的时候链表就会转换成红黑树了,但不是直接转换成红黑树,而是先将Node对象转换陈TreeNode节点对象,通过TreeBin转换成红黑树,所以TreeNode只是节点类,并不复杂,也只提供了find方法,同时也通过TreeBin进行包装来提供并发操作,也就是直接使用的是TreeBin而不是TreeNode。
下面的TreeNode是一个红黑树节点类,不了解红黑树的并不影响本章学习,可以将这个红黑树当做二叉查找树来看就ok。
/**
* Nodes for use in TreeBins
*/
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
/**
* Returns the TreeNode (or null if not found) for the given key
* starting at given root.
*/
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
TreeNode<K,V> p = this;
do {
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}
/*
*像HashMap中(1.8)TreeNode是继承自LinkedHashMap.Entry<K,V>类,
*而通过上面代码我们可以看到这个TreeNode是继承自Node的,这是因为为了
*方便TreeBin的访问。
*/
2.3.3TreeBin
TreeBin我们上面也提及到了,是用来将TreeNode包装成红黑树的,同时也提供了并发操作,所以我们要知道一个的是在ConcurrentHashMap Node数组中存放的是TreeBin(或者Node)节点而不是TreeNode,我们通过注释也了解到了这个类还提供了读写锁(They also maintain a parasitic read-write lock forcing writers (who hold bin lock) to wait for readers (who do not) to complete before tree restructuring operations),博主水平有限对红黑树的理解和认识止于皮毛,虽然一直想写一篇红黑树的博文,但是每每拜读大神的文章就打消了念头~
给出一篇写的比较好的红黑树解析博文:
https://blog.csdn.net/eson_15/article/details/51144079
/*这个类颇为庞大,足有五百多行(包含注释)
*我们需要了解的是这个类可以初始化红黑树,并提供并发操作即可
*/
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// 锁的状态值
static final int WRITER = 1;
static final int WAITER = 2;
static final int READER = 4;
.
.
.
}
2.4 核心方法:
对于并发容器类,我们最关注的的就是put方法和get方法了,这个两个方法是所有容器类的核心。
2.4.1 put方法:
public V put(K key, V value) {
return putVal(key, value, false);
}
这个就是put方法,它直接调用putVal方法,所以我们直接向下看putVal方法:
final V putVal(K key, V value, boolean onlyIfAbsent) {
//判断放入的key和value的合法性,如果key为null或value为null则抛出异常
if (key == null || value == null) throw new NullPointerException();
//获得key的hash值
int hash = spread(key.hashCode());
int binCount = 0;
//table为ConcurrentHashMap的属性,为节点数组,
//也就是数组+链表+红黑树的数组
//获取table的引用
for (Node<K,V>[] tab = table;;) {
//定义一系列局部变量,用到时解释
Node<K,V> f; int n, i, fh;
//当tab(table)为空或者tab的长度为空则调用initTable初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//(n - 1) & hash等价于hash%n(所以必须要求n为2的N次幂)
//所以这里的i为插入的key的hash值经过散列后在数组中的位置
//f指向链表的首节点或者红黑树的根节点
//tabAt方法里面使用到了Unsafe的getObjectVolatile
//来保证获取到的是最新值
//当i位置的数组为空(未发生碰撞),
//直接通过CAS的方式将需要插入的key,value构造成为节点插入到数组中
//如果插入成功,break,跳出死循环
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//如果首节点的hash值为MOVED,MOVED为-1,表示当前正在扩容
//如果正在扩容则帮助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//最后一种情况如果f不为空,即发生了碰撞
//这里也是需要使用synchronized关键字进行同步了
//将新的需要插入的值包装成节点放在
else {
V oldVal = null;
synchronized (f) {
//继续用Unsafe来保证最新值
if (tabAt(tab, i) == f) {
//如果节点是链表结构(节点hash值大于等于零表示链表结构)
if (fh >= 0) {
binCount = 1;
//下面就是以死循环的方式进行节点插入了
for (Node<K,V> e = f;; ++binCount) {
K ek;
//当插入的key和首节点相同那么覆盖
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
//下面将插入的值包装成节点放到头结点的下一个节点
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果当前结构是红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//来检查是否需要将链表转换成红黑树
//TREEIFY_THRESHOLD为8
//treeifyBin方法就是将链表转换成红黑树的方法
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//addCount用来统计容器Size,检查是否需要扩容
addCount(1L, binCount);
return null;
}
上面是put方法的大概,注释已经很明确,就不多作解释,其中还涉及到了一些其他的方法,我们来进一步的看看其实现:
2.4.2 spread方法:
刚一开始看见 int hash = spread(key.hashCode());这行代码的时候还在想这个spread方法是什么鬼,为什么不直接int hash = key.Code(); 后来点进去才知道这个是进行两次哈希,减少碰撞的概率。
static final int spread(int h) {
//HASH_BITS为int值的最大值:0x7fffffff表示32位首位为0的最大long值
return (h ^ (h >>> 16)) & HASH_BITS;
}
2.4.3 initTable方法:
它同时只允许一个线程进行初始化操作
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//当table为空且长度为0
while ((tab = table) == null || tab.length == 0) {
//sizeCtl<0表示正在扩容或者初始化,线程挂起
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//用CAS保证当前的sc和之前所读到的sizeCtl一致否则置为-1
//表示当前的状态有线程进行扩容或者初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//再次判断
if ((tab = table) == null || tab.length == 0) {
//如果大于零表示知道初始化(扩容)容量大小
//否则设置为默认容量(16)
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//初始化数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//这里对sc动手了,继续看finally
sc = n - (n >>> 2);
}
} finally {
//通过 sc = n - (n >>> 2); 可以看出:
//sizeCtl起到了阈值的作用
sizeCtl = sc;
}
break;
}
}
return tab;
}
2.4.4 tabAt / casTabAt / setTabAt:
这三个方法中前两个方法在putVal方法里面也出现了,在ConcurrentHashMap中这三个方法写在一起,想必有什么共通之处(偷偷告诉你是CAS操作)。
2.4.4.1 tabAt:
在putVal方法里面也解释道了,tabAt是用来保证最新值,也就是用CAS来确定数组的i位置是否还等于f,我们知道f为链表的头结点,也就是数组i位置直接指向的节点。
其实也不能说这三个方法都是严格意义上的CAS方法,因为CAS是指CompareAndSwap,只有第二个方法是CAS方法,但是他们都是严格意义上的原子方法,用来保证得到的不是脏数据。
关于CAS的介绍我转载了一篇不错的文章值得看一下:
https://blog.csdn.net/qq_39266910/article/details/79910192
//原子方法获取值
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
//CAS更新值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
在ConcurrentHashMap中随处可以看见大写“U”,大量的使用Unsafe 下的原子方法,利用这些原子方法可以实现无锁化的更新值(例如更新SIZECTL ),从而提高程序在多线程下运行的效率,这个算法的思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作。因为当前线程中的值已经不是最新的值,你的修改很可能会覆盖掉其他线程修改的结果。这一点与乐观锁,设计思想是一致的。
关于这CAS的原理和具体是怎么实现的,可以参见上面给的链接,里面经过整理写的算式非常详细了。
2.4.5 transfer方法:
transfer方法是扩容方法,HashMap我们都知道它有一个阈值,ConcurrentHashMap也有阈值,当他的当前容量>=阈值*预设容量
时候,就会触发一次扩容。
我们先介绍下扩容的步骤:
1)新建一个是原来数组两倍的数组
2)遍历需要扩容的数组,如果数组i的位置为空或者已经遍历过,那么将对应节点的值set为ForwardingNode(表示为空),这个是多线程扩容的核心所在,当其他线程读取到这个这个节点的值为ForwardingNode时就知道这个节点是null或者已经处理过的。
3)ForwardingNode是一个继承Node的节点类,里面包含nextTable的引用,这个节点里面的key和value都为空(不需要这个),hash 为MOVED(我们之前见到过,当为-1的时候表示正有节点在进行扩容), 所以当处理过的节点放置一个ForwardingNode节点,当其他线程处理到这个数组位置的时候就可以直接跳过这个节点,来到他里面的下一个节点。
4)通过i--(hash值)来遍历数组中的每个位置,将遍历过的值正序放在我们新创建的nextTable中的i中和反序放在i+n中,通过头结点的哈希值来判断这个数组位置是链表还是红黑树。
5)赋值完成后将table=nextTab,nextTable=null,扩容完成,并释放nextTable。
//参数tab为需要扩容的数组,nextTab为新建扩容后的数组
//字段里面有一个叫nextTable的,和这个意义相同,下面会看到
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//定义当前数组的长度,和stride:
//单个线程允许处理的最少table节点首节点个数
int n = tab.length, stride;
//初始化stride,stride由原数组长度和可用CPU决定
//MIN_TRANSFER_STRIDE=16为最小扩容step(步幅)
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//如果nextTab为空,进行初始化
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//新建数组,这个数组左移1,即新建的数组为原来的两倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
//赋值
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
//将nextTab赋值给全局字段nextTable
nextTable = nextTab;
//扩容索引为当前数组的长度
transferIndex = n;
}
int nextn = nextTab.length;
//ForwardingNode一个用于连接两个table的节点类
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;//如果等于true 说明这个节点已经处理过
boolean finishing = false; // to ensure sweep before committing nextTab
//一个死循环,我们来看看具体干什么
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//advance原来为true,如果advance不改变,那么这个
//while又是一个死循环
while (advance) {
int nextIndex, nextBound;
//第一次运行肯定是if(0)的
if (--i >= bound || finishing)
advance = false;
//nextIndex = transferIndex为需扩容数组长度
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
//如果需扩容数组<=0,那么i=-1,
//advance=false,表示退出while循环
advance = false;
}
else if (U.compareAndSwapInt
//TRANSFERINDEX为静态块下面定义的
//ConcurrentHashMap的字段
//通过反射获取transferIndex字段
//transferIndex和nextIndex比较
//如果相同,返回true,并将
//transferIndex赋值为nextBound
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//如果所有的节点都已经完成复制工作
//就把nextTable赋值给table 清空临时对象nextTable
if (finishing) {
nextTable = null;
table = nextTab;
//扩容阈值设置为原来容量的1.5倍
//依然相当于现在容量的0.75
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,
//说明新加入一个线程参与到扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果遍历到的节点为空 则放入ForwardingNode指针
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果遍历到ForwardingNode节点,说明这个点已经被处理过
//直接跳过
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//fh>=0表示是Node节点,不是红黑树
//put方法里面有看到过
if (fh >= 0) {
//以下的部分在完成的工作是构造两个链表,
//一个是原链表,另一个是原链表的反序排列
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//如果是红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果扩容后不再需要红黑树结构
//就untreefy为链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点
//表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true
//返回到上面的while循环中就可以执行i--操作
advance = true;
}
}
}
}
}
}
2.4.6 treeifyBin方法:
这个方法在讲put方法的时候解释过了,这个方法是用来将链表转换成红黑树的,但是不是只要满足链表大于等于8就能转换的,还要进行一次容量的判断,也就是说当前的容量太小的话只需要扩容就OK了,而不需要转换成红黑树。同时我们也知道转换成红黑树也不是直接将节点包装成红黑树,而是将节点转换成TreeNode 然后由TreeBin来将节点构成红黑树,也就是table数组里放的是TreeBin对象,而不是TreeNode。
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
//这里加了控制
//如果table.length<64 就扩大一倍,即不转换成红黑树
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
2.4.7get方法:
get方法比较简单,给定一个key来确定value的时候,必须满足两个条件 key相同 hash值相同。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果刚好头结点(根节点)的Hash值就是和key的Hash值
if ((eh = e.hash) == h) {
//如果搜索到的节点key与传入的key相同且不为null,
//直接返回这个节点
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//eh小于零表示是树结构
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//为链表,遍历链表,返回相应的值
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
2.5其他方法
除了上面说的一些核心方法,ConcurrentHashMap还有其他的一些重要方法:
1)addCount方法
2)remove方法
3)Size方法
1)这两个方法都是用来统计ConcurrentHashMap的容量是多少的,ConcurrentHashMap不像GC那样,将所有的线程都stop,让我们去计算这个数量,在我们统计的时候它可能还在插入节点或者删除节点。
关于baseCount(容量)的统计ConcurrentHashMap是交给addCount来完成的,一方面来更新baseCount一方面来判断baseCount值是否达到阈值。
2)remove方法和put方法类似,先通过key定位到数组i位置,如果定位结果table为空或者长度为0,返回nul,如果f.hash(f为头结点或根节点)为-1(MOVED),表示这是一个ForwardNode节点,表示正在扩容,帮助扩容,负否则加锁寻找到key的哈希值对应的数组位置,并遍历这个数组位置的链表或树,并删除和key相同的“key”的节点,最后用addCount统计baseCount。
3)size 方法的作用是为我们返回哈希表中实际存在的键值对的总数。
我看到这个方法的时候想法是多此一举,ConcurrentHashMap 中的 baseCount 不就是是记录的所有键值对的总数吗?直接返回它不就行了?是因为我们的 addCount 方法用于 CAS 更新 baseCount,但很有可能在高并发的情况下更新失败,那么这些节点虽然已经被添加到表中,但是数量却没有增加,但是,addCount 方法在更新 baseCount 失败的时候会调用 fullAddCount 将这些失败的结点包装成一个 CounterCell 对象,保存在 CounterCell 数组中,所以整张表实际的 size 其实是 baseCount 加上 CounterCell 数组中元素的个数。
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
总结:ConcurrentHashMap的设计有很多非常亮眼的地方,这些设计理念很值得我们学习,和HashMap相比扩容并发设计一反原来的阻塞算法,它欢迎其他线程加入来帮助它来进行扩容操作,并发操作上面也采用了很多CAS方法来避免过多的加锁。
当然它还有我说不完的一些优点,由您来补充吧,到这里博主已经不想写了(看了好长时间了),即使知道这个ConcurrentHashMap博大精深,博主深知自己的认识还是浅薄,以后还会拜读源码,再补充!
2018 4.12 20:24
以上是关于并发容器和框架之ConcurrentHashMap的主要内容,如果未能解决你的问题,请参考以下文章
并发容器线程安全应对之道-ConcurrentHashMap