提升--13---ConcurrentHashMapConcurrentSkipListMap CopyOnWrite各种Queue

Posted 高高for 循环

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了提升--13---ConcurrentHashMapConcurrentSkipListMap CopyOnWrite各种Queue相关的知识,希望对你有一定的参考价值。


1. ConcurrentHashMap

主要就是为了应对hashmap在并发环境下不安全而诞生的,ConcurrentHashMap的设计与实现非常精巧,大量的利用了volatile,final,CAS等lock-free技术来减少锁竞争对于性能的影响。

  • 我们都知道Map一般都是数组+链表结构(JDK1.8该为数组+红黑树)。


ConcurrentHashMap避免了对全局加锁改成了局部加锁操作,这样就极大地提高了并发环境下的操作速度,由于ConcurrentHashMap在JDK1.7和1.8中的实现非常不同,接下来我们谈谈JDK在1.7和1.8中的区别。

JDK1.7版本的CurrentHashMap的实现原理

  • 在JDK1.7中ConcurrentHashMap采用了数组+Segment+分段锁的方式实现。

Segment(分段锁)-减少锁的粒度

ConcurrentHashMap中的分段锁称为Segment,它即类似于HashMap的结构,即内部拥有一个Entry数组,数组中的每个元素又是一个链表,同时又是一个ReentrantLock(Segment继承了ReentrantLock)。

内部结构

ConcurrentHashMap使用分段锁技术,将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问,能够实现真正的并发访问。如下图是ConcurrentHashMap的内部结构图:

从上面的结构我们可以了解到,ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作。

  • 第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部。

该结构的优劣势

  • 坏处
    是这一种结构的带来的副作用是Hash的过程要比普通的HashMap要长。

  • 好处
    是写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其他的Segment,这样,在最理想的情况下,ConcurrentHashMap可以最高同时支持Segment数量大小的写操作(刚好这些写操作都非常平均地分布在所有的Segment上)。

所以,通过这一种结构,ConcurrentHashMap的并发能力可以大大的提高。

JDK1.8版本的CurrentHashMap

JDK8中ConcurrentHashMap参考了JDK8 HashMap的实现,采用了数组+链表+红黑树的实现方式来设计,采用CAS+Synchronized保证线程安全。内部大量采用CAS操作,这里我简要介绍下CAS。

CAS

volatile

  • volatile是轻量级同步机制。在访问volatile变量时不会执行加锁操作,因此也就不会使执行线程阻塞,是一种比synchronized关键字更轻量级的同步机制。

JDK8中彻底放弃了Segment转而采用的是Node,其设计思想也不再是JDK1.7中的分段锁思想

  • Node:保存key,value及key的hash值的数据结构。其中value和next都用volatile修饰,保证并发的可见性。

Java8 ConcurrentHashMap结构基本上和Java8的HashMap一样,不过保证线程安全性。

小结

Map<String, String> map = new ConcurrentHashMap<>();

其实可以看出JDK1.8版本的ConcurrentHashMap的数据结构已经接近HashMap,相对而言,ConcurrentHashMap只是增加了同步的操作来控制并发,从JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+红黑树

  1. 数据结构:取消了Segment分段锁的数据结构,取而代之的是数组+链表+红黑树的结构。
  2. 保证线程安全机制:JDK1.7采用segment的分段锁机制实现线程安全,其中segment继承自ReentrantLock。JDK1.8采用CAS+Synchronized保证线程安全。
  3. 锁的粒度:原来是对需要进行数据操作的Segment加锁,现调整为对每个数组元素加锁(Node)。
  4. 链表转化为红黑树:定位结点的hash算法简化会带来弊端,Hash冲突加剧,因此在链表节点数量大于8时,会将链表转化为红黑树进行存储。
  5. 查询时间复杂度:从原来的遍历链表O(n),变成遍历红黑树O(logN)。

2. ConcurrentSkipListMap

ConcurrentSkipListMap通过跳表来实现的高并发容器

并且这个Map是有排序的

跳表

  1. 跳表是什么样的结构呢?底层本身存储的元素一个链表,它是排好顺序的,
  2. 大家知道当一个链表排好顺序的时候往里插入是特别困难的,查找的时候也特别麻烦,因为你得从头去遍历查找这个元素到底在哪里,所以就出现了这个跳表的结构,底层是一个链表,链表查找的时候比较困难怎么办,那么我们在这些链表的基础上在拿出一些关键元素来,在上面做一层,那这个关键元素的这一层也是一个链表,那这个数量特别大的话在这个基础之上在拿一层出来再做一个链表,每层链表的数据越来越少,而且它是分层,
  3. 在我们查找的时候从顶层往下开始查找,所以呢,查找容易了很多,同时它无锁的实现难度比TreeMap又容易很多,因此在JUC里面提供了ConcurrentSkipListMap这个类。

Map<String, String> map = new ConcurrentSkipListMap<>(); //高并发并且排序

3. CopyOnWrite-----写时复制

再来说一个在并发的时候经常使用的一个类,这个类叫CopyOnWrite的意思叫写时复制。

  • CopyOnWriteList、
  • CopyOnWriteSet

原理

这个CopyOnWrite解释一下,你通过这个名字进行分析一下,当Write的时候我们要进行复制,写时复制,写的时候进行复制。这个原理非常简单,当我们需要往里面加元素的时候你把里面的元素得复制出来。在很多情况下,写的时候特别少,读的时候很多。在这个时候就可以考虑CopyOnWrite这种方式来提高效率,

  • CopyOnWrite为什么会提高效率呢,是因为我读的时候不加锁,大家知道我Vector写的时候加锁,读的时候也加锁。那么用CopyOnWriteList的时候我读的时候不加锁,
  • 写的时候会在原来的基础上拷贝一个,拷贝的时候扩展出一个新元素来,然后把你新添加的这个扔到这个元素扔到最后这个位置上,于此同时把指向老的容器的一个引用指向新的,这个写法就是写时复制。
  • 读比较多写比较少的情况下使用CopyOnWrite

源码

  • 读 get

  • 写 set

案例:

package c_023_02_FromHashtableToCHM;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;

public class T02_CopyOnWriteList {

    public static void main(String[] args) {
        //new ArrayList<>(); //这个会出并发问题!
        //new Vector();
        List<String> lists = new CopyOnWriteArrayList<>();

        Random r = new Random();
        Thread[] ths = new Thread[100];

        for(int i=0; i<ths.length; i++) {
            Runnable task = new Runnable() {

                @Override
                public void run() {
                    for(int i=0; i<1000; i++) lists.add("a" + r.nextInt(10000));
                }

            };
            ths[i] = new Thread(task);
        }

        runAndComputeTime(ths);
        System.out.println(lists.size());
    }

    static void runAndComputeTime(Thread[] ths) {
        long s1 = System.currentTimeMillis();
        Arrays.asList(ths).forEach(t->t.start());
        Arrays.asList(ths).forEach(t->{
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long s2 = System.currentTimeMillis();
        System.out.println(s2 - s1);

    }
}


CopyOnWrite读的时候不加锁,

写的时候会在原来的基础上拷贝一个,

适合在读比较多写比较少的情况下使用

4. Queue

友好的API

而且这几个对于BlockingQueue来说也确实是线程安全的一个操作

  • offer
  • poll
  • peek

offer

  • 我们读一下这个offer的概念,offer是往里头添加,加进去没加进去它会给你一个布尔类型的返回值,和原来的add是什么区别呢,add如果加不进去了是会抛异常的。所以一般的情况下我们用的最多的Queue里面都用offer,它会给你一个返回值

poll

  • poll是取并且remove掉

peek

  • peek的概念是去取并不是让你remove掉

面试题:Queue和List的区别到底在哪里

  • 回到那个面试经常被问到的问题,Queue和List的区别到底在哪里,主要就在这里,添加了offer、peek、poll、put、take这些个对线程友好的或者阻塞,或者等待方法。

1. ConcurrentLinkedQueue

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class T04_ConcurrentQueue {
   public static void main(String[] args) {
      Queue<String> strs = new ConcurrentLinkedQueue<>();
      
      for(int i=0; i<10; i++) {
         strs.offer("a" + i);  //add
      }
      
      System.out.println(strs);
      
      System.out.println(strs.size());
      
      System.out.println(strs.poll());
      System.out.println(strs.size());
      
      System.out.println(strs.peek());
      System.out.println(strs.size());
      

   }
}

BlockingQueue----阻塞队列

  • BlockingQueue的概念重点是在Blocking上,Blocking阻塞,Queue队列,是阻塞队列。他提供了一系列的方法,我们可以在这些方法的基础之上做到让线程实现自动的阻塞

BlockingQueue在Queue的基础上又添加了两个方法

这两个方法一个叫put,一个叫take。这两个方法是真真正正的实现了阻塞

  • put
    put往里装如果满了的话我这个线程会阻塞住
  • take
    take往外取如果空了的话线程会阻塞住

2. LinkedBlockingQueue-----无界队列

  • LinkedBlockingQueue,用链表实现的BlockingQueue,是一个无界队列。就是它可以一直装到你内存满了为止,一直添加。

生产者消费者

package c_023_02_FromHashtableToCHM;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class T05_LinkedBlockingQueue {

    static BlockingQueue<String> strs = new LinkedBlockingQueue<>();

    static Random r = new Random();

    public static void main(String[] args) {
        new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                try {
                    strs.put("a" + i); //如果满了,就会等待
                    TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "p1").start();

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                for (;;) {
                    try {
                        System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就会等待
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "c" + i).start();

        }
    }
}

  • 来看一下这个小程序,这么一些线程,第一个线程是我往里头加内容,加put。put往里装如果满了的话我这个线程会阻塞住,take往外取如果空了的话线程会阻塞住
  • 所以这个BlockingQueue就实现了生产者消费者里面的那个容器。这个小程序是往里面装了100个字符串,a开头i结尾,每装一个的时候睡1秒钟。
  • 然后,后面又启动了5个线程不断的从里面take,空了我就等着,什么时候新加了我就马上给它取出来。这是BlockingQueue和Queue的一个基本的概念。

3. ArrayBlockingQueue----有界队列

  • ArrayBlockingQueue是有界的,你可以指定它一个固定的值10,它容器就是10,那么当你往里面扔容器的时候,一旦他满了这个put方法就会阻塞住。然后你可以看看用add方法满了之后他会报异常。offer用返回值来判断到底加没加成功,
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class T06_ArrayBlockingQueue {

   static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);

   static Random r = new Random();

   public static void main(String[] args) throws InterruptedException {
      for (int i = 0; i < 10; i++) {
         strs.put("a" + i);
      }
      
      //strs.put("aaa"); //满了就会等待,程序阻塞
      //strs.add("aaa");
      //strs.offer("aaa");
      strs.offer("aaa", 1, TimeUnit.SECONDS);
      
      System.out.println(strs);
   }
}

offer还有另外一个写法你可以指定一个时间尝试着往里面加1秒钟,1秒钟之后如果加不进去它就返回了。

strs.offer(“aaa”, 1, TimeUnit.SECONDS);

特殊的Queue

  • 我们来看几个比较特殊的Queue,这几个Queue是BlockingQueue,全是阻塞的,记住这点。这几种Queue都有特殊的用途

4. DelayQueue

DelayQueue可以实现在时间上的排序

DelayQueue可以实现在时间上的排序,这个DelayQueue能实现按照在里面等待的时间来进行排序。这里我们new了一个DelayQueue,他是BlockingQueue的一种也是用于阻塞的队列,

  • 这个阻塞队列装任务的时候要求你必须实现Delayed接口,Delayed往后拖延推迟,Delayed需要做一个比较compareTo,最后这个队列的实现,这个时间等待越短的就会有优先的得到运行,所以你需要做一个比较 ,这里面他就有一个排序了,这个排序是按时间来排的,所以去做好,哪个时间返回什么样的值,不同的内容比较的时候可以按照时间来排序。
  • 总而言之,你要实现Comparable接口重写 compareTo方法来确定你这个任务之间是怎么排序的。getDelay去拿到你Delay多长时间了。往里头装任务的时候首先拿到当前时间,在当前时间的基础之上指定在多长时间之后这个任务要运行,添加顺序参看代码,但是当我们去拿的时候按时间进行排序(按紧迫程度进行排序)。

DelayQueue就是按照时间进行是任务调度。

package c_023_02_FromHashtableToCHM;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class T07_DelayQueue {

    static BlockingQueue<MyTask> tasks = new DelayQueue<>();

    static Random r = new Random();

    static class MyTask implements Delayed {
        String name;
        long runningTime;

        MyTask(String name, long rt) {
            this.name = name;
            this.runningTime = rt;
        }

        @Override
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
                return -1;
            else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
                return 1;
            else
                return 0;
        }

        @Override
        public long getDelay(TimeUnit unit) {

            return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }


        @Override
        public String toString() {
            return name + " " + runningTime;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long now = System.currentTimeMillis();
        MyTask t1 = new MyTask("t1", now + 1000);
        MyTask t2 = new MyTask("t2", now + 2000);
        MyTask t3 = new MyTask("t3", now + 1500);
        MyTask t4 = new MyTask("t4", now + 2500);
        MyTask t5 = new MyTask("t5", now + 500);

        tasks.put(t1);
        tasks.put(t2);
        tasks.put(t3);
        tasks.put(t4);
        tasks.put(t5);

        System.out.println(tasks);

        for (int i = 0; i < 5; i++) {
            System.out.println(tasks.take());
        }
    }
}

DelayQueue本质上用的是一个PriorityQueue

5. PriorityQueue----优先队列

  • PriorityQueue是从AbstractQueue继承的。PriorityQueue特点是它内部你往里装的时候并不是按顺序往里装的,而是内部进行了一个排序。按照优先级,最小的优先。它内部实现的结构是一个二叉树,这个二叉树可以认为是堆排序里面的那个最小堆值排在最上面

线性表–08—优先队列


import java.util.PriorityQueue;

public class T07_01_PriorityQueque {
    public static void main(String[] args) {
        PriorityQueue<String> q = new PriorityQueue<>();

        q<

以上是关于提升--13---ConcurrentHashMapConcurrentSkipListMap CopyOnWrite各种Queue的主要内容,如果未能解决你的问题,请参考以下文章

一个例子,变量提升和函数提升就是这么简单!

如何理解函数提升

函数提升优于变量提升?

JavaScript系列文章:变量提升和函数提升

变量提升与函数提升

Solr 索引时间提升 VS 查询时间提升?