Java的并发神器concurrent包详解

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java的并发神器concurrent包详解相关的知识,希望对你有一定的参考价值。

 

在JDK 1.5之前,提到并发,java程序员们一般想到的是wait()、notify()、Synchronized关键字等,但是并发除了要考虑竞态资源、死锁、资源公平性等问题,往往还需要考虑性能问题,在一些业务场景往往还会比较复杂,这些都给java coder们造成不小的难题。JDK 1.5的concurrent包帮我们解决了不少问题。

 

Concurrent包中包含了几个比较常用的并发模块,这个系列,LZ就和大家一起来学习各个模块,Let’s Go!

 

一、线程池的基本用法

一般并发包里有三个常用的线程池实例化方法,在Executors这个工厂类中。

  • ·newFixedThreadPool(int size):

创建一个可重用固定线程数的线程池,以共享的无界队列(LinkedBlockingQueue)方式来运行这些线程。任何时刻最多只有size大小的线程在执行各自任务。当线程池中所有线程都在运行,后续新来的线程将在无界队列中等待,直到有空闲线程为止。如果有线程在执行任务期间终止退出,一个新的线程会替代原先的线程继续后面任务的执行。线程池中的线程会一直存在,除非显示的关闭某个线程。

 

ExecuterService fixedPool = Executors. newFixedThreadPool(5); //创建一个固定包含5个线程的线程池

 

  • ·newCachedThreadPool()

创建一个可以根据需要创建线程的线程池,当一个任务过来,如果线程池中有空闲可用线程,先用空闲线程,没有就创建一个新线程。这种线程池对于执行short-lived asynchronous tasks(短期异步任务)通常会提高性能。默认空闲60s的线程会从线程池中移除,如果长时间空闲的话,这个线程池是不会占用任何资源的。但是以我个人的经验,这种线程池要慎用,用不好经常会造成堆外内存溢出,从而造成应用反应慢,最后导致宕机。

 

ExecuterService fixedPool = Executors. newCachedThreadPool();

 

  • ·newSingleThreadExecutor()

创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。可以保证顺序执行各个任务。有人说这不就是newFixedThreadPool(1)么,错矣。看源码就知道外部还包了一层FinalizableDelegatedExecutorService,这个是Executors的一个静态内部类,这个内部类继承了DelegatedExecutorService,返回的这个ExecutorService实现只包含ExecutorService接口定义的一些方法,不同于newFixedThreadPool有自己扩展的方法。源码注释的区别是:newSingleThreadExecutor可以保证使用其他线程时,返回的Executor不用重新配置。

ExecuterService fixedPool = Executors. newSingleThreadExecutor();

 

 

  • ·newScheduledThreadPool(int size)

创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。这是个无界大小的线程池。其中主要的执行方法是scheduleAtFixedRate和scheduleWithFixedDelay方法,最终的核心方法是delayedExecute,这两个方法有什么区别呢,简单来说,scheduleAtFixedRate是以任务执行时间开始为延迟时间起点的,scheduleWithFixedDelay是以任务执行结束时间为延时时间起点的。

 

 

接下来对并发包里面常用的一些类进行使用解析

 

1)  信号量 Semaphore:

一个计数信号量,维护了一个许可集。通俗的讲,就是对有限共享资源记录使用情况。

 

举个日常生活中的例子好了:有20个人去登记信息表格,但是只有3只笔,一个人用完之后给下面一个人用,直到所有人都用完。

 

没有并发包之前,我们是怎么设计的呢?

 

定义一个变量a,用synchronized修饰,值为3,新建20个线程模拟20个人访问,每个人用笔之前,先看是否有空余的笔,如果有,就使用,并将变量a减少1,使用完成之后,再放回去,即将变量a加1,各个线程之间对于变量a的访问需要互斥进行。Synchronized是一个重量级的同步操作,涉及到用户态与核心态的切换,所以性能一般。

 

让我们再看看信号量是怎么实现的,Semaphore实现的思想跟上面差不多。只不过Semaphore已经帮我们做了同步处理,有两个关键的方法:

  • ·acquire() 获取一个信号量许可
  • ·release() 释放一个信号量许可

此外还有其他一些方法:availablePermits() 返回信号量可用的许可数,talk is cheap show me the code:

 

public class SemaphoreTest extends Thread{

  

   private String name;

   private Semaphore sh;

  

   public SemaphoreTest(String name,Semaphore sh){

      this.name = name;

      this.sh = sh;

   }

 

   public void run(){

      if(sh.availablePermits()>0){

         System.out.println("有笔");

      }else{

         System.out.println("笔没了,等等");

      }

      try {

         sh.acquire();//信号量减1

         System.out.println(this.name+"号在用笔");

         Thread.sleep((long) (Math.random()*1000));

         sh.release();

         System.out.println(this.name+"号用完了");

      } catch (Exception e) {

         // TODO Auto-generated catch block

         e.printStackTrace();

      }

   }

  

   public static void main(String[] args) {

      ExecutorService es = Executors.newCachedThreadPool();

      Semaphore sh = new Semaphore(3);

      for(int i=0;i<20;i++){

         es.submit(new SemaphoreTest(i+"",sh));

      }

      es.shutdown();

   }

  

}

 

上述代码直接运行即可。

 

 

这个信号量是如何同步资源的,就需要读读jdk相关源码了。核心类是AbstractQueuedSynchronizer,该类实现了一个FIFO的列表,列表中Node表示列表节点,该Node有prev、next、Thread等属性,表示前节点后节点以及线程等。并发包中每个需要实现个性化同步机制的都要扩展该类,以实现不同的同步功能。比如Semaphore、countDownLatch、CyclicBarrier等,具体原理要分析源码,篇幅较长,这里不做展开,有兴趣的可以研究一下,对volatile关键字、CAS等都会有更深的理解。

 

 

2)  CountDownLatch

用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。

比较重要的2个方法:countDown()和await()

 

下面来个常见的例子:导游带了20人团到了饭店吃饭,要等到20个人到齐了,才开始吃饭,还要等到20人都吃完了,才可以继续下个景点。

来看看不用并发包你会怎么实现,首先会设置20个人到来以及吃完的标志,每个线程过来更改自己的标志,主程序for或者while不停的循环监听,制止全部到来以及吃完才继续。如果是100人1000人呢,未免臃肿。

 

CountDownLatch的实现:

public class CountDownLautchTest {

  

   public static void main(String[] args) {

     

      ExecutorService es = Executors.newFixedThreadPool(20);

      final CountDownLatch endFlag = new CountDownLatch(20);

      final CountDownLatch startFlag = new CountDownLatch(1);

      final CountDownLatch comeFlag = new CountDownLatch(20);

 

 

      for(int i=0;i<20;i++){

         final int j = i + 1;

         Runnable person = new Runnable(){

            @Override

            public void run() {

                System.out.println(j+"号游客来了");

                comeFlag.countDown();

                try {

                   startFlag.await();

                   Thread.sleep(1000);

                } catch (InterruptedException e) {

                   // TODO Auto-generated catch block

                   e.printStackTrace();

                } finally {

                   System.out.println(j+"号吃完了");

                   endFlag.countDown();

                }

            }

           

         };

         es.submit(person);

      }

      try {

         comeFlag.await();   //保证所有人都到齐了

         System.out.println("人都齐了,大家一起吃饭");

         startFlag.countDown();//开吃

         endFlag.await();//等待所有人都吃完了

         System.out.println("全都吃完了,继续下个景点");

      } catch (InterruptedException e) {

         // TODO Auto-generated catch block

         e.printStackTrace();

      }

      es.shutdown();

   }

 

}

 

CountDownLatch是一种不可逆的降序计数操作,所以上述代码里面定义了2个分别表示等候以及吃饭的标志。如果需要重复计数,就需要用到下面这个类:CyclicBarrier。

 

3) CyclicBarrier

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

 

还是上述的例子吧,再加一点场景,吃完饭还要游玩,游玩结束返程回家。让我们来看看CyclicBarrier是如何实现的,上代码:

 

public class CyclicBarrierTest {

  

   public static void main(String[] args) {

      final CyclicBarrier barrier = new CyclicBarrier(20);

      final ExecutorService es = Executors.newFixedThreadPool(20);

      System.out.println("****等人齐****");

      for(int i=0;i<20;i++){

         final int j = i+1;

         Runnable person = new Runnable(){

            @Override

            public void run() {

                try {

                   System.out.println(j+"号来了");

                   if(barrier.await()==0){

                      System.out.println("****人都到齐了****");

                   }

                   System.out.println(j+"号开吃");

                   if(barrier.await()==0){

                      System.out.println("****吃完了出发****");

                   }

                   System.out.println(j+"号玩好了");

                   if(barrier.await()==0){

                      System.out.println("****游玩结束回家****");

                   }

                } catch (Exception e) {

                   // TODO Auto-generated catch block

                   e.printStackTrace();

                }

            }

           

         };

         es.submit(person);

      }

      es.shutdown();

   }

  

}

 

代码直接运行即可,应该比较好懂吧,这里有个小窍门,代码中if(barrier.await()==0)这个条件,有点类似于CountDownLatch的await往下执行的条件了,每次判断都会减1,减到0,即完成所有线程的等待,继续下面的操作。

 

结语

         关于并发包一些常用的类就介绍到这里吧,工作中具体怎么使用要具体分析各个业务场景,选择合适的方法,随机应变,触类旁通吧,后续会介绍一下Lock源码以及实现,涉及AQS的原理等。

 

未完,待续……

以上是关于Java的并发神器concurrent包详解的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程系列 concurrent包概览

java并发包java.util.concurrent详解

Java 并发之Concurrent 包综述

java 并发(concurrent)包源码分析

利用java concurrent 包实现日志写数据库的并发处理

深入理解java:2.3.1. 并发编程concurrent包 之Atomic原子操作