Java并发编程:线程池的使用
Posted 夜尽天明00
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程:线程池的使用相关的知识,希望对你有一定的参考价值。
Java并发编程:线程池的使用
原文链接http://www.cnblogs.com/dolphin0520/p/3932921.html
在前面的文章中,我们使用线程的使用就去创建一个线程,这样实现起来非常简便,但是有一个问题:
如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁的创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
那么有没有一种方法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以执行其他的任务?
在Java中,可以通过线程池来达到这样的效果。今天我们就来详细讲解一下Java线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例。最后讨论了一下如何合理配置线程池的大小。
以下是本文的目录大纲
一.Java中的ThreadPoolExecutor类
二.深入剖析线程池实现原理
三.使用示例
-
Java中的ThreadPoolExecutor类
java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。
在ThreadPoolExecutor类中提供了四个构造方法:
// Public constructors and methods
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } |
从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。 下面解释下一下构造器中各个参数的含义:
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 |
在ThreadPoolExecutor类中有几个非常重要的方法: execute() submit() shutdown() shutdownNow()
|
-
深入剖析线程池实现原理
-
线程池状态
在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3; |
runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;
下面的几个static final变量表示runState可能的几个取值。
-
当创建线程池后,初始时,线程池处于RUNNING状态;
-
如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
-
如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
-
当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
-
任务的执行
在了解将任务提交给线程池到任务执行完毕整个过程之前,我们先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:
private final BlockingQueue<Runnable> workQueue; //任务缓存队列,用来存放等待执行的任务 private final ReentrantLock mainLock = new ReentrantLock(); //线程池的主要状态锁,对线程池状态(比如线程池的大小//、runState等)的改变都要使用这个锁 private final HashSet<Worker> workers = new HashSet<Worker>(); //用来存放工作集 private volatile long keepAliveTime; //线程存货时间 private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间 private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列) private volatile int maximumPoolSize; //线程池最大能容忍的线程数 private volatile int poolSize; //线程池中当前的线程数 private volatile RejectedExecutionHandler handler; //任务拒绝策略 private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程 private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数 private long completedTaskCount; //用来记录已经执行完毕的任务个数 |
每个变量的作用都已经标明出来了,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。
corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小。举个简单的例子:
假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;然后就将任务也分配给这4个临时工人做;如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。
这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施
-
任务提交给线程池之后到被执行的整个过程
-
首先,要清楚corePoolSize和maximumPoolSize的含义;
-
其次,要知道Worker是用来起到什么作用的;
-
要知道任务提交给线程池之后的处理策略,这里总结一下主要有4点:
-
-
如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
-
如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
-
如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
-
如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
-
线程池的初始化
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。
在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:
prestartCoreThread():初始化一个核心线程;
prestartAllCoreThreads():初始化所有核心线程
public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); //注意传进去的参数是null }
public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null))//注意传进去的参数是null ++n; return n; } |
注意上面传进去的参数是null,根据第2小节的分析可知如果传进去的参数为null,则最后执行线程会阻塞在getTask方法中的
r = workQueue.take();
即等待任务队列中有任务。
-
任务缓存队列及排队策略
在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。
workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:
-
ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
-
LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
-
synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务
-
任务拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
-
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
-
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
-
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
-
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
-
线程池的关闭
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
-
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
-
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
-
线程池容量的动态调整
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
-
setCorePoolSize:设置核心池大小
-
setMaximumPoolSize:设置线程池最大能创建的线程数目大小
当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务
-
使用示例
public class TestThreadPool {
public static void main(String[] args) {
//corePoolSize:核心池的大小 5
//maxmumPoolSize:最大线程数
//200空闲时间
//TimeUnit.MILLISECONDS:时间单位
//创建一个阻塞队列,里面可以存储5个线程
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,200,TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
for(int i=0;i<15;i++) {
MyTask myTask = new MyTask(i);//创建一个线程
executor.execute(myTask);//将这个线程放到线程池中
System.out.println("线程池中线程数目"+executor.getPoolSize()+",队列中等待执行的任务数目"+
executor.getQueue().size()+",已经执行完的任务数目:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
}
class MyTask implements Runnable{
private int taskNum;
public MyTask(int num) {
this.taskNum=num;
}
@Override
public void run() {
System.out.println("正在执行task"+taskNum);
try {
Thread.currentThread();
Thread.sleep(4000);
}catch(InterruptedException e) {
e.printStackTrace();
}
Java并发编程:线程池的使用