Data Structures - ConcurrentHashMap, Step by Step
Posted by yuanjiangnan
Introduction
ConcurrentHashMap is a heavily used data structure: it is thread-safe and, on top of that, offers much better write concurrency than a synchronized map. Internally it is very different from a plain HashMap, relying extensively on volatile fields and CAS to reduce lock contention, which also makes the code considerably harder to read than HashMap's. This chapter gives a basic introduction to ConcurrentHashMap based on JDK 1.8; later chapters dig in step by step.
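Before diving into the source, here is a minimal usage sketch showing the kind of concurrent updates the rest of this article explains (the key name and thread count are made up for illustration):

import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentHashMapDemo {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        // two threads update the same key concurrently without any external locking
        Runnable task = () -> {
            for (int i = 0; i < 1000; i++) {
                // merge is atomic; internally it uses CAS on empty bins and a per-bin lock otherwise
                map.merge("counter", 1, Integer::sum);
            }
        };
        Thread t1 = new Thread(task), t2 = new Thread(task);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(map.get("counter")); // always prints 2000
    }
}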
The ConcurrentHashMap class
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable
It extends the AbstractMap abstract class and implements the ConcurrentMap interface.
The ConcurrentMap interface
public interface ConcurrentMap<K, V> extends Map<K, V>
It extends the Map interface.
ConcurrentMap methods
// returns defaultValue when the key has no mapping (get returns null)
@Override
default V getOrDefault(Object key, V defaultValue) {
V v;
return ((v = get(key)) != null) ? v : defaultValue;
}
// does not overwrite when the key already exists (put would overwrite the old value)
V putIfAbsent(K key, V value);
// removes the entry only if the key is currently mapped to the given value
boolean remove(Object key, Object value);
// replaces the value only if the current value equals oldValue
boolean replace(K key, V oldValue, V newValue);
// replaces the value only if the key is currently mapped to some value
V replace(K key, V value);
The methods that take a Function (compute, computeIfAbsent, merge, and so on) are skipped here; a short usage sketch of the methods above follows.
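A self-contained sketch of how these atomic methods behave (the keys and values are made up):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConcurrentMapMethodsDemo {
    public static void main(String[] args) {
        ConcurrentMap<String, String> m = new ConcurrentHashMap<>();
        m.putIfAbsent("k", "v1");      // key absent: stores "v1", returns null
        m.putIfAbsent("k", "v2");      // key present: keeps "v1", returns the existing "v1"
        m.replace("k", "wrong", "v3"); // current value is not "wrong": returns false, no change
        m.replace("k", "v1", "v3");    // current value is "v1": returns true, value becomes "v3"
        m.replace("k", "v4");          // key present: unconditional replace, returns the old "v3"
        System.out.println(m.get("k"));                            // v4
        System.out.println(m.getOrDefault("missing", "fallback")); // fallback
    }
}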
Key inner class: Node
static class Node<K,V> implements Map.Entry<K,V>
It implements Map.Entry.
Node fields
// hash of the key
final int hash;
// the key
final K key;
// the value
volatile V val;
// the next node in the bucket's linked list
volatile Node<K,V> next;
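Note that hash is not the raw key.hashCode(). ConcurrentHashMap first spreads the hash so the high bits contribute to the bucket index, and masks it with HASH_BITS so it stays non-negative; negative hash values are reserved for special nodes (MOVED, TREEBIN, RESERVED, introduced later). In JDK 1.8 the helper is essentially:

static final int spread(int h) {
    // fold the high 16 bits into the low 16 bits, then clear the sign bit
    return (h ^ (h >>> 16)) & HASH_BITS;
}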
Node constructor
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
Node methods
// get the key
public final K getKey() { return key; }
// get the value
public final V getValue() { return val; }
// hashCode of the node (key hash XOR value hash)
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
// toString
public final String toString(){ return key + "=" + val; }
// setting a value directly on a Node throws an exception
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
// node equals
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
// must be a Map.Entry instance with a non-null key and value, and both key and value must match
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
// search the linked list for a key
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
// the current node's hash must equal h and the key must match
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
// move to the next node and keep searching
} while ((e = e.next) != null);
}
return null;
}
Key inner class: TreeNode
static final class TreeNode<K,V> extends Node<K,V>
TreeNode extends Node.
TreeNode fields
// parent node
TreeNode<K,V> parent; // red-black tree links
// left child
TreeNode<K,V> left;
// right child
TreeNode<K,V> right;
// previous node (together with Node.next this forms a doubly linked list)
TreeNode<K,V> prev;
// color (true = red)
boolean red;
TreeNode constructor
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
// initialize the superclass fields
super(hash, key, val, next);
// initialize the parent link
this.parent = parent;
}
TreeNode methods
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
// find an element in the tree
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
// start from the current node
TreeNode<K,V> p = this;
do {
// take the left and right children
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
// h less than the current node's hash: go left
if ((ph = p.hash) > h)
p = pl;
// h greater than the current node's hash: go right
else if (ph < h)
p = pr;
// same hash and same key: found
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
// same hash, different key, left child is null: go right
else if (pl == null)
p = pr;
// same hash, different key, right child is null: go left
else if (pr == null)
p = pl;
// same hash, different key, both children present: try Comparable ordering
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
// the comparison result decides the side: less than 0 goes left, greater than 0 goes right
p = (dir < 0) ? pl : pr;
// Comparable cannot decide: recurse into the right subtree first
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
// not found on the right: continue down the left side
else
p = pl;
} while (p != null);
}
return null;
}
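findTreeNode leans on two helpers shared with HashMap: comparableClassFor and compareComparables. Roughly, the first returns the key's class if that class implements Comparable of itself (String is the common case) and null otherwise; the second returns 0 when the other key is null or of a different class, meaning "cannot decide", in which case both subtrees are searched. A simplified sketch of the idea, not the exact JDK source:

// simplified sketch: the real comparableClassFor also inspects generic interfaces
// to confirm the class implements Comparable<itself>
static Class<?> comparableClassFor(Object x) {
    return (x instanceof Comparable) ? x.getClass() : null;
}

@SuppressWarnings({"rawtypes", "unchecked"})
static int compareComparables(Class<?> kc, Object k, Object x) {
    // 0 means "cannot decide": x is null or not an instance of the same comparable class as k
    return (x == null || x.getClass() != kc) ? 0 : ((Comparable) k).compareTo(x);
}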
Key inner class: TreeBin
static final class TreeBin<K,V> extends Node<K,V>
TreeBin also extends Node.
TreeBin fields
// root of the red-black tree
TreeNode<K,V> root;
// head of the linked list of TreeNodes
volatile TreeNode<K,V> first;
// the thread waiting to take the write lock
volatile Thread waiter;
// lock state
volatile int lockState;
// a writer holds the lock
static final int WRITER = 1;
// a writer is waiting for the lock
static final int WAITER = 2;
// increment added per active reader
static final int READER = 4;
// Unsafe instance for low-level memory operations
private static final sun.misc.Unsafe U;
// field offset of lockState
private static final long LOCKSTATE;
TreeBin static initializer
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = TreeBin.class;
LOCKSTATE = U.objectFieldOffset
(k.getDeclaredField("lockState"));
} catch (Exception e) {
throw new Error(e);
}
}
TreeBin constructor
TreeBin(TreeNode<K,V> b) {
// initialize the Node fields (hash = TREEBIN, null key and value)
super(TREEBIN, null, null, null);
// b becomes the head of the list
this.first = b;
TreeNode<K,V> r = null;
// walk the whole linked list starting at b
for (TreeNode<K,V> x = b, next; x != null; x = next) {
// read the next node
next = (TreeNode<K,V>)x.next;
// clear the left and right children
x.left = x.right = null;
// first node: make it the (black) root
if (r == null) {
x.parent = null;
x.red = false;
r = x;
} else {
// read the key and its hash
K k = x.key;
int h = x.hash;
// comparable class of the key, resolved lazily
Class<?> kc = null;
// walk down from the root r to find the insertion point
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
// decide whether to go left or right
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
// if the chosen side is empty, attach x there
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
// store the root
this.root = r;
// If assertions are disabled at runtime, this statement does nothing.
// If assertions are enabled, checkInvariants is executed and,
// when it returns false, an AssertionError is thrown.
assert checkInvariants(root);
}
Tie-breaking comparison
static int tieBreakOrder(Object a, Object b) {
int d;
if (a == null || b == null ||
(d = a.getClass().getName().
compareTo(b.getClass().getName())) == 0)
d = (System.identityHashCode(a) <= System.identityHashCode(b) ?
-1 : 1);
return d;
}
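tieBreakOrder is the last resort when two keys have equal hashes and Comparable cannot order them: it compares class names and then identity hash codes, and by resolving the final tie to -1 it never returns 0, so every node still gets a deterministic place in the tree. A standalone copy for experimentation:

public class TieBreakOrderDemo {
    // copied from the method above so it can be run outside the JDK class
    static int tieBreakOrder(Object a, Object b) {
        int d;
        if (a == null || b == null ||
            (d = a.getClass().getName().compareTo(b.getClass().getName())) == 0)
            d = (System.identityHashCode(a) <= System.identityHashCode(b) ? -1 : 1);
        return d;
    }

    public static void main(String[] args) {
        System.out.println(tieBreakOrder("a", 1));                      // different classes: ordered by class name
        System.out.println(tieBreakOrder(new Object(), new Object()));  // same class: ordered by identity hash, never 0
    }
}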
TreeBin: locking the root
private final void lockRoot() {
// try to CAS lockState from 0 to WRITER (1)
if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
// contention: take the slow path
contendedLock();
}
private final void unlockRoot() {
lockState = 0;
}
private final void contendedLock() {
// whether this thread has registered itself as the waiter
boolean waiting = false;
for (int s;;) {
// the lock is free (s is 0 or just the WAITER bit)
if (((s = lockState) & ~WAITER) == 0) {
// try to CAS lockState from s to WRITER (1), clearing any WAITER bit
if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {
// we had registered as the waiter: clear it
if (waiting)
waiter = null;
// acquired, return
return;
}
}
else if ((s & WAITER) == 0) { // the lock is held and no waiter is registered yet
// CAS lockState from s to s | WAITER (register intent to wait)
if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {
// remember that we are waiting
waiting = true;
// publish the waiting thread so it can be unparked
waiter = Thread.currentThread();
}
}
else if (waiting)
// park the current thread until it is unparked
LockSupport.park(this);
}
}
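lockRoot/contendedLock/unlockRoot implement a tiny hand-rolled write lock: CAS a lock word, set a waiter bit if the CAS fails, park, and rely on someone to unpark the waiter. In ConcurrentHashMap itself at most one writer ever competes here, because structural changes to a bin are made while holding a synchronized lock on the bin's head node; lockState only coordinates that single writer with concurrent readers. The following is a minimal standalone sketch of the same pattern using AtomicInteger, not the TreeBin code itself:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

class TinyWriteLock {
    // same bit layout as TreeBin: WRITER = lock held, WAITER = a writer is parked
    private static final int WRITER = 1, WAITER = 2;
    private final AtomicInteger state = new AtomicInteger();
    private volatile Thread waiter;

    void lock() {                                       // fast path: uncontended CAS 0 -> WRITER
        if (!state.compareAndSet(0, WRITER))
            contended();
    }

    private void contended() {                          // assumes few contenders, like TreeBin
        boolean waiting = false;
        for (int s;;) {
            if (((s = state.get()) & ~WAITER) == 0) {   // lock free (only the waiter bit may be set)
                if (state.compareAndSet(s, WRITER)) {
                    if (waiting) waiter = null;
                    return;
                }
            } else if ((s & WAITER) == 0) {             // lock held, nobody registered: register ourselves
                if (state.compareAndSet(s, s | WAITER)) {
                    waiting = true;
                    waiter = Thread.currentThread();
                }
            } else if (waiting) {
                LockSupport.park(this);                 // block until someone unparks us
            }
        }
    }

    void unlock() {
        state.set(0);
        Thread w = waiter;
        if (w != null)
            LockSupport.unpark(w);                      // simplified: in TreeBin the last exiting reader does the unpark
    }
}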
TreeBin lookup
final Node<K,V> find(int h, Object k) {
// the key must not be null
if (k != null) {
// traverse the linked list starting at the head
for (Node<K,V> e = first; e != null; ) {
int s; K ek;
// a writer holds or is waiting for the lock: read via the linked list instead of the tree
if (((s = lockState) & (WAITER|WRITER)) != 0) {
// does the current node's hash and key match?
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
// move to the next node
e = e.next;
}
// otherwise register as a reader: CAS lockState up by READER
else if (U.compareAndSwapInt(this, LOCKSTATE, s,
s + READER)) {
TreeNode<K,V> r, p;
try {
// if the root is not null, search the tree
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null));
} finally {
Thread w;
if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
(READER|WAITER) && (w = waiter) != null)
// this reader was the last one and a writer is parked (previous state was READER|WAITER): wake it up
LockSupport.unpark(w);
}
return p;
}
}
}
return null;
}
TreeBin: adding an element
final TreeNode<K,V> putTreeVal(int h, K k, V v) {
Class<?> kc = null;
// whether the subtrees have already been searched once
boolean searched = false;
// start from the root
for (TreeNode<K,V> p = root;;) {
int dir, ph; K pk;
// empty tree: the new node becomes both root and list head
if (p == null) {
first = root = new TreeNode<K,V>(h, k, v, null, null);
break;
}
// decide whether to go left or right
else if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0) {
if (!searched) {
TreeNode<K,V> q, ch;
// equal hash and Comparable cannot decide: search both subtrees once for an equal key
searched = true;
if (((ch = p.left) != null &&
(q = ch.findTreeNode(h, k, kc)) != null) ||
((ch = p.right) != null &&
(q = ch.findTreeNode(h, k, kc)) != null))
return q;
}
dir = tieBreakOrder(k, pk);
}
// when the chosen child is empty, append the new node there
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
// grab the current list head
TreeNode<K,V> x, f = first;
// the new node becomes the new list head
first = x = new TreeNode<K,V>(h, k, v, f, xp);
// if there was an old head, link its prev to the new node
if (f != null)
f.prev = x;
// link the new node into the tree
if (dir <= 0)
xp.left = x;
else
xp.right = x;
// if the parent xp is black, just color the new node red; a red parent requires rebalancing
if (!xp.red)
x.red = true;
else {
// rebalancing requires the write lock so readers fall back to the linked list
lockRoot();
try {
// rebalance after insertion
root = balanceInsertion(root, x);
} finally {
// release the lock
unlockRoot();
}
}
break;
}
}
assert checkInvariants(root);
return null;
}
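For context: putTreeVal is never called on its own. putVal finds the bin, sees that its head is a TreeBin (hash == TREEBIN), and calls putTreeVal while holding a synchronized lock on that head node, which is why the method can relink freely and only needs lockRoot for the brief rebalancing step that would confuse concurrent readers. A simplified fragment of that calling pattern (not the full JDK putVal; tab, i, hash, key, value, oldVal and onlyIfAbsent are putVal locals and parameters):

// f is the head node of bin i
synchronized (f) {
    if (tabAt(tab, i) == f && f instanceof TreeBin) {   // re-check the head after taking the lock
        Node<K,V> p = ((TreeBin<K,V>) f).putTreeVal(hash, key, value);
        if (p != null) {             // an equal key already existed
            oldVal = p.val;
            if (!onlyIfAbsent)
                p.val = value;       // overwrite unless this is putIfAbsent
        }
    }
}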
TreeBin: removing a node
final boolean removeTreeNode(TreeNode<K,V> p) {
// neighbors of p in the doubly linked list
TreeNode<K,V> next = (TreeNode<K,V>)p.next;
TreeNode<K,V> pred = p.prev; // unlink traversal pointers
TreeNode<K,V> r, rl;
// p has no predecessor: next becomes the new list head
if (pred == null)
first = next;
else
// otherwise link the predecessor straight to next
pred.next = next;
// if there is a successor, point its prev back to pred
if (next != null)
next.prev = pred;
// the list is now empty
if (first == null) {
// clear the root as well
root = null;
return true;
}
// now remove p from the tree itself
if ((r = root) == null || r.right == null || // too small
(rl = r.left) == null || rl.left == null)
// tree is too small to stay a tree: return true so the caller untreeifies the bin
return true;
// lock the root
lockRoot();
try {
TreeNode<K,V> replacement;
TreeNode<K,V> pl = p.left;
TreeNode<K,V> pr = p.right;
// red-black deletion, following the same scheme as HashMap
if (pl != null && pr != null) {
TreeNode<K,V> s = pr, sl;
while ((sl = s.left) != null) // find successor
s = sl;
boolean c = s.red; s.red = p.red; p.red = c; // swap colors
TreeNode<K,V> sr = s.right;
TreeNode<K,V> pp = p.parent;
if (s == pr) { // p was s's direct parent
p.parent = s;
s.right = p;
}
else {
TreeNode<K,V> sp = s.parent;
if ((p.parent = sp) != null) {
if (s == sp.left)
sp.left = p;
else
sp.right = p;
}
if ((s.right = pr) != null)
pr.parent = s;
}
p.left = null;
if ((p.right = sr) != null)
sr.parent = p;
if ((s.left = pl) != null)
pl.parent = s;
if ((s.parent = pp) == null)
r = s;
else if (p == pp.left)
pp.left = s;
else
pp.right = s;
if (sr != null)
replacement = sr;
else
replacement = p;
}
else if (pl != null) // right child is null: replace p with its left child
replacement = pl;
else if (pr != null) // left child is null: replace p with its right child
replacement = pr;
else
replacement = p;
if (replacement != p) {
TreeNode<K,V> pp = replacement.parent = p.parent;
if (pp == null)
r = replacement;
else if (p == pp.left)
pp.left = replacement;
else
pp.right = replacement;
p.left = p.right = p.parent = null;
}
// if p was red no rebalancing is needed; otherwise rebalance after deletion
root = (p.red) ? r : balanceDeletion(r, replacement);
if (p == replacement) { // detach pointers
TreeNode<K,V> pp;
if ((pp = p.parent) != null) {
if (p == pp.left)
pp.left = null;
else if (p == pp.right)
pp.right = null;
p.parent = null;
}
}
} finally {
// unlock
unlockRoot();
}
assert checkInvariants(root);
return false;
}
Left and right rotations
static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root,
TreeNode<K,V> p) {
...
}
static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root,
TreeNode<K,V> p) {
...
}
Rebalancing after insertion and deletion
static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root,
TreeNode<K,V> x) {
...
}
static <K,V> TreeNode<K,V> balanceDeletion(TreeNode<K,V> root,
TreeNode<K,V> x) {
...
}
Key inner class: ForwardingNode
static final class ForwardingNode<K,V> extends Node<K,V>
ForwardingNode is placed at the head of a bin that has already been moved during a resize, so lookups on that bin are forwarded to the new table.
ForwardingNode fields
// the new table created by the resize
final Node<K,V>[] nextTable;
ForwardingNode constructor
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
ForwardingNode methods
Node<K,V> find(int h, Object k) {
// outer spin loop (restarts when another ForwardingNode is encountered)
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
// the new table is null/empty or the target bin is empty: not found
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
// spin over the nodes in this bin
for (;;) {
int eh; K ek;
// hash and key match the current node
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
// hash < 0: a special node (another ForwardingNode or a TreeBin)
if (eh < 0) {
// the head is itself a ForwardingNode
if (e instanceof ForwardingNode) {
// move on to the even newer table and restart the outer loop
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
// otherwise delegate to the node's own find (e.g. a TreeBin lookup)
return e.find(h, k);
}
// reached the tail: not found
if ((e = e.next) == null)
return null;
}
}
}
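tabAt, used above to read a bin head, is one of three tiny helpers the class uses to access table slots with volatile semantics through Unsafe (ABASE and ASHIFT are defined in the field list below). In JDK 1.8 they are essentially:

@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    // volatile read of tab[i]
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    // atomically replace tab[i] with v only if it is still c
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    // volatile write of tab[i]
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}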
ConcurrentHashMap fields
// maximum table length
private static final int MAXIMUM_CAPACITY = 1 << 30;
// default initial table length
private static final int DEFAULT_CAPACITY = 16;
// maximum array size (used by toArray)
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// default concurrency level
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// default load factor
private static final float LOAD_FACTOR = 0.75f;
// bin length at which a linked list is converted to a tree
static final int TREEIFY_THRESHOLD = 8;
// bin length at which a tree is converted back to a linked list
static final int UNTREEIFY_THRESHOLD = 6;
// minimum table length before bins are treeified (smaller tables are resized instead)
static final int MIN_TREEIFY_CAPACITY = 64;
// minimum number of bins each thread handles per resize transfer step
private static final int MIN_TRANSFER_STRIDE = 16;
// number of bits in the generation stamp that is unique to each resize
// (see the resizeStamp sketch after this field list)
private static int RESIZE_STAMP_BITS = 16;
// maximum number of threads that can help with a resize
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// shift for recording the stamp in the high bits of sizeCtl, whose low bits then
// count the resizing threads; shifting the other way recovers the stamp
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// hash value marking a ForwardingNode (the bin is being transferred)
static final int MOVED = -1;
// hash value marking the root of a tree bin (TreeBin)
static final int TREEBIN = -2;
// hash value of a transient ReservationNode used by computeIfAbsent/compute
static final int RESERVED = -3;
// mask of usable bits for a normal node hash (keeps the value non-negative)
static final int HASH_BITS = 0x7fffffff;
// number of available processors
static final int NCPU = Runtime.getRuntime().availableProcessors();
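// serialized pseudo-fields, kept only for compatibility with the old Segment-based (pre-JDK8) format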
private static final ObjectStreamField[] serialPersistentFields = {
new ObjectStreamField("segments", Segment[].class),
new ObjectStreamField("segmentMask", Integer.TYPE),
new ObjectStreamField("segmentShift", Integer.TYPE)
};
// the table of bins
transient volatile Node<K,V>[] table;
// the next table to transfer into during a resize
private transient volatile Node<K,V>[] nextTable;
// base element counter (updated by CAS, spills into counterCells under contention)
private transient volatile long baseCount;
// table initialization and resize control: holds the next resize threshold when positive,
// -1 while initializing, and encodes the resize stamp and thread count while resizing
private transient volatile int sizeCtl;
// index of the next bin range to be claimed by a transfer task
private transient volatile int transferIndex;
// spinlock flag used when creating or resizing counterCells
private transient volatile int cellsBusy;
// counter cells used when baseCount is contended
private transient volatile CounterCell[] counterCells;
// key set view
private transient KeySetView<K,V> keySet;
// values view
private transient ValuesView<K,V> values;
// entry set view
private transient EntrySetView<K,V> entrySet;
// Unsafe instance for low-level memory operations
private static final sun.misc.Unsafe U;
// field offset of sizeCtl
private static final long SIZECTL;
// field offset of transferIndex
private static final long TRANSFERINDEX;
// field offset of baseCount
private static final long BASECOUNT;
// field offset of cellsBusy
private static final long CELLSBUSY;
// field offset of CounterCell.value
private static final long CELLVALUE;
// base offset of the Node[] array
private static final long ABASE;
// shift used to turn an array index into a byte offset
private static final int ASHIFT;
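RESIZE_STAMP_BITS and RESIZE_STAMP_SHIFT exist for the resizeStamp method, which produces a stamp unique to each table length n; during a resize the stamp is shifted into the high bits of sizeCtl (making it negative) while the low bits count the threads helping with the transfer. In JDK 1.8 it is essentially:

static final int resizeStamp(int n) {
    // the leading-zero count is unique per power-of-two table length;
    // OR-ing in bit 15 guarantees (resizeStamp(n) << RESIZE_STAMP_SHIFT) is negative
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}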
ConcurrentHashMap static initializer
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentHashMap.class;
SIZECTL = U.objectFieldOffset
(k.getDeclaredField("sizeCtl"));
TRANSFERINDEX = U.objectFieldOffset
(k.getDeclaredField("transferIndex"));
BASECOUNT = U.objectFieldOffset
(k.getDeclaredField("baseCount"));
CELLSBUSY = U.objectFieldOffset
(k.getDeclaredField("cellsBusy"));
Class<?> ck = CounterCell.class;
CELLVALUE = U.objectFieldOffset
(ck.getDeclaredField("value"));
Class<?> ak = Node[].class;
ABASE = U.arrayBaseOffset(ak);
int scale = U.arrayIndexScale(ak);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
}
ConcurrentHashMap constructors
public ConcurrentHashMap() {
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
// compute the initial table size, stored in sizeCtl
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// concurrencyLevel is the estimated number of concurrently updating threads;
// use at least that many bins initially
if (initialCapacity < concurrencyLevel)
initialCapacity = concurrencyLevel;
// table size derived from the capacity and load factor
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
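As a worked example, new ConcurrentHashMap<>(16) does not produce a 16-bin table: it computes tableSizeFor(16 + 8 + 1) = tableSizeFor(25) = 32 and stores 32 in sizeCtl, so the table gets 32 bins when it is first used. (This differs from HashMap, where new HashMap<>(16) yields 16 bins; ConcurrentHashMap treats initialCapacity more like an expected element count.) tableSizeFor simply rounds up to the next power of two; in JDK 1.8 it is essentially:

private static final int tableSizeFor(int c) {
    // smear the highest set bit of (c - 1) downwards, then add 1:
    // the result is the smallest power of two >= c
    int n = c - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}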