提升--13---ConcurrentHashMapConcurrentSkipListMap CopyOnWrite各种Queue
Posted 高高for 循环
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了提升--13---ConcurrentHashMapConcurrentSkipListMap CopyOnWrite各种Queue相关的知识,希望对你有一定的参考价值。
文章目录
1. ConcurrentHashMap
主要就是为了应对hashmap在并发环境下不安全而诞生的,ConcurrentHashMap的设计与实现非常精巧,大量的利用了volatile,final,CAS等lock-free技术来减少锁竞争对于性能的影响。
- 我们都知道Map一般都是数组+链表结构(JDK1.8该为数组+红黑树)。
ConcurrentHashMap避免了对全局加锁改成了局部加锁操作,这样就极大地提高了并发环境下的操作速度,由于ConcurrentHashMap在JDK1.7和1.8中的实现非常不同,接下来我们谈谈JDK在1.7和1.8中的区别。
JDK1.7版本的CurrentHashMap的实现原理
- 在JDK1.7中ConcurrentHashMap采用了数组+Segment+分段锁的方式实现。
Segment(分段锁)-减少锁的粒度
ConcurrentHashMap中的分段锁称为Segment,它即类似于HashMap的结构,即内部拥有一个Entry数组,数组中的每个元素又是一个链表,同时又是一个ReentrantLock(Segment继承了ReentrantLock)。
内部结构
ConcurrentHashMap使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。如下图是ConcurrentHashMap的内部结构图:
从上面的结构我们可以了解到,ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作。
- 第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部。
该结构的优劣势
-
坏处
是这一种结构的带来的副作用是Hash的过程要比普通的HashMap要长。 -
好处
是写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其他的Segment,这样,在最理想的情况下,ConcurrentHashMap可以最高同时支持Segment数量大小的写操作(刚好这些写操作都非常平均地分布在所有的Segment上)。
所以,通过这一种结构,ConcurrentHashMap的并发能力可以大大的提高。
JDK1.8版本的CurrentHashMap
JDK8中ConcurrentHashMap参考了JDK8 HashMap的实现,采用了数组+链表+红黑树的实现方式来设计,采用CAS+Synchronized保证线程安全。内部大量采用CAS操作,这里我简要介绍下CAS。
CAS
volatile
- volatile是轻量级同步机制。在访问volatile变量时不会执行加锁操作,因此也就不会使执行线程阻塞,是一种比synchronized关键字更轻量级的同步机制。
JDK8中彻底放弃了Segment转而采用的是Node,其设计思想也不再是JDK1.7中的分段锁思想。
- Node:保存key,value及key的hash值的数据结构。其中value和next都用volatile修饰,保证并发的可见性。
Java8 ConcurrentHashMap结构基本上和Java8的HashMap一样,不过保证线程安全性。
小结
Map<String, String> map = new ConcurrentHashMap<>();
其实可以看出JDK1.8版本的ConcurrentHashMap的数据结构已经接近HashMap,相对而言,ConcurrentHashMap只是增加了同步的操作来控制并发,从JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+红黑树。
- 数据结构:取消了Segment分段锁的数据结构,取而代之的是数组+链表+红黑树的结构。
- 保证线程安全机制:JDK1.7采用segment的分段锁机制实现线程安全,其中segment继承自ReentrantLock。JDK1.8采用CAS+Synchronized保证线程安全。
- 锁的粒度:原来是对需要进行数据操作的Segment加锁,现调整为对每个数组元素加锁(Node)。
- 链表转化为红黑树:定位结点的hash算法简化会带来弊端,Hash冲突加剧,因此在链表节点数量大于8时,会将链表转化为红黑树进行存储。
- 查询时间复杂度:从原来的遍历链表O(n),变成遍历红黑树O(logN)。
2. ConcurrentSkipListMap
ConcurrentSkipListMap通过跳表来实现的高并发容器
并且这个Map是有排序的;
跳表
- 跳表是什么样的结构呢?底层本身存储的元素一个链表,它是排好顺序的,
- 大家知道当一个链表排好顺序的时候往里插入是特别困难的,查找的时候也特别麻烦,因为你得从头去遍历查找这个元素到底在哪里,所以就出现了这个跳表的结构,底层是一个链表,链表查找的时候比较困难怎么办,那么我们在这些链表的基础上在拿出一些关键元素来,在上面做一层,那这个关键元素的这一层也是一个链表,那这个数量特别大的话在这个基础之上在拿一层出来再做一个链表,每层链表的数据越来越少,而且它是分层,
- 在我们查找的时候从顶层往下开始查找,所以呢,查找容易了很多,同时它无锁的实现难度比TreeMap又容易很多,因此在JUC里面提供了ConcurrentSkipListMap这个类。
Map<String, String> map = new ConcurrentSkipListMap<>(); //高并发并且排序
3. CopyOnWrite-----写时复制
再来说一个在并发的时候经常使用的一个类,这个类叫CopyOnWrite的意思叫写时复制。
- CopyOnWriteList、
- CopyOnWriteSet
原理
这个CopyOnWrite解释一下,你通过这个名字进行分析一下,当Write的时候我们要进行复制,写时复制,写的时候进行复制。这个原理非常简单,当我们需要往里面加元素的时候你把里面的元素得复制出来。在很多情况下,写的时候特别少,读的时候很多。在这个时候就可以考虑CopyOnWrite这种方式来提高效率,
- CopyOnWrite为什么会提高效率呢,是因为我读的时候不加锁,大家知道我Vector写的时候加锁,读的时候也加锁。那么用CopyOnWriteList的时候我读的时候不加锁,
- 写的时候会在原来的基础上拷贝一个,拷贝的时候扩展出一个新元素来,然后把你新添加的这个扔到这个元素扔到最后这个位置上,于此同时把指向老的容器的一个引用指向新的,这个写法就是写时复制。
- 在读比较多写比较少的情况下使用CopyOnWrite。
源码
案例:
package c_023_02_FromHashtableToCHM;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
public class T02_CopyOnWriteList {
public static void main(String[] args) {
//new ArrayList<>(); //这个会出并发问题!
//new Vector();
List<String> lists = new CopyOnWriteArrayList<>();
Random r = new Random();
Thread[] ths = new Thread[100];
for(int i=0; i<ths.length; i++) {
Runnable task = new Runnable() {
@Override
public void run() {
for(int i=0; i<1000; i++) lists.add("a" + r.nextInt(10000));
}
};
ths[i] = new Thread(task);
}
runAndComputeTime(ths);
System.out.println(lists.size());
}
static void runAndComputeTime(Thread[] ths) {
long s1 = System.currentTimeMillis();
Arrays.asList(ths).forEach(t->t.start());
Arrays.asList(ths).forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long s2 = System.currentTimeMillis();
System.out.println(s2 - s1);
}
}
CopyOnWrite读的时候不加锁,
写的时候会在原来的基础上拷贝一个,
适合在读比较多写比较少的情况下使用
4. Queue
友好的API
而且这几个对于BlockingQueue来说也确实是线程安全的一个操作
- offer
- poll
- peek
offer
- 我们读一下这个offer的概念,offer是往里头添加,加进去没加进去它会给你一个布尔类型的返回值,和原来的add是什么区别呢,add如果加不进去了是会抛异常的。所以一般的情况下我们用的最多的Queue里面都用offer,它会给你一个返回值
poll
- poll是取并且remove掉
peek
- peek的概念是去取并不是让你remove掉
面试题:Queue和List的区别到底在哪里
- 回到那个面试经常被问到的问题,Queue和List的区别到底在哪里,主要就在这里,添加了offer、peek、poll、put、take这些个对线程友好的或者阻塞,或者等待方法。
1. ConcurrentLinkedQueue
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class T04_ConcurrentQueue {
public static void main(String[] args) {
Queue<String> strs = new ConcurrentLinkedQueue<>();
for(int i=0; i<10; i++) {
strs.offer("a" + i); //add
}
System.out.println(strs);
System.out.println(strs.size());
System.out.println(strs.poll());
System.out.println(strs.size());
System.out.println(strs.peek());
System.out.println(strs.size());
}
}
BlockingQueue----阻塞队列
- BlockingQueue的概念重点是在Blocking上,Blocking阻塞,Queue队列,是阻塞队列。他提供了一系列的方法,我们可以在这些方法的基础之上做到让线程实现自动的阻塞。
BlockingQueue在Queue的基础上又添加了两个方法
这两个方法一个叫put,一个叫take。这两个方法是真真正正的实现了阻塞
- put
put往里装如果满了的话我这个线程会阻塞住 - take
take往外取如果空了的话线程会阻塞住
2. LinkedBlockingQueue-----无界队列
- LinkedBlockingQueue,用链表实现的BlockingQueue,是一个无界队列。就是它可以一直装到你内存满了为止,一直添加。
生产者消费者
package c_023_02_FromHashtableToCHM;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class T05_LinkedBlockingQueue {
static BlockingQueue<String> strs = new LinkedBlockingQueue<>();
static Random r = new Random();
public static void main(String[] args) {
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
strs.put("a" + i); //如果满了,就会等待
TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "p1").start();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
for (;;) {
try {
System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就会等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "c" + i).start();
}
}
}
- 来看一下这个小程序,这么一些线程,第一个线程是我往里头加内容,加put。put往里装如果满了的话我这个线程会阻塞住,take往外取如果空了的话线程会阻塞住。
- 所以这个BlockingQueue就实现了生产者消费者里面的那个容器。这个小程序是往里面装了100个字符串,a开头i结尾,每装一个的时候睡1秒钟。
- 然后,后面又启动了5个线程不断的从里面take,空了我就等着,什么时候新加了我就马上给它取出来。这是BlockingQueue和Queue的一个基本的概念。
3. ArrayBlockingQueue----有界队列
- ArrayBlockingQueue是有界的,你可以指定它一个固定的值10,它容器就是10,那么当你往里面扔容器的时候,一旦他满了这个put方法就会阻塞住。然后你可以看看用add方法满了之后他会报异常。offer用返回值来判断到底加没加成功,
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class T06_ArrayBlockingQueue {
static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);
static Random r = new Random();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
strs.put("a" + i);
}
//strs.put("aaa"); //满了就会等待,程序阻塞
//strs.add("aaa");
//strs.offer("aaa");
strs.offer("aaa", 1, TimeUnit.SECONDS);
System.out.println(strs);
}
}
offer还有另外一个写法你可以指定一个时间尝试着往里面加1秒钟,1秒钟之后如果加不进去它就返回了。
strs.offer(“aaa”, 1, TimeUnit.SECONDS);
特殊的Queue
- 我们来看几个比较特殊的Queue,这几个Queue是BlockingQueue,全是阻塞的,记住这点。这几种Queue都有特殊的用途
4. DelayQueue
DelayQueue可以实现在时间上的排序
DelayQueue可以实现在时间上的排序,这个DelayQueue能实现按照在里面等待的时间来进行排序。这里我们new了一个DelayQueue,他是BlockingQueue的一种也是用于阻塞的队列,
- 这个阻塞队列装任务的时候要求你必须实现Delayed接口,Delayed往后拖延推迟,Delayed需要做一个比较compareTo,最后这个队列的实现,这个时间等待越短的就会有优先的得到运行,所以你需要做一个比较 ,这里面他就有一个排序了,这个排序是按时间来排的,所以去做好,哪个时间返回什么样的值,不同的内容比较的时候可以按照时间来排序。
- 总而言之,你要实现Comparable接口重写 compareTo方法来确定你这个任务之间是怎么排序的。getDelay去拿到你Delay多长时间了。往里头装任务的时候首先拿到当前时间,在当前时间的基础之上指定在多长时间之后这个任务要运行,添加顺序参看代码,但是当我们去拿的时候按时间进行排序(按紧迫程度进行排序)。
DelayQueue就是按照时间进行是任务调度。
package c_023_02_FromHashtableToCHM;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class T07_DelayQueue {
static BlockingQueue<MyTask> tasks = new DelayQueue<>();
static Random r = new Random();
static class MyTask implements Delayed {
String name;
long runningTime;
MyTask(String name, long rt) {
this.name = name;
this.runningTime = rt;
}
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
return -1;
else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
return 1;
else
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public String toString() {
return name + " " + runningTime;
}
}
public static void main(String[] args) throws InterruptedException {
long now = System.currentTimeMillis();
MyTask t1 = new MyTask("t1", now + 1000);
MyTask t2 = new MyTask("t2", now + 2000);
MyTask t3 = new MyTask("t3", now + 1500);
MyTask t4 = new MyTask("t4", now + 2500);
MyTask t5 = new MyTask("t5", now + 500);
tasks.put(t1);
tasks.put(t2);
tasks.put(t3);
tasks.put(t4);
tasks.put(t5);
System.out.println(tasks);
for (int i = 0; i < 5; i++) {
System.out.println(tasks.take());
}
}
}
DelayQueue本质上用的是一个PriorityQueue
5. PriorityQueue----优先队列
- PriorityQueue是从AbstractQueue继承的。PriorityQueue特点是它内部你往里装的时候并不是按顺序往里装的,而是内部进行了一个排序。按照优先级,最小的优先。它内部实现的结构是一个二叉树,这个二叉树可以认为是堆排序里面的那个最小堆值排在最上面。
线性表–08—优先队列
import java.util.PriorityQueue;
public class T07_01_PriorityQueque {
public static void main(String[] args) {
PriorityQueue<String> q = new PriorityQueue<>();
q<以上是关于提升--13---ConcurrentHashMapConcurrentSkipListMap CopyOnWrite各种Queue的主要内容,如果未能解决你的问题,请参考以下文章