阻塞队列和线程池
Posted aiguona
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了阻塞队列和线程池相关的知识,希望对你有一定的参考价值。
一、阻塞队列
1.介绍
阻塞队列会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。
2.实现
ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。
LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。
PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。
DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
二、线程池
1.介绍
使用线程池的好处有:1.创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率 2.线程并发数量过多,抢占系统资源从而导致阻塞 3.线程池可以对线程进行一些简单的管理
ThreadPoolExecutor类是线程池中最核心的一个类, 在ThreadPoolExecutor类中提供了四个构造方法,其中 三个构造器都是调用的第四个构造器进行的初始化工作。
2.构造方法为:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
七个参数的含义:
* corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
* maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
* keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
* unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒 |
* workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue; |
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
* threadFactory:线程工厂,主要用来创建线程;
* handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 |
3.线程池的执行策略
* 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
* 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
* 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
* 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
4..测试
public class Test { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 5 , 10 , 200 , TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>( 5 )); for ( int i= 0 ;i< 15 ;i++){ MyTask myTask = new MyTask(i); executor.execute(myTask); System.out.println( "线程池中线程数目:" +executor.getPoolSize()+ ",队列中等待执行的任务数目:" + executor.getQueue().size()+ ",已执行玩别的任务数目:" +executor.getCompletedTaskCount()); } executor.shutdown(); } } class MyTask implements Runnable { private int taskNum; public MyTask( int num) { this .taskNum = num; } @Override public void run() { System.out.println( "正在执行task " +taskNum); try { Thread.currentThread().sleep( 4000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println( "task " +taskNum+ "执行完毕" ); } } |
这个程序中当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。
5.使用Executors类中提供的几个静态方法来创建线程池
Executors.newCachedThreadPool(); //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE Executors.newSingleThreadExecutor(); //创建容量为1的缓冲池 Executors.newFixedThreadPool( int ); //创建固定容量大小的缓冲池(定长) |
newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;1. 有且仅有一个工作线程执行任务2. 所有任务按照指定顺序执行,即遵循队列的入队出队规则
newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。
另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。
三、Callable
创建线程在上一篇wiki中说明了两种方法,分别是继承Thread,重写run方法和实现Runnable接口,重新run方法。现在介绍第三种实现Callable接口,重写call方法。
区别:
-
Callable可以在任务结束的时候提供一个返回值Future对象,Runnable无法提供这个功能
-
Callable的call方法分可以抛出异常,而Runnable的run方法不能抛出异常。
-
Callable规定的方法是call(),而Runnable规定的方法是run().
测试:
/* * FileName: TestCallable * Author: aiguo.sun * Date: 2019/3/28 20:06 * Description: 测试callable方法 */ package JavaTest; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; /** * 一、创建执行线程的方式三:实现 Callable 接口。 相较于实现 Runnable 接口的方式,方法可以有返回值,并且可以抛出异常。 * * 二、执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。 FutureTask 是 Future 接口的实现类 */ public class TestCallable { public static void main(String[] args) { ThreadDemo td = new ThreadDemo(); //1.执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。 FutureTask<Integer> result = new FutureTask<>(td); new Thread(result).start(); //2.接收线程运算后的结果 try { //判断是否完成 if (!result.isDone()) { System.out.println( "-sorry-----------------------" );; } //FutureTask 可用于 闭锁 类似于CountDownLatch的作用,在所有的线程没有执行完成之后这里是不会执行的 Integer sum = result.get(); System.out.println(sum); System.out.println( "------------------------------------" ); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } } class ThreadDemo implements Callable<Integer> { @Override public Integer call() throws Exception { int sum = 0 ; for ( int i = 0 ; i <= 100000000 ; i++) { sum += i; } return sum; } }
|
以上是关于阻塞队列和线程池的主要内容,如果未能解决你的问题,请参考以下文章