线程池从入门到放弃
Posted java充电站
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池从入门到放弃相关的知识,希望对你有一定的参考价值。
王小二最近刚刚拿到了一家互联网公司的java研发的实习offer,激动的他很快便前往公司去实习了。
赵铁柱是王小二的上司,开工的第一天便给王小二安排了一个需求,要求他开发一个功能,每天在指定的时间点运行各种各样的任务。王小二灵机一动,立马就想到了线程池。
谷歌了一下,王小二直接将网上的代码copy到了公司的项目里面,然后提交转测。
结果没想到在压测环节便出现了异常情况,于是赵铁柱便找到了王小二询问原因,通过代码审查,看到这样一段代码:
ExecutorService executorService=Executors.newCachedThreadPool();
executorService.execute(task);
于是赵铁柱便开始对王小二进行了质问,为何要采用这样的写法,由于每个运行的任务都对系统本身有较大的的负载,所以导致了线程创建过多,耗尽cpu导致异常。
王小二作为新来的实习生,对线程池的内部的构造原理并不是很熟悉,才会犯下这样的错误,于是赵铁柱便开始耐心地给他讲解了线程池的原理。
常见的几种线程池类型
1.newCachedThreadPool
newCachedThreadPool是一种可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。这类线程池的底层源码为:
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
当使用了该类线程池的时候容易有几种情况发生:
由于设置的maximumPoolSize为Integer.MAX_VALUE,因此创建的线程数量几乎是没有上限的,存在一定的风险性。
keepAliveTime是指线程的生命周期,当一个线程创建了之后,这里默认设置为了60秒,如果60秒内该线程一直处于空闲状态的话,那么该线程就会被回收,减少资源的消耗。当然,回收之后如果又有新的任务要提交,那么还是可以重新创建线程的。
SynchronousQueue是一种不存储任何元素的堵塞队列,每一次执行插入操作的时候都需要等待到另一个线程的移除操作。这也就容易导致一个问题,如果加入的线程没有被消费,那么就会一直堵塞其中,导致后续的任务无法继续加入。
2.newFixedThreadPool
FixedThreadPool是一款比较好用而且较为优秀的线程池,在线程池进行初始化操作的时候,可以设定初始化线程池的线程个数。
下边我们来看看newFixedThreadPool的源代码部分:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
这里的两个corePoolSize和maximumPoolSize参数都设置为相同的nThreads形参值,然后keepAliveTime值设置为了0,说明一旦有多余的空闲线程就会立马进行回收操作,从而减少消耗。当然啦,这里采用的队列结构是LinkedBlockingQueue。
3.newSingleThreadExecutor
SingleThreadExecutor是一个单线程化的Executor,只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
相应的源码定义如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
4.newScheduleThreadPool
ScheduleThreadPool也是继承自ThreadPoolExecutor,主要的应用场景是执行一些定时任务。在应用方面有点类似于Timer,但是却与Timer有着本质上区别,ScheduleThreadPool更加的灵活更加的强大,Timer只是一个单线程在运行任务,如果运行期间出现了程序崩溃的情况,则会波及到后边所要执行的任务,而ScheduleThreadPool可以指定多个线程数目来执行任务,各个线程执行的任务相互独立,不会互相波及。
相应的源码定义如下:
Executors内部的定义:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
ScheduledThreadPoolExecutor内部的定义:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
最主要常用的线程池就是这几类了,但是在Executors的内部其实还隐藏了很多的细节内容。
在老大的简单介绍之后,王小二大概地对线程池有了个初步的了解。
我们从整个Execute的框构来看,Executor接口底下有着非常多的继承和实现。
而在ExecutorService里面则包含了非常重要的一系列具体方法:
1,execute(Runnable command):履行Ruannable类型的任务,
2,submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象
3,shutdown():在完成已提交的任务后封闭办事,不再接管新任务,
4,shutdownNow():停止所有正在履行的任务并封闭办事。
5,isTerminated():测试是否所有任务都履行完毕了。
6,isShutdown():测试是否该ExecutorService已被关闭。
线程池的重要属性
在ThreadPoolExecutor里面包含有多个参数,每个不同的参数都表示了不同的含义。我们来大致认识一下下列的这些属性含义:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
在这段代码里面,有个关键的变量叫做ctl ,该变量的主要作用是对线程池的整体运行状态和线程池中的worker线程数量进行控制的字段,这里我们可以理解为通过二进制的转换将线程池的运行状态和worker数量统一用一个数值变量来表示,该变量的高3位表示为当前线程池的状态,低的29位表示为当前worker线程的数量。
线程池的状态有哪些
一共有5种状态,代码的描述为:
RUNNING = 1 << COUNT_BITS; //高3位为111
SHUTDOWN = 0 << COUNT_BITS; //高3位为000
STOP = 1 << COUNT_BITS; //高3位为001
TIDYING = 2 << COUNT_BITS; //高3位为010
TERMINATED = 3 << COUNT_BITS; //高3位为011
1、RUNNING
(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
(02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!
2、 SHUTDOWN
(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
3、STOP
(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or
SHUTDOWN ) -> STOP。
4、TIDYING
(1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;
可以通过重载terminated()函数来实现。
(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
5、 TERMINATED
(1) 状态说明:线程池彻底终止,就变成TERMINATED状态。
(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。进入TERMINATED的条件如下:线程池不是RUNNING状态;线程池状态不是TIDYING状态或TERMINATED状态;如果线程池状态是SHUTDOWN并且workerQueue为空;workerCount为0;设置TIDYING状态成功。
整体的线程池生命周期
此时的王小二开始听得有点好奇了......
当worker线程忙不过来的时候该如何处理
我们从整体的运作流程来看看线程池的执行顺序:
首先我们通过execute或者submit的形式将任务加入到线程池里面,然后会有以下的几种情况:
1.corePool里面有空闲的核心线程可以执行该任务,则直接执行该任务
2.corePool已经满了,需要创建多的线程来执行任务,于是这个时候就会创建新的非核心线程来执行该任务。
3.当非核心线程+核心线程的数目达到了上线的时候,这个时候相关的任务就需要加入到相应的任务队列中了,然后等待线程去消费。
4.如果整个任务队列都满了的话,就会执行内置的RejectedExecutionHandler
RejectedExecutionHandler里面包含了四种类型策略:
1、AbortPolicy:直接抛出异常,默认策略;
2、CallerRunsPolicy:用调用者所在的线程来执行任务;
3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4、DiscardPolicy:直接丢弃任务;
源码分析
在执行execute操作的时候,我们深入源码来阅读:
jdk1.8的源码位置在ThreadPoolExecutor类的1332行
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//记录当前线程池的 工作状态(高3位)和worker线程数(后边29位)
int c = ctl.get();
//首先判断是否当前的worker线程数是否小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
//创建worker线程,并且添加任务给worker线程
//worker线程,包含有thread属性和firsttask属性,通常一个worker对应一个线程,但是不一定会有firstTask,因为它有可能从队列里面取任务。第二个参数是指是否创建为core线程,如果为false则表示创建的是非核心线程。
if (addWorker(command, true))
return;
// 由它可以获取到当前有效的线程数和线程池的状态
c = ctl.get();
}
// 当前线程池是否是running状态,如果是则加入任务到任务队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//双重判断,如果任务刚放入队列就关闭了线程池,那么这个时候需要清除掉之前加入的任务,并且抛出饱和异常
if (! isRunning(recheck) && remove(command))
reject(command);
//如果当前的核心worker线程数为0,那么就直接从队列里面去取任务并且创建非核心线程,有可能刚创建了线程,但是该线程执行完毕之后立马就挂了
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//创建非核心线程,并且绑定任务,如果失败则抛出饱和异常
else if (!addWorker(command, false))
reject(command);
}
这段代码里面主要包含了线程池运作的核心逻辑思路,从创建核心线程,到扩展非核心线程,到添加进入任务队列,再到饱和异常抛出,全都有涉及。
接下来我们来看worker的源码:
worker本身是ThreadPoolExecutor里面的一个私有类,该类继承了aqs同步队列器,同时实现了Runnable方法。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
// 当前线程完成的任务数量,日后可以通过查看该数目来统计该线程完成的任务数目
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
//在reetrenlock里面这个参数设置为0,但是这里设置为了-1,主要是和线程池的中断有关,后边在runWorker方法中有个unlock操作和这里有呼应。
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//注意这里的thread是指当前的worker类
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
//调用了runWorker函数,其实真正执行firstTask的run方法是在runWorker里面
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
//这里面的release方法包含有tryRelease的方法,所以上边的tryRelease进行了重写,特意将state设置为0。
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
Worker类
线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,请参见上方源码。
Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。
在 调 用 构 造 方 法 时 , 需 要 把 任 务 传 入 , 这 里 通 过getThreadFactory().newThread(this); 来新建一个线程, newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:
1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
2. 如果正在执行任务,则不应该中断线程;
3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务, 这时可以对该线程进行中断;
4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
5. 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize
这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。
addWorker方法
addWorker的主要工作是在线程池中创建一个新的线程并执行,firstTask参数 用
于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,代码如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取运行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 这个if判断
* 如果rs >= SHUTDOWN,则表示此时不再接收新任务;* 接着判断以下3个条件,只要有1个不满足,则返回false:
* 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却
可以继续处理阻塞队列中已保存的任务
* 2. firsTask为空
* 3. 阻塞队列不为空
*
* 首先考虑rs == SHUTDOWN的情况
* 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回
false;
* 然后,如果firstTask为空,并且workQueue也为空,则返回false,
* 因为队列中已经没有任务了,不需要再添加线程了
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是
// 29个1),返回false;
// 这里的core是addWorker方法的第二个参数,如果为true表示
// 根据corePoolSize来比较,
// 如果为false则根据maximumPoolSize来比较。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试增加workerCount,如果成功就跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 自增失败就跳出循环
c = ctl.get(); // Re-read ctl
//对当前线程池的状态进行校验
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//根据firstTask来创建Worker对象
w = new Worker(firstTask);
//一个worker对应一个线程
final Thread t = w.thread;
if (t != null) {
// 由于可能会有并非情况,避免任务重复提交,需要加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// rs < SHUTDOWN表示是RUNNING状态;
// 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将woker加入到workers里面,workers其实是一个hashSet来的。
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动线程,然后会调用worker的run方法,原因是worker在创建线程的时候,往线程工厂里面注入了一个this对象。
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted
}
getTask方法
getTask方法是用来从阻塞队列中取任务,源代码如下:
private Runnable getTask() {
// timeOut变量的值表示上次从阻塞队列中取任务时是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/*
*如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
*1. rs >= STOP,线程池是否正在stop;
*2. 阻塞队列是否为空。
*如果以上条件满足,则将workerCount减1并返回null。
*因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed变量用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
*timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
*接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
*如果减1失败,则返回重试。
*如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*
*根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控 制,如果在keepAliveTime时间内没有获取到任务,则返回null;
*否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为
空。
**/
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
// 如果 r == null,说明已经超时,timedOut设置为true
timedOut = true;
} catch (InterruptedException retry) {
// 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
timedOut = false;
}
}
}
这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。
什么时候会销毁?
在runWorker方法执行完之后,也就是Worker中的run方法执行完,由JVM自动回收。getTask 方法返回null 时, 在runWorker 方法中会跳出while 循环, 然后会执行processWorkerExit方法。
processWorkerExit方法
相关源代码
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
// 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//统计完成的任务数
completedTaskCount += w.completedTasks;
// 从workers中移除,也就表示着从线程池中移除了一个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根据线程池状态进行判断是否结束线程池
tryTerminate();
int c = ctl.get();
/*
*当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接
addWorker;
*如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个
worker;
*如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:
此时此刻,作为实习生小白的王小二早已在“流”下了没有技术的泪水,默默地回去改bug了......
长按二维码
以上是关于线程池从入门到放弃的主要内容,如果未能解决你的问题,请参考以下文章
Python爬虫从入门到放弃之 Scrapy框架的架构和原理