并发容器ConcurrentHashMap原理解析及应用

Posted 踩踩踩从踩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发容器ConcurrentHashMap原理解析及应用相关的知识,希望对你有一定的参考价值。

前言

本篇文章主要解析juc包下面的并发容器 ConcurrentHashMap,1.7的实现,以及1.8之后做了哪些优化;对比hashmap做的优化,为什么在hashtable的源码注释上推荐使用ConcurrentHashMap对比。

HashMap之实现原理及hash碰撞

HashMap之扩容 数据迁移

ConcurrentHashMap 概述

HashMap 是集合Collection的重要成员,也是Map族最为在工作用到常用的一种,正是因为查找插入等效率高导致它经常被使用,这个原因因此有它的局限性;那就是它不保证多线程情况下,数据的安全性,一旦多线程操作hashMap,就可能出现线程安全问题,例如扩容时死循环问题;

从jdk1.1开始开发时,作者已经想到这个问题,因此有了hashtable这个类去解决问题,他当时觉得反正对于多线程操作map集合我们只要保证他的线程安全并不怎么管效率如何,加了synchronized关键字看起来至少了解决办法,要不然你就对多线程下操作map自己加锁,就是效率太低了。

Java 集合深入理解 (十四) :Hashtable实现原理研究

一直到jdk1.5版本的时候,有了一个质的飞跃,不得不说到一个人,那就是Doug Lea 大神 想到了不使用synchronized关键字,使用synchronized关键字极不易扩展的,而且不能自己控制,并且锁粒度太粗了,本来我多线程过来都是读操作,对数据并不冲突,我可以让他并发进行的,只对写有进行锁起来。对于map来,我们本来map就是链表数组操作,将map进行分组,多个线程分组去查询和写入这是可以并行处理,当然我只说了一部分,例如阻塞 队列,以及park和unpark,然后atomic 这些,都是juc包下面的实现,供给我们不一样的想法,我们可以将粒度降的很低。

继续说说ConcurrentHashMap 这个是jdk1.5  时,juc包提出时,就提出由这个类,当时提出将散列表,进行分组,多线程情况,按数据分组控制多线程。这在我看来已经挺好了,但是发觉在jdk1.8以后又继续优化,确实在不断的进步呀,因为在jdk1.8jvm给jdk提出了Unsafe 自旋操作,也是来源cpu在性能不断增加,以及内存不断增大,多线程在操作数据前做了一步检查,看老数据是否正确,就是这步操作,在ConcurrentHashMap并不需要分组操作,而只需要自旋操作,效率又做了很大的提升。对于jdk中ConcurrentHashMap来说它认为在使用过程中,不可能数据一直冲突把,如果是一直冲突,那我们自己业务代码肯定是需要修改的,来保证数据的准确性的。而且如果大量的多线程操作数据,大量自旋也是很消耗cpu 和内存,可能以后硬件的强度继续提升到非常大时,有可能又会有新的好算法好的思想出现,这篇文章主要也是研究ConcurrentHashMap是什么,也会从源码去分析,对比着hashmap去看,他们结构是非常相似的。

不外乎在原有操作新增了cas操作

  static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

 上图为整个map的继承实现图。ConcurrentHashMap 即实现了ConcurrentMap 提供线程安全性和原子性的{@link java.util.Map}保证。 又继承自AbstractMap 来实现基本map的基本方法 包括put remove size iterator迭代器等基础方法。

ConcurrentHashMap源码分析

jdk1.7中

更新操作之间允许的并发性由可选的concurrencyLevel构造函数参数 默认值16(这是来自于一个segments的长度为16),用作内部大小调整的提示。这个表是内部分区的,以尝试允许指定的无争用的并发更新数。因为安置在哈希表本质上是随机的情况下,实际的并发将变化。理想情况下,您应该选择一个值来容纳尽可能多的对象线程将同时修改表。使用远高于需要的价值可能会浪费空间和时间,并且显著较低的值可能会导致线程争用。但是 在一个数量级内高估和低估通常不会有太多明显的影响。

下面的属性和hashmap中字段是一致的,用来初始化散列表的大小和阈值。

 static final int DEFAULT_INITIAL_CAPACITY = 16;
 static final float DEFAULT_LOAD_FACTOR = 0.75f;

然后下面独有的默认的并发级别参数设置,这在1.8过后是取消掉的

 /**
     * 此表的默认并发级别,在构造函数中未另行指定时使用。
     */
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    
/**
     * 用于在段内建立索引的Shift值。
     */
    final int segmentShift;

 /** segments中每个元素都是一个专用的hashtable
     * The segments, each of which is a specialized hash table.
     */
    final Segment<K,V>[] segments;

put方法

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);       // 计算Hash值
        int j = (hash >>> segmentShift) & segmentMask;      //计算下标j
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);       //若j处有segment就返回,若没有就创建并返回
        return s.put(key, hash, value, false);  //将值put到segment中去
    }

所有的操作都是segment 封装好的对象进行操作。计算出hash值

主要Segment 进行封装起来,调用Segment 的put方法进行添加数据

Segment 从属性上看,就是一个hashtable.

而segment中put方法和hashmap中方法是一致的。

Segment方法看它怎么去保证数据线程安全,既然是juc下面的包,那肯定不会用synchronized,很不容易扩展。这点从继承方法就能看出。

static final class Segment<K, V> extends ReentrantLock implements Serializable {

针对put方法

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
			HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); // 如果tryLock成功,就返回null,否则。。。
			V oldValue;
			try {
				HashEntry<K, V>[] tab = table;
				int index = (tab.length - 1) & hash; // 根据table数组的长度 和 hash值计算index小标
				HashEntry<K, V> first = entryAt(tab, index); // 找到table数组在 index处链表的头部
				for (HashEntry<K, V> e = first;;) { // 从first开始遍历链表
					if (e != null) { // 若e!=null
						K k;
						if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { // 如果key相同
							oldValue = e.value; // 获取旧值
							if (!onlyIfAbsent) { // 若absent=false
								e.value = value; // 覆盖旧值
								++modCount; //
							}
							break; // 若已经找到,就退出链表遍历
						}
						e = e.next; // 若key不相同,继续遍历
					} else { // 直到e为null
						if (node != null) // 将元素放到链表头部
							node.setNext(first);
						else
							node = new HashEntry<K, V>(hash, key, value, first); // 创建新的Entry
						int c = count + 1; // count 用来记录元素个数
						if (c > threshold && tab.length < MAXIMUM_CAPACITY) // 如果hashmap元素个数超过threshold,并且table长度小于最大容量
							rehash(node); // rehash跟resize的功能差不多,将table的长度变为原来的两倍,重新打包entries,并将给定的node添加到新的table
						else // 如果还有容量
							setEntryAt(tab, index, node); // 就在index处添加链表节点
						++modCount; // 修改操作数
						count = c; // 将count+1
						oldValue = null; //
						break;
					}
				}
			} finally {
				unlock(); // 执行完操作后,释放锁
			}
			return oldValue; // 返回oldValue
		}

这里就是普通的HashEntry 。

这里保证数据线程安全的方法;就是下面的方法。 给对象进行加锁

HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);

这里这样做的好处还是在于每个没冲突的数据,保证多线程安全性。 散列数组每一个数据添加锁。

最后的现象,segment有多少个,并发量就有多少个。

jdk1.8

 这里说一点hashmap对于链表转换成红黑树和回退条件。

转换成红黑树,链表长度要达到8,以及数据数达到64进行转换;还是很困难的,因为泊松分布来看,一般更容易是扩容,而不是转换成红黑树

红黑树转链表,链表长度小于6,而发生条件,有两种一是扩容,也有可能删除元素。都有可能导致。

 

 主要利用的casTabAt 方法

               if (casTabAt(tab, i, null,                           //直接用CAS操作,i处的元素
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin   想emptybin中假如元素的时候,不需要加锁
	static final <K, V> boolean casTabAt(Node<K, V>[] tab, int i, Node<K, V> c, Node<K, V> v) {
		return U.compareAndSwapObject(tab, ((long) i << ASHIFT) + ABASE, c, v);
	}

这里对比着hashmap多了自旋操作,多线程下,有数据修改失败的情况

挂载到链表上,锁住头上存的元素, 添加了synchronized,如果链表数组元素为空时,就直接使用cas去修改数据,做自旋操作

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());      //计算hash值
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {   //自旋
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)       //table==null || table.length==0
                tab = initTable();                          //就initTable
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {    //若下标 i 处的元素为null
                if (casTabAt(tab, i, null,                           //直接用CAS操作,i处的元素
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin   想emptybin中假如元素的时候,不需要加锁
            }
            else if ((fh = f.hash) == MOVED)    //若下标 i 处的元素不为null,且f.hash==MOVED MOVED为常量值-1
                tab = helpTransfer(tab, f);     //
            else {                              //如果是一般的节点
                V oldVal = null;
                synchronized (f) {              //当头部元素不为null,且不需要转换成树时,需要进行同步操作
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {          //若 链表头部hash值 >=0
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {     //如果key相同
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)      //且不为absent
                                        e.val = value;      //旧值覆盖新值
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null), {     //如果链表遍历完成,还没退出,说明没有相同的key存在,在尾部添加节点
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {        //如果f是Tree的节点
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

相对于jdk1.7时,提升了没有hash碰撞时,做了优化使用cas操作。

1.8时,粒度更小了,并发量提高。

总结

整个concurrentHashMap对比各个版本去介绍他怎么达到多线程下数据安全,从hashtable开始到concurrenthashmap,体现出了java作者,对运行效率的优化和提升。在工作中根据业务去选择不同的集合容器,怎么样达到更快的效率,更好的内存。这都是需要研究和讨论的

以上是关于并发容器ConcurrentHashMap原理解析及应用的主要内容,如果未能解决你的问题,请参考以下文章

ConcurrentHashMap中节点数目并发统计的实现原理

Java ConcurrentHashMap 高并发安全实现原理解析

并发编程系列之并发容器:ConcurrentHashMap

Java并发容器--ConcurrentHashMap

java并发编程的艺术,读书笔记第六章 concurrentHashMap以及并发容器的介绍

第六章 Java并发容器和框架