#yyds干货盘点# 来,听我讲讲常用并发容器
Posted 你呀不牛
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了#yyds干货盘点# 来,听我讲讲常用并发容器相关的知识,希望对你有一定的参考价值。
常用并发容器
前几天和同事xhf、zm走查代码,功能是为了减少频繁你创建FTP开销用线程notify和wait实现了一个FTP池子,当时提的建议就是用java自带的线程集合实现可能更高效,本文整理下JDK自带线程安全的集合,不考虑多线程并发的情况下,容器类一般使用 ArrayList、HashMap 等线程不安全的类,效率更高。在并发场景下,常会用到ConcurrentHashMap、ArrayBlockingQueue 等线程安全的容器类,虽然牺牲了一些效率,但却得到了安全。
什么是线程安全:
线程安全一般指的就是线程同步的意思,就是当一个程序对一个线程安全的方法或者语句进行访问的时候,其他的不能再对他进行操作了,必须等到这次访问结束以后才能对这个线程安全的方法进行访问。
线程非安全用hashmap举例试下:
public class TestThreadSafe
private Map<String, Integer> persons = new HashMap<>();
private AtomicInteger count = new AtomicInteger(0);
@Test
public void test() throws Exception
for (int i = 0; i < 10000; i++)
int age = i;
new Thread(()->addName("steven"+ age, age)).start();
TimeUnit.SECONDS.sleep(10);
System.out.println("count is:"+count.get()+",persons:"+persons.size());
private void addName(String name, int age)
persons.put(name, age);
count.addAndGet(1);
输出:
count is:10000,persons:9996
可以看到addName方法执行了10000次但是真正添加成功的有9996次,这就是由于多线程并发put时会因为size++问题导致覆盖问题(jdk8,jdk7时当并发执行扩容操作时会造成环形链和数据丢失的情况)使用concurrenthashmap时就不会出现此线程安全问题。
1.ConcurrentHashMap 并发版 HashMap
最常见的并发容器之一,可以用作并发场景下的缓存。底层依然是哈希表,但在 JAVA 8 中有了不小的改变,而 JAVA 7 和 JAVA 8 都是用的比较多的版本,因此经常会将这两个版本的实现方式做一些比较(比如面试中)。
一个比较大的差异就是,JAVA 7 中采用分段锁来减少锁的竞争,JAVA 8 中放弃了分段锁,采用 CAS(一种乐观锁),同时为了防止哈希冲突严重时退化成链表(冲突时会在该位置生成一个链表,哈希值相同的对象就链在一起),会在链表长度达到阈值(8)后转换成红黑树(比起链表,树的查询效率更稳定)。
除了key和value不能为null外,其余方法和hashMap几乎一样
常用方法
@Test
public void test_function() throws Exception
ConcurrentHashMap<String, String> data = new ConcurrentHashMap<>();
data.put("Steven","18");
System.out.println(data.get("Steven"));
2.CopyOnWriteArrayList 并发版 ArrayList
并发版 ArrayList,底层结构也是数组,和 ArrayList 不同之处在于:当新增和删除元素时会创建一个新的数组,在新的数组中增加或者排除指定对象,最后用新增数组替换原来的数组。
适用场景:由于读操作不加锁,写(增、删、改)操作加锁,因此适用于读多写少的场景。
局限:由于读的时候不会加锁(读的效率高,就和普通 ArrayList 一样),读取的当前副本,因此可能读取到脏数据。
核心方法可以看出add元素时加锁同时复制了一个数组:
public boolean add(E e)
final ReentrantLock lock = this.lock;
lock.lock();
try
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
finally
lock.unlock();
常用方法:
@Test
public void test_function() throws Exception
CopyOnWriteArrayList<String> data = new CopyOnWriteArrayList<>();
data.add("Steven");
System.out.println(data.get(0));
3.CopyOnWriteArraySet 并发 Set
基于 CopyOnWriteArrayList 实现(内含一个 CopyOnWriteArrayList 成员变量),也就是说底层是一个数组,意味着每次 add 都要遍历整个集合才能知道是否存在,不存在时需要插入(加锁)。
适用场景:在 CopyOnWriteArrayList 适用场景读多写少且集合元素不是太多的场景。
核心方法可以看出内部维护一个CopyOnWriteArrayList添加时判断是否存在,不存在时调用CopyOnWriteArrayList的add方法
public class CopyOnWriteArraySet<E> extends AbstractSet<E>
private final CopyOnWriteArrayList<E> al;
/**
* Creates an empty set.
*/
public CopyOnWriteArraySet()
al = new CopyOnWriteArrayList<E>();
public boolean addIfAbsent(E e)
Object[] snapshot = getArray();
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
addIfAbsent(e, snapshot);
常用方法:
@Test
public void test_function() throws Exception
CopyOnWriteArraySet<String> data = new CopyOnWriteArraySet<>();
data.add("Steven");
System.out.println(data.stream().findFirst().get());
4.ConcurrentLinkedQueue 并发队列 (基于链表)
一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素。当多个线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许使用 null 元素。因为数据结构是链表,所以理论上是没有队列大小限制的,也就是说添加数据一定能成功。队列用的相对少一点,所以把方法都列举一下:
- boolean add(E e) 将指定元素插入此队列的尾部和offer方法完全相同
- boolean contains(Object o) 如果此队列包含指定元素,则返回 true。
- boolean isEmpty() 如果此队列不包含任何元素,则返回 true。
- Iterator<E> iterator() 返回在此队列元素上以恰当顺序进行迭代的迭代器。
- boolean offer(E e) 将指定元素插入此队列的尾部。
- E peek() 获取但不移除此队列的头;如果此队列为空,则返回 null。
- E poll() 获取并移除此队列的头,如果此队列为空,则返回 null。
- boolean remove(Object o) 从队列中移除指定元素的单个实例(如果存在)。
- int size() 返回此队列中的元素数量。
- Object[] toArray() 返回以恰当顺序包含此队列所有元素的数组。
- <T> T[]
-
toArray(T[] a)返回以恰当顺序包含此队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
@Test
public void test_function() throws Exception
ConcurrentLinkedQueue<String> data = new ConcurrentLinkedQueue<>();data.add("Steven"); data.offer("Steven2"); System.out.println(data.peek() + ",size:" + data.size()); System.out.println(data.poll() + ",size:" + data.size());
输出:
Steven,size:2
Steven,size:1
5.ConcurrentLinkedDeque 并发队列 (基于双向链表)
非阻塞队列,基于双向链表实现的并发队列,可以分别对头尾进行操作,因此除了先进先出 (FIFO),也可以先进后出(FILO),当然先进后出的话应该叫它栈了。现对于单向列表方法的添加,取出都增加了相应的XXFirst()和XXLast()方法:
@Test
public void test_function() throws Exception
ConcurrentLinkedDeque<String> data = new ConcurrentLinkedDeque<>();
data.addLast("Steven");
data.offerFirst("Steven2");
System.out.println(data.getLast() + ",size:" + data.size());
System.out.println(data.pollLast() + ",size:" + data.size());
输出:
Steven,size:2
Steven,size:1
6.ConcurrentSkipListMap 基于跳表的并发 Map
SkipList 即跳表,跳表是一种空间换时间的数据结构,通过冗余数据,将链表一层一层索引,达到类似二分查找的效果,ConcurrentSkipListMap在JDK并发工具类使用范围不是很广,它是针对某一特殊需求而设计的——支持排序,同时支持搜索目标返回最接近匹配项的导航方法。一般情况下开发者很少会使用到该类,但是如果你有如上的特殊需求,那么ConcurrentSkipListMap将是一个很好地解决方案。
原理比较复杂以后再分析。
7.ConcurrentSkipListSet 基于跳表的并发 Set
类似 HashSet 和 HashMap 的关系,ConcurrentSkipListSet 里面就是一个 ConcurrentSkipListMap,就不细说了。
8.ArrayBlockingQueue 阻塞队列 (基于数组)
基于数组实现的可阻塞队列,构造时必须指定数组大小,往里面放东西时如果数组满了便会阻塞直到有位置(也支持直接返回和超时等待),通过一个锁 ReentrantLock 保证线程安全。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public E take() throws InterruptedException
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
while (count == 0)
notEmpty.await();
return dequeue();
finally
lock.unlock();
public void put(E e) throws InterruptedException
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
while (count == items.length)
notFull.await();
enqueue(e);
finally
lock.unlock();
通过put和take存取数据,读和写都是同一个锁,那要是空的时候正好一个读线程来了不会一直阻塞吗?答案就在 notEmpty、notFull 里,这两个出自 lock 的小东西让锁有了类似 synchronized + wait + notify 的功能。
/** Condition for waiting puts */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法:
抛出异常 | 特殊值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | add(e) | offer(e)(false) | put(e) | offer(e, time, unit) |
移除 | remove() | poll()(null) | take() | poll(time, unit) |
检查 | element() | peek()(null) | \\ | \\ |
9.LinkedBlockingQueue 阻塞队列 (基于链表)
基于链表实现的阻塞队列,相对于不阻塞的 ConcurrentLinkedQueue,它多了一个容量限制,如果不设置默认为 int 最大值。LinkedBlockingQueue保存元素的是一个链表。其内部有一个Node的内部类,其中有一个成员变量 Node next。就这样形成了一个链表的结构,要获取下一个元素,只要调用next就可以了。而ArrayBlockingQueue则是一个数组。
LinkedBlockingQueue内部读写(插入获取)各有一个锁,而ArrayBlockingQueue则读写共享一个锁,常用方法和ArrayBlockingQueue完全一样
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
10.LinkedBlockingDeque 阻塞队列 (基于双向链表)
类似 LinkedBlockingQueue,但提供了双向链表特有的操作。
11.PriorityBlockingQueue 线程安全的优先队列
构造时可以传入一个比较器,可以看做放进去的元素会被排序,然后读取的时候按顺序消费。某些低优先级的元素可能长期无法被消费,因为不断有更高优先级的元素进来。priorityBlockingQueue是一个无界队列,它没有限制,在内存允许的情况下可以无限添加元素;它又是具有优先级的队列,是通过构造函数传入的对象来判断,传入的对象必须实现comparable接口。
@Test
public void test_function() throws Exception
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
queue.add(3);
queue.add(2);
queue.add(1);
System.out.println(queue);
System.out.println(queue.poll());
System.out.println(queue);
输出
[1, 3, 2]
1
[2, 3]
对结果分析,每次添加一个元素,PriorityBlockingQueue中的元素都会执行compareTo方法进行排序,但是只是把第一个元素排在首位,其他元素按照队列的一系列复杂算法排序。这就保障了每次获取到的元素都是经过排序的第一个元素。
12.SynchronousQueue 数据同步交换的队列
一个虚假的队列,因为它实际上没有真正用于存储元素的空间,每个插入操作都必须有对应的取出操作,没取出时无法继续放入。
@Test
public void test_function() throws Exception
SynchronousQueue<String> queue = new SynchronousQueue<>();
new Thread(() ->
try
int i = 0;
while (true)
String name = "steven" + i++;
System.out.println("增加:" + name);
queue.put(name);
catch (InterruptedException e)
e.printStackTrace();
).start();
new Thread(() ->
while (true)
try
System.out.println("取出:" + queue.take());
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
).start();
TimeUnit.MINUTES.sleep(10);
输出:
增加:steven0
取出:steven0
增加:steven1
取出:steven1
增加:steven2
取出:steven2
增加:steven3
取出:steven3
也就是说SynchronousQueue的队列大小是1
13.LinkedTransferQueue 基于链表的数据交换队列
实现了接口 TransferQueue,通过 transfer 方法放入元素时,如果发现有线程在阻塞在取元素,会直接把这个元素给等待线程。如果没有人等着消费,那么会把这个元素放到队列尾部,并且此方法阻塞直到读取这个元素。和 SynchronousQueue 有点像,但比它更强大。调用add添加时不会再等待取出。
14.DelayQueue 延时队列
可以使放入队列的元素在指定的延时后才被消费者取出,元素需要实现 Delayed 接口。延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用poll()方法会返回null值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于0来判断。延时队列不能存放空元素。
DelayedQuene的元素存储交由优先级队列存放。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();//元素存放
DelayedQuene的优先级队列使用的排序方式是队列元素的compareTo方法,优先级队列存放顺序是从小到大的,所以队列元素的compareTo方法影响了队列的出队顺序。
若compareTo方法定义不当,会造成延时高的元素在队头,延时低的元素无法出队。
public E poll()
final ReentrantLock lock = this.lock;
lock.lock();
try
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
finally
lock.unlock();
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException @inheritDoc
*/
public E take() throws InterruptedException
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try
for (;;)
E first = q.peek();
if (first == null)//没有元素,让出线程,等待java.lang.Thread.State#WAITING
available.await();
else
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)// 已到期,元素出队
return q.poll();
first = null; // dont retain ref while waiting
if (leader != null)
available.await();// 其它线程在leader线程TIMED_WAITING期间,会进入等待状态,这样可以只有一个线程去等待到时唤醒,避免大量唤醒操作
else
Thread thisThread = Thread.currentThread();
leader = thisThread;
try
available.awaitNanos(delay);// 等待剩余时间后,再尝试获取元素,他在等待期间,由于leader是当前线程,所以其它线程会等待
finally
if (leader == thisThread)
leader = null;
finally
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
常用场景:缓存系统的设计,缓存中的对象,超过了空闲时间,需要从缓存中移出;任务调度系统,能够准确的把握任务的执行时间。可能需要通过线程处理很多时间上要求很严格的数据,如果使用普通的线程,我们就需要遍历所有的对象,一个一个的检 查看数据是否过期等,首先这样在执行上的效率不会太高,其次就是这种设计的风格也大大的影响了数据的精度。一个需要12:00点执行的任务可能12:01 才执行,这样对数据要求很高的系统有更大的弊端。此时可以使用DelayQueue。
总结
这些并发容器能很好的解决日常大部分需求,可以学习原理,但不要重复造轮子况且轮子可能还不如这些。
以上是关于#yyds干货盘点# 来,听我讲讲常用并发容器的主要内容,如果未能解决你的问题,请参考以下文章
#yyds干货盘点# Spring核心原理之IoC容器初体验
#yyds干货盘点# Docker环境搭建以及常用技巧可视化