并发容器ConcurrentHashMap原理解析及应用
Posted 踩踩踩从踩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发容器ConcurrentHashMap原理解析及应用相关的知识,希望对你有一定的参考价值。
前言
本篇文章主要解析juc包下面的并发容器 ConcurrentHashMap,1.7的实现,以及1.8之后做了哪些优化;对比hashmap做的优化,为什么在hashtable的源码注释上推荐使用ConcurrentHashMap对比。
ConcurrentHashMap 概述
HashMap 是集合Collection的重要成员,也是Map族最为在工作用到常用的一种,正是因为查找插入等效率高导致它经常被使用,这个原因因此有它的局限性;那就是它不保证多线程情况下,数据的安全性,一旦多线程操作hashMap,就可能出现线程安全问题,例如扩容时死循环问题;
从jdk1.1开始开发时,作者已经想到这个问题,因此有了hashtable这个类去解决问题,他当时觉得反正对于多线程操作map集合我们只要保证他的线程安全并不怎么管效率如何,加了synchronized关键字看起来至少了解决办法,要不然你就对多线程下操作map自己加锁,就是效率太低了。
Java 集合深入理解 (十四) :Hashtable实现原理研究
一直到jdk1.5版本的时候,有了一个质的飞跃,不得不说到一个人,那就是Doug Lea 大神 想到了不使用synchronized关键字,使用synchronized关键字极不易扩展的,而且不能自己控制,并且锁粒度太粗了,本来我多线程过来都是读操作,对数据并不冲突,我可以让他并发进行的,只对写有进行锁起来。对于map来,我们本来map就是链表数组操作,将map进行分组,多个线程分组去查询和写入这是可以并行处理,当然我只说了一部分,例如阻塞 队列,以及park和unpark,然后atomic 这些,都是juc包下面的实现,供给我们不一样的想法,我们可以将粒度降的很低。
继续说说ConcurrentHashMap 这个是jdk1.5 时,juc包提出时,就提出由这个类,当时提出将散列表,进行分组,多线程情况,按数据分组控制多线程。这在我看来已经挺好了,但是发觉在jdk1.8以后又继续优化,确实在不断的进步呀,因为在jdk1.8jvm给jdk提出了Unsafe 自旋操作,也是来源cpu在性能不断增加,以及内存不断增大,多线程在操作数据前做了一步检查,看老数据是否正确,就是这步操作,在ConcurrentHashMap并不需要分组操作,而只需要自旋操作,效率又做了很大的提升。对于jdk中ConcurrentHashMap来说它认为在使用过程中,不可能数据一直冲突把,如果是一直冲突,那我们自己业务代码肯定是需要修改的,来保证数据的准确性的。而且如果大量的多线程操作数据,大量自旋也是很消耗cpu 和内存,可能以后硬件的强度继续提升到非常大时,有可能又会有新的好算法好的思想出现,这篇文章主要也是研究ConcurrentHashMap是什么,也会从源码去分析,对比着hashmap去看,他们结构是非常相似的。
不外乎在原有操作新增了cas操作
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
上图为整个map的继承实现图。ConcurrentHashMap 即实现了ConcurrentMap 提供线程安全性和原子性的{@link java.util.Map}保证。 又继承自AbstractMap 来实现基本map的基本方法 包括put remove size iterator迭代器等基础方法。
ConcurrentHashMap源码分析
jdk1.7中
更新操作之间允许的并发性由可选的concurrencyLevel构造函数参数 默认值16(这是来自于一个segments的长度为16),用作内部大小调整的提示。这个表是内部分区的,以尝试允许指定的无争用的并发更新数。因为安置在哈希表本质上是随机的情况下,实际的并发将变化。理想情况下,您应该选择一个值来容纳尽可能多的对象线程将同时修改表。使用远高于需要的价值可能会浪费空间和时间,并且显著较低的值可能会导致线程争用。但是 在一个数量级内高估和低估通常不会有太多明显的影响。
下面的属性和hashmap中字段是一致的,用来初始化散列表的大小和阈值。
static final int DEFAULT_INITIAL_CAPACITY = 16;
static final float DEFAULT_LOAD_FACTOR = 0.75f;
然后下面独有的默认的并发级别参数设置,这在1.8过后是取消掉的
/**
* 此表的默认并发级别,在构造函数中未另行指定时使用。
*/
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* 用于在段内建立索引的Shift值。
*/
final int segmentShift;
/** segments中每个元素都是一个专用的hashtable
* The segments, each of which is a specialized hash table.
*/
final Segment<K,V>[] segments;
put方法
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key); // 计算Hash值
int j = (hash >>> segmentShift) & segmentMask; //计算下标j
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j); //若j处有segment就返回,若没有就创建并返回
return s.put(key, hash, value, false); //将值put到segment中去
}
所有的操作都是segment 封装好的对象进行操作。计算出hash值
主要Segment 进行封装起来,调用Segment 的put方法进行添加数据
Segment 从属性上看,就是一个hashtable.
而segment中put方法和hashmap中方法是一致的。
Segment方法看它怎么去保证数据线程安全,既然是juc下面的包,那肯定不会用synchronized,很不容易扩展。这点从继承方法就能看出。
static final class Segment<K, V> extends ReentrantLock implements Serializable {
针对put方法
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); // 如果tryLock成功,就返回null,否则。。。
V oldValue;
try {
HashEntry<K, V>[] tab = table;
int index = (tab.length - 1) & hash; // 根据table数组的长度 和 hash值计算index小标
HashEntry<K, V> first = entryAt(tab, index); // 找到table数组在 index处链表的头部
for (HashEntry<K, V> e = first;;) { // 从first开始遍历链表
if (e != null) { // 若e!=null
K k;
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { // 如果key相同
oldValue = e.value; // 获取旧值
if (!onlyIfAbsent) { // 若absent=false
e.value = value; // 覆盖旧值
++modCount; //
}
break; // 若已经找到,就退出链表遍历
}
e = e.next; // 若key不相同,继续遍历
} else { // 直到e为null
if (node != null) // 将元素放到链表头部
node.setNext(first);
else
node = new HashEntry<K, V>(hash, key, value, first); // 创建新的Entry
int c = count + 1; // count 用来记录元素个数
if (c > threshold && tab.length < MAXIMUM_CAPACITY) // 如果hashmap元素个数超过threshold,并且table长度小于最大容量
rehash(node); // rehash跟resize的功能差不多,将table的长度变为原来的两倍,重新打包entries,并将给定的node添加到新的table
else // 如果还有容量
setEntryAt(tab, index, node); // 就在index处添加链表节点
++modCount; // 修改操作数
count = c; // 将count+1
oldValue = null; //
break;
}
}
} finally {
unlock(); // 执行完操作后,释放锁
}
return oldValue; // 返回oldValue
}
这里就是普通的HashEntry 。
这里保证数据线程安全的方法;就是下面的方法。 给对象进行加锁
HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
这里这样做的好处还是在于每个没冲突的数据,保证多线程安全性。 散列数组每一个数据添加锁。
最后的现象,segment有多少个,并发量就有多少个。
jdk1.8
这里说一点hashmap对于链表转换成红黑树和回退条件。
转换成红黑树,链表长度要达到8,以及数据数达到64进行转换;还是很困难的,因为泊松分布来看,一般更容易是扩容,而不是转换成红黑树
红黑树转链表,链表长度小于6,而发生条件,有两种一是扩容,也有可能删除元素。都有可能导致。
主要利用的casTabAt 方法
if (casTabAt(tab, i, null, //直接用CAS操作,i处的元素
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin 想emptybin中假如元素的时候,不需要加锁
static final <K, V> boolean casTabAt(Node<K, V>[] tab, int i, Node<K, V> c, Node<K, V> v) {
return U.compareAndSwapObject(tab, ((long) i << ASHIFT) + ABASE, c, v);
}
这里对比着hashmap多了自旋操作,多线程下,有数据修改失败的情况
挂载到链表上,锁住头上存的元素, 添加了synchronized,如果链表数组元素为空时,就直接使用cas去修改数据,做自旋操作
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //计算hash值
int binCount = 0;
for (Node<K,V>[] tab = table;;) { //自旋
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) //table==null || table.length==0
tab = initTable(); //就initTable
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //若下标 i 处的元素为null
if (casTabAt(tab, i, null, //直接用CAS操作,i处的元素
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin 想emptybin中假如元素的时候,不需要加锁
}
else if ((fh = f.hash) == MOVED) //若下标 i 处的元素不为null,且f.hash==MOVED MOVED为常量值-1
tab = helpTransfer(tab, f); //
else { //如果是一般的节点
V oldVal = null;
synchronized (f) { //当头部元素不为null,且不需要转换成树时,需要进行同步操作
if (tabAt(tab, i) == f) {
if (fh >= 0) { //若 链表头部hash值 >=0
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { //如果key相同
oldVal = e.val;
if (!onlyIfAbsent) //且不为absent
e.val = value; //旧值覆盖新值
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null), { //如果链表遍历完成,还没退出,说明没有相同的key存在,在尾部添加节点
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { //如果f是Tree的节点
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
相对于jdk1.7时,提升了没有hash碰撞时,做了优化使用cas操作。
1.8时,粒度更小了,并发量提高。
总结
整个concurrentHashMap对比各个版本去介绍他怎么达到多线程下数据安全,从hashtable开始到concurrenthashmap,体现出了java作者,对运行效率的优化和提升。在工作中根据业务去选择不同的集合容器,怎么样达到更快的效率,更好的内存。这都是需要研究和讨论的
以上是关于并发容器ConcurrentHashMap原理解析及应用的主要内容,如果未能解决你的问题,请参考以下文章
ConcurrentHashMap中节点数目并发统计的实现原理
Java ConcurrentHashMap 高并发安全实现原理解析