?集合工具类使用线程

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了?集合工具类使用线程相关的知识,希望对你有一定的参考价值。

集合工具类使用线程

1. hashmap源码解析与并发可能遇见的问题

1.HashMap中的几个重要变量

static final int DEFAULT_INITIAL_CAPACITY = 16;
    //默认初始容量,必须是2的n次方
static final int MAXIMUM_CAPACITY = 1 << 30;
    //最大容量,当通过构造方法传入的容量比它还大时,就用这个最大容量,必须是2的n次方
static final float DEFAULT_LOAD_FACTOR = 0.75f;
    //默认负载因子
transient Entry<K,V>[] table;
    //用来存储键值对,可以看到键值对都是存储在Entry中的
transient int size;
    //存放元素的个数
  
int threshold; 
    //临界值   当实际大小超过临界值时,会进行扩容threshold = 加载因子*容量
 
final float loadFactor; 
    //加载因子
  
transient int modCount;
    //被修改的次数


存储结构

技术分享图片技术分享图片 

2.Entry是一个链表结构,不仅包含keyvalue,还有可以指向下一个的next

static class Entry<K,V> implements Map.Entry<K,V> {
        final K key;
        V value;
        Entry<K,V> next;
        int hash;
        /**
         * Creates new entry.
         */
        Entry(int h, K k, V v, Entry<K,V> n) {
            value = v;
            next = n;
            key = k;
            hash = h;
        }
        ...
//3.put方法
public V put(K key, V value) {
        if (key == null)
            return putForNullKey(value);//储存空键
        int hash = hash(key);//计算hash值
        int i = indexFor(hash, table.length);//计算存储位置
        for (Entry<K,V> e = table[i]; e != null; e = e.next) {//遍历hashmap的内部数据
            Object k;
            if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
                V oldValue = e.value;
                e.value = value;
                e.recordAccess(this);
                return oldValue;
            }
        }
//这个for循环,当发生并发,两个线程冲突的时候,这个链表的结构会发生变化:可能两个key互为对方的next元素。此时通过next遍历,会形成死循环。在jdb8中已经不存在了。最好的解决办法是使用concurrenthashmap
        modCount++;
        addEntry(hash, key, value, i);
        return null;
    }
//首先通过hash方法对hashcode进行处理:
final int hash(Object k) {
        int h = 0;
        h ^= k.hashCode();
        h ^= (h >>> 20) ^ (h >>> 12);
        return h ^ (h >>> 7) ^ (h >>> 4);
    }
//可以看到只是在key的hashcode值上做了一些处理,通过hash计算出来的值将会使用indexFor方法找到它应该所在的table下标:
static int indexFor(int h, int length) {
        return h & (length-1);
    }
//这个方法其实相当于对table.length取模。
//当需要插入的key为null时,调用putForNullKey方法处理:
 private V putForNullKey(V value) {
        for (Entry<K,V> e = table[0]; e != null; e = e.next) {
            if (e.key == null) {
                V oldValue = e.value;
                e.value = value;
                e.recordAccess(this);
                return oldValue;
            }
        }
        modCount++;
        addEntry(0, null, value, 0);
        return null;
    }
//putForNullKey方法只从table[0]这个位置开始遍历,因为key为null只放在table中的第一个位置,下标为0,在遍历中如果发现已经有key为null了,则替换新value,返回旧value,结束;如果还没有key为null,调用addEntry方法增加一个Entry:
void addEntry(int hash, K key, V value, int bucketIndex) {
        if ((size >= threshold) && (null != table[bucketIndex])) {
            resize(2 * table.length);
            hash = (null != key) ? hash(key) : 0;
            bucketIndex = indexFor(hash, table.length);
        }
        createEntry(hash, key, value, bucketIndex);
    }
//可以看到jdk7中resize的条件已经发生改变了,只有当 size>=threshold并且 table中的那个槽中已经有Entry时,才会发生resize。即有可能虽然size>=threshold,但是必须等到每个槽都至少有一个Entry时,才会扩容。还有注意每次resize都会扩大一倍容量
void createEntry(int hash, K key, V value, int bucketIndex) {
        Entry<K,V> e = table[bucketIndex];
        table[bucketIndex] = new Entry<>(hash, key, value, e);
        size++;
    }
//最后看createEntry,它先保存这个桶中的第一个Entry,创建新的Entry放入第一个位置,将原来的Entry接在后面。这里采用的是头插法插入元素。
4.get方法
//其实get方法和put方法如出一辙,怎么放的怎么拿
public V get(Object key) {
        if (key == null)
            return getForNullKey();
        Entry<K,V> entry = getEntry(key);
        return null == entry ? null : entry.getValue();
    }
//key为null时,还是去table[0]去取:
private V getForNullKey() {
        for (Entry<K,V> e = table[0]; e != null; e = e.next) {
            if (e.key == null)
                return e.value;
        }
        return null;
    }
//否则调用getEntry方法:
final Entry<K,V> getEntry(Object key) {
        int hash = (key == null) ? 0 : hash(key);
        for (Entry<K,V> e = table[indexFor(hash, table.length)];
             e != null;
             e = e.next) {
            Object k;
            if (e.hash == hash &&
                ((k = e.key) == key || (key != null && key.equals(k))))
                return e;
        }
        return null;
    }
//这个方法也是通过key的hashcode计算出它应该所在的下标,再遍历这个下标的Entry链,如果key的内存地址相等(即同一个引用)或者equals相等,则说明找到了


hash的原则

A、等幂性。不管执行多少次获取Hash值的操作,只要对象不变,那么Hash值是固定的。如果第一次取跟第N次取不一样,那就用起来很麻烦.

B、对等性。若两个对象equal方法返回为true,则其hash值也应该是一样的。举例说明:若你将objA作为key存入HashMap中,然后new了一个objB。在你看来objB和objA是一个东西(因为他们equal),但是使用objB到hashMap中却取不出来东西。

C、互异性。若两个对象equal方法返回为false,hash值有可能相同,但最好是不同的,这个不是必须的,只是这样做会提高hash类操作的性能(碰撞几率低)。

解决hash碰撞的方法:
开放地址法
链地址法

hashmap采用的就是链地址法,这种方法好处是无堆积现象,但是next指针会占用额外空间

和jdk8中的HashMap区别

在jdk8中,仍然会根据key.hashCode()计算出hash值,再通过这个hash值去定位这个key,但是不同的是,当发生冲突时,会采用链表和红黑树两种方法去处理,当结点个数较少时用链表(用Node存储)个数较多时用红黑树(用TreeNode存储),同时结点也不叫Entry了,而是分成了Node和TreeNode。再最坏的情况下,链表查找的时间复杂度为O(n),而红黑树一直是O(logn),这样会提高HashMap的效率。
jdk8中的HashMap中定义了一个变量TREEIFY_THRESHOLD,当节点个数>= TREEIFY_THRESHOLD - 1时,HashMap将采用红黑树存储

Put方法也变了,为了防止并发问题。

扩展:为何数组的长度是 2 的 n 次方呢?

1.这个方法非常巧妙,它通过 h & (table.length -1) 来得到该对象的保存位,而HashMap 底层数组的长度总是 2 的 n 次方,2n-1 得到的二进制数的每个位上的值都为 1,那么与全部为 1 的一个数进行与操作速度会大大提升。

2.当 length 总是 2 的 n 次方时,h& (length-1)运算等价于对 length 取模,也就是h%length,但是&比%具有更高的效率。

3.当数组长度为 2 的 n 次幂的时候,不同的 key 算得的 index 相同的几率较小,那么数据在数组上分布就比较均匀,也就是说碰撞的几率小,相对的,查询的时候就不用遍历某个位置上的链表,这样查询效率也就较高了。

HashMap 的扩容机制:

当 HashMap 中的结点个数超过数组大小*loadFactor(加载因子)时,就会进行数组扩容,loadFactor 的默认值为 0.75。也就是说,默认情况下,数组大小为 16,那么当 HashMap中结点个数超过 16*0.75=12 的时候,就把数组的大小扩展为 2*16=32,即扩大一倍,然后重新计算每个元素在数组中的位置,并放进去,而这是一个非常消耗性能的操作


Hashmap和HashTable的异同:

01.两者的默认容量与负载因子有变化

02.hashtable的 容量可以是任意值,而hashmap必须是2的次幂

03.hashtable中在put方法里面不允许值与键为空

04.计算索引的方式不同(indexof函数不同)

05.hashtable大部分方法都加上了sychronied关键字

06.hashtable每次扩容,容量为原来的两倍加2.

 

2. concurrenthashmap源码解析与并发编程

使用与获取全局信息的方法并不频繁的时候

01.在 ConcurrentHashMap 中,不允许用 null 作为键和值。

02.ConcurrentHashMap 使用分段锁(减少锁粒度)技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。

默认情况下分为16个段。

技术分享图片 

 

03.当增加一个新的表项,不是全部加锁,会先计算在哪个段,对指定的段加锁。

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        int j = (hash >>> segmentShift) & segmentMask;
//上面两行用于获取段号
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);//得到段,将数据插入到段中
        return s.put(key, hash, value, false);
    }


04.当系统需要取得全局锁,消耗资源就会比较多。比如size()方法:事实上会先使用无锁的方式求和,如果失败,会先获得所有段的锁再去求和。

技术分享图片 

 

3. BlockingQueue

A线程可以知道b线程的存在

是一个接口并非一个具体实现:

ArrayBlockingQueue

ArrayBlockingQueue的内部元素都放置在一个对象数组中:final Object[] items;

Offer():当队列已经满了,会立即返回false

Put():如果队列满了会一直等待

Pool():弹出元素,如果为空返回null

Take():弹出元素,如果为空等待到有元素即可。

Take方法:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();
        }
    }
    
private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }


Put方法:

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }
/**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E extract() {
        final Object[] items = this.items;
        E x = this.<E>cast(items[takeIndex]);
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

LinkedBlockingQueue(锁分离)

两把不同的锁
/** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
 
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();
 
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
 
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
 
Take函数
public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();//不能有两个线程同时取数据
        try {
            while (count.get() == 0) {//如果没有数据,一直等待(因为是lockInterruptibly,可中断)
                notEmpty.await();
            }
            x = dequeue();//取得第一个数据
            c = count.getAndDecrement();//数量-1,原子操作,因为会和put同时访问count。
            if (c > 1)
                notEmpty.signal();//通知其他take操作
        } finally {
            takeLock.unlock();//释放锁
        }
        if (c == capacity)
            signalNotFull();//通知put操作,已有空余空间
        return x;
    }
 
Put函数
public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();//上锁不能有两个线程同时进行put函数
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {//当队列已经满了以后,等待
                notFull.await();
            }
            enqueue(node);//插入数据
            c = count.getAndIncrement();//更新总数
            if (c + 1 < capacity)
                notFull.signal();//有足够的空间,通知其他线程
        } finally {
            putLock.unlock();//释放锁
        }
        if (c == 0)
            signalNotEmpty();//释放成功后,通知take函数取数据
    }

4. 并发下的ArrayList

当ArrayList在扩容的时候,内部一致性被破坏,由于没有锁的保护,另外一个线程访问不到不一致的内部状态,导致出现越界问题。

还会出现多个线程同时对同一位置进行赋值。

5. concurrentlinkedqueue:

高并发环境中可以说是最好的队列,也可以看做是一个线程安全的linkedList。

6. CopyOnWriteArrayList

性能很好的读写list,在读写的时候任何时候都不加锁;只有在写写的时候需要同步等待。

当写操作的时候,进行一次自我复制。对原有的数据进行一次复制,将修改的内容写入副本修改完之后,将副本替换原来的数据。


以上是关于?集合工具类使用线程的主要内容,如果未能解决你的问题,请参考以下文章

?集合工具类使用线程

Java多线程与并发库高级应用-工具类介绍

Java多线程与并发库高级应用-工具类介绍

Java并发多线程编程——集合类线程不安全之HashMap的示例及解决方案

Java并发多线程编程——集合类线程不安全之HashSet的示例及解决方案

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段