ThreadPoolExecutor分析
Posted wolf-w
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolExecutor分析相关的知识,希望对你有一定的参考价值。
1 ThreadPoolExecutor
1.1 常见参数
1.1.1 线程池参数
BlockingQueue<Runnable> workQueue; // 任务队列。使用workQueue.isEmpty()判断队列是否为空,而非workQueue.poll()==null判断,这样的判空方式容纳特殊队列,如DelayQueue
ReentrantLock mainLock;
HashSet<Worker> workers; // 线程池中的所有工作线程,仅能在mainLock下访问
Condition termination; // 用于支持awaitTermination
// 所有的控制参数都被定义为volatile
ThreadFactory threadFactory;
RejectedExecutionHandler handler;
long keepAliveTime;
boolean allowCoreThreadTimeOut;
int corePoolSize;
int maximumPoolSize;
int largestPoolSize; // 线程池中工作线程的历史最大数量:largestPoolSize = workers.size() > largestPoolSize ? workers.size() : largestPoolSize;
long completedTaskCount; // 完成任务的计数器,仅在工作线程终止时更新,仅在mainLock下访问
1.1.2 核心参数ctl
/* 核心字段,打包了2种含义:worker线程数、线程池状态。
workerCount(低29位):已经被允许start并且不被允许stop的worker的数量。该值可能与活动线程的实际数量会出现短暂性不同
runState(高3位):状态的数值顺序是重要的,以允许有序的比较,状态流转如下
> RUNNING -> SHUTDOWN,调用shutdown()方法,可能隐含在finalize()方法中
> RUNNING or SHUTDOWN -> STOP,调用shutdownNow()
> STOP -> TIDYING,当线程池是empty时
> TIDYING -> TERMINATED,当terminate()钩子方法执行完成
当状态为TERMINAED时,线程在awaitTermination()方法上的等待将会返回。
运行状态描述 已知:RUNNING=111, SHUTDOWN=0, STOP=001, TIDYING=010, TERMINATED=011
RUNNING // 接收的新任务,并且处理排队中的任务
SHUTDOWN // 不接受新任务,但处理排队中的任务
STOP // 不接受新任务,不处理排队中的任务,并且中断正在处理的任务
TIDYING // 所有任务已经终止,workerCount=0,将运行terminate()钩子方法
TERMINATED // terminate()钩子方法执行完毕
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
1.1.3 关于mainLock
ReentrantLock mainLock;
HashSet<Worker> workers; // 线程池中的所有工作线程,仅能在mainLock下访问
mainLock
字段主要是为了保证工作线程池字段workers(非安全集合HashMap)
在多线程并发情况下的访问。至于workers
为何使用HashMap
而非使用安全的ConcurrentHashMap
,原因如下所示:
使用
mainLock
加锁操作会让操作按照顺序一个一个执行。这样保证了interruptIdleWorkers()
方法在执行期间避免中断风暴,尤其是在shutdown期间。注:
interruptIdleWorkers()
方法只会被shutdown()
方法调用
1.2 Worker线程
1.2.1 简介
从类图结构来看:Worker
类既是锁,又是线程。
核心代码如下所示,共分为3部分:
- 多线程核心代码:run()方法
- AQS核心代码:tryAcquire、tryRelease。由实现可知,该锁是不可重入锁
- 为worker线程中断提供方法:interruptIfStarted()
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // 抑制中断操作,直到runWorker()方法开始运行,runWorker()方法运行时会先执行unlock()方法,将state设置为0
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
// 多线程核心部分:实现run()方法
public void run() {
runWorker(this);
}
// AQS核心实现部分
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 中断worker线程
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
1.2.2 抑制中断
worker线程自己提供了中断worker的方法interruptIfStarted()
。即创建Worker对象之后,只有在worker线程运行后,才可以执行worker线程的中断操作。
抑制中断的手段:
- 将state=-1
- 提供方法
interruptIfStarted()
tryAcquire()
方法中,CAS只有在state=0的时候才能操作成功,即获取到锁
在中断操作的方法中:
interruptWorkers()
:该方法会调用interruptIfStarted()
方法中断worker线程,而该方法只有在state>=0的情况下才会执行中断操作interruptIdleWorkers()
:该方法只有在worker.tryLock()
获取到锁时,才能执行中断操作
而上述情况下,state=-1,均不能执行成功。只有在runWorker()
方法执行后,才会将state=0
操作执行
2.3 execute
2.3.1 execute核心
该方法作为整体方法入口,会根据线程池的线程数量、线程池状态,来决定针对添加的task执行以下哪3种操作:
- 添加worker线程(前提条件:线程池为运行态)
- worker线程数量<corePoolSize
- 当
任务队列
添加新task成功,且线程池仍在运行状态,但此时线程池worker线程数量为0 - 当
任务队列
添加新task成功,且线程池状态已经不在运行态,但从任务队列中移除新任务失败,但此时线程池worker线程数量为0
- 放入
任务队列
(前提条件:线程池为运行态)- worker线程数量<corePoolSize,但addWorker()添加线程池失败,且线程池为运行态
- worker线程数量>=corePoolSize,且线程池为运行态
- 拒绝任务
- 线程池为非运行态
- 线程池数量满了,任务队列满了
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
} else if (!addWorker(command, false))
reject(command);
}
2.3.2 addWorker
该方法一共干了3件事:
- 将线程池中worker线程的数量自增
- 创建一个Worker对象,并添加到
workers
属性中 - 添加成功后,启动worker线程
2.4 runWork
源码省略
2.4.1 getTask
该方法有个非常重要的关键点,即:worker线程是否会正常执行结束。即:getTask()
方法一旦返回null,表明该worker线程无task可以处理,此时正常情况下该worker线程将会执行结束,线程退出。而getTask()
方法中,会会根据timed来决定在BlockingQueue
获取task时,是否阻塞等待。
总结:该方法一旦返回NULL,即表明该worker线程
将执行结束;否则,worker
线程将在BlockingQueue
队列中阻塞等待
private Runnable getTask() {
boolean timedOut = false; // 在workQueue.poll()是否超时
for (;;) {
/* 如下操作判断线程池的状态,是否结束该线程:
1. 线程池状态SHUTDOWN
2. 线程池状态STOP,且任务队列为空(即没有任何可执行任务)*/
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/* timed:用于判断worker线程调用BlockingQueue获取task时,阻塞是否为有限期的阻塞。
timed=true,表示是有超时时间的阻塞
timed=false,表示无限期阻塞
allowCoreThreadTimeOut=true,则表示允许核心线程数超时 */
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 线程池数量大于maximumPoolSize,该线程获取任务直接返回null。
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
2.4.2 processWorkerExit
顾名思义,该方法时worker线程
退出前的处理工作:
- 如果是异常导致退出worker线程,则减少workerCount数量
将当前线程完成的任务数量,累加到completedTaskCount中
尝试进行线程池终止
如果线程池仍在RUNNING状态,且worker线程没有异常退出,由于getTask()==null,即任务等待队列已经为空,此时判断coreThread是否允许超时,来限制空闲的workers的线程数量
注:是否将线程池的worker线程
数量维护在一个稳定范围,仍然需要考虑。该方法就是做如此处理:worker线程在结束前,会判断线程池是否需要新增一个新的worker线程,新增worker线程的情况如下:
- worker线程为异常终止
- 线程池中的worker线程数量,小于其应该有的最小值(若允许核心线程运行结束,则最小值为1,否则最小值为corePoolSize)
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 试着看看线程池是否可以终止
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
/* 新增worker线程的情况如下:
1. worker线程为异常终止
2. 线程池中的worker线程数量,小于其应该有的最小值(若允许核心线程运行结束,则最小值为1,否则最小值为corePoolSize)*/
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty()) // 如果min == 0,且任务队列中又增加新任务,则将min=1,workers中保留min个worker线程。
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 相当于创建一个新的worker线程,但没有马上为该worker线程分配task。该worker将会从队列中getTask()获取任务。
addWorker(null, false);
}
}
2.5 shutdown
2.5.1 代码示例
核心操作如下:
- 设置线程池运行状态为
SHUTDOWN
- 中断所有空闲worker线程(即处于park状态的线程):
interruptIdleWorkers()
- 调用钩子方法:
onShutdown()
- 尝试将线程池设置为终止状态:
tryTerminate()
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
2.5.2 中断风暴
中断风暴的产生,主要来自于如下方法:
- 遍历
空闲的worker线程
- 判断是否已经中断,未中断,则进行中断操作
假如多线程并发调用shutdown()
方法,此时若没有mainLock
锁,让线程有序的进行调用,则可能引发大规模的空闲worker
线程中断
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
2.5.3 对比shutdownNow
两个方法如下所示,其中主要区别为:
shutdown()
的状态是SHUTDOWN
;而shutdownNow()
的状态则是STOP
shutdown()
中断空闲worker线程;而shutdownNow()
则是中断所有worker线程shutdown()
有钩子方法
2.5.4 中断worker线程对比
中断worker
的线程总共有两类:
中断空闲worker:若未中断,且
不可重入锁
加锁成功,再中断。非重入锁加锁的时机有两个:- 此处,用于中断空闲线程
runWorker()
方法中,getTask()获取任务时,会加锁。
即,运行可执行任务和中断线程的操作,是不可同时发生!!
中断所有worker:遍历直接中断
2.6 总结钩子方法
1、runWorker()中有前置、后置方法
2、shutdown()中有onShutdown()方法
以上是关于ThreadPoolExecutor分析的主要内容,如果未能解决你的问题,请参考以下文章
聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源代码分析
Java中的线程池——ThreadPoolExecutor源代码分析
高并发多线程基础之ThreadPoolExecutor源代码分析