详解Java线程池运行机制(结合源码)
Posted 海涛技术漫谈
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了详解Java线程池运行机制(结合源码)相关的知识,希望对你有一定的参考价值。
欢迎关注同名博客:
http://blog.csdn.net/zhanht
由于项目原因,近期使用多线程较为频繁,因此对多线程产生了浓厚的兴趣,也一直想弄懂它的原理,但每次都是浅尝而止。
决定提笔写这文章源于前两天code review的时候,组里的一位同事无意间提了一个问题:
线程池中,如果一个线程中的任务执行过程中,抛了异常,会发生什么?线程池中的线程个数会因此变少吗?
思考这个问题的过程中,我意识到,是时候对线程池知识做一个总结了。
接着往下看之前,也希望各位可以花个几分钟自己在自己脑子里好好思考下这个问题,然后带着问题往下接着走。
接下来,本文将结合JDK1.8的ThreadPoolExecutor类源码,从如下几个方面对线程池进行详细的阐述:
线程池的重要性
线程池整体工作示意图
线程池的创建
任务的提交
总结
一: 线程池的重要性
我记得李智慧老师的《大型网站技术架构》一书中说到,提高性能和可用性有三驾马车,分别是:缓存,异步和集群。而线程池正是异步的常用手段,
他合适使得主逻辑和附加逻辑进行解耦,能提高响应速度,提高性能,还能在高并发情况下达到削峰的作用。
在如今多核处理器的时代,多线程开发发挥着巨大的作用,总的来说,我认为它至少有如下三个明细优势:
(1) 降低cpu等资源消耗:通过对线程的重用,避免不断的创建和销毁线程占用大量的cpu等资源
(2) 提高速度和性能:利用多线程进行异步,并行 处理任务
(3) 简单方便:线程池统一进行线程的管理,调度
二: 线程池整体工作示意图
该示意图描述了线程池的整个运行过程:
(1) 通过submit(Callable<T> task ) 或者 execute(Runnable command)方法提交任务,submit方法其实最后也是调用的execute方法,只是调用前将任务封装成了FutureTask,它会将任务运行的结果或者任务抛出的异常封装进 Future中,返回给调用方,这样主线程就能拿到返回值或者异常信息了。
(2) 如果线程池中线程个数小于corePoolSize,则新建一个线程用来执行这个任务。线程池,其实说白了就是一个装有Worker的HashSet,Worker是线程池中的一个内部类,它有两个重要属性:线程和任务。
线程池新建线程其实就是new 一个Worker然后加进Set中,这个Worker会新建一个线程,并在此线程中,进行任务的处理。
(3) 如果线程个数达到corePoolSize时,任务则会被放进队列进行排队
(4) 如果队列也放满了,则会继续新建Worker,直到个数达到maxPoolSize
(5) 如果线程个数已经达到maxPoolSize,并且队列也满了,则会走拒绝策略,常见策略包括:主线程执行(CallerRunsPolicy),抛异常(AbortPolicy)和丢弃(DiscardPplicy)三种
三: 线程池的创建
可以通过ThreadPoolExecutor的构造方法直接创建线程池,更常见的一般是使用Executors中的工厂方法进行创建。
如果项目使用的spring框架,建议使用ThreadPoolTaskExecutor类,然后corePoolSize, maxPoolSize,queueCapacity和拒绝策略通过配置进行合理的配置。
orePoolSize, maxPoolSize和queueCapacity大小怎么设置,可以参考本期的另外一篇文章。
四: 任务的提交
前面介绍了,submit方法最后也是调用的execute方法,只是将任务进行了适当的包装处理,因此这里详细介绍execute方法。根据章节二的描述可知,提交任务无非就是往Set添加新的Worker,要么是任务入队列排队,或者进行拒绝策略。下面从execute方法开始结合源码进行详细的介绍:
execute作为提交任务的方法,要么新建并添加线程(addWorker),要么入队列(workQueue.offer()),或者走拒绝策略。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/**
* ct1是个AtomicInteger类型,int是32位的
* 前3位用于表示线程池的状态,后29位
* 表示Worker(线程池中线程)的数量
* 因此最大的线程个数为2的29次方减一
*/
int c = ctl.get();
/**
* 这块代码主要就是,如果当前线程个数
* 小于corePoolSize
* 就addWorker,新创建一个worker
* 并加入Set<Worker>
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/**
* 这块代码是当前线程个数大于等于
* corePoolSize或者上面addWorker失败
* 然后进行入队列的操作
* 入队列成功后,如果线程池状态突变了,
* 则会出队并拒绝这个任务
* 接下来会再次检查线程池的线程个数
* 如果为0,那么就没线程执行队列等待的任务
* 会addWorker,新建一个
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/**
* 走到这,说明队列满了,会再次addWorker
* (直到maxPoolSize)
* 如果失败,则走拒绝策略拒绝这个任务
*/
else if (!addWorker(command, false))
reject(command);
}
execute方法里面的核心,是addWorker,下面看源码:
private boolean addWorker(Runnable firstTask,
boolean core) {
// goto用法的label,用来方便的跳出双层循环
retry:
/**
* 这快代码主要是校验线程池状态和线程个数
* 判断是否还能添加Worker
*/
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**
* 这块代码比较绕,读懂的关键在于
* 把握 &&运算符的短路特性
* 主要功能是看什么情况下会返回false,
* 即addWorker失败
*
* 首先介绍下,线程池包括如下状态:
* RUNNING:可以新加线程,
* 同时可以处理queue中的线程
* SHUTDOWN:不增加新线程,
* 但是处理queue中的线程
* STOP:不增加新线程,但是
* 处理queue中的线程
* TIDYING:所有的线程都终止了 queue中
* )同时workerCount为0,
* 那么此时进入TIDYING
* TERMINATED:terminated()方法结束
* ,变为TERMINATED
*
* 分析可知,addWorker失败有如下情况:
* 1. rs>SHUTDOWN
* 2. rs=SHUTDOWN,firstTask != null,
* 因为shutDown状态下不接受新线程,
* 只处理queue中的任务
* 3. rs=SHUTDOWN,firstTask = null,queue为空
* 此时说明是新增线程用来消耗queue中
* 的任务,但 queue为空,所以也返回失败
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
/**
* 上面关注的是线程池的状态,
* 这块代码关注的则是worker(即线程)的数量
*/
for (;;) {
int wc = workerCountOf(c);
/**
* capacity为线程的最大个数,
* 受int的29位限制,大于此值直接返回失败
* core表示的是以corePoolSize还是
* maximumPoolSize为上限
* 最初始阶段,线程是逐渐增加至
* corePoolSize的,core为true
* 后面,对列也满了后,就也
* maximumPoolSize为上限了,core为false
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// workerCount加1成功后,跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
/**
* 接下来的这块代码,就是新建线程
* 并且维护进set中
*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// Worker实现了Runnable接口,
// 内部维护了线程和任务两个重要属性,
// 关键是它的run()方法
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// HashSet是线程不安全的,所以操作要加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
/**
* rs < SHUTDOWN(即running):可以
* 正常添加线程
* rs == SHUTDOWN && firstTask == null
* 此时添加的是消耗queue中任务的线程
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
/**
* 添加成功后,启动新worker中的线程
* 拿到时间片后,会执行Worker中
* 的runWorker()方法
*/
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
到目前为止,就算是往线程池添加线程成功了,接下来就是线程执行任务。
Worker创建成功后,先执行firstTask,之后工作就是不断的去队列拿任务执行
runWorker()方法:
final void runWorker(Worker w) {
// 获取当前线程,worker的构造方法
// 会通过线程工厂方法新建一个线程
Thread wt = Thread.currentThread();
// 刚创建worker的firstTask才可能非null,
//创建用来消耗queue中的任务的firstTask都是给的null
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// task不为null执行task,否者通过getTask()去队列拿
while (task != null || (task = getTask()) != null) {
w.lock();
// 再次判断状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 线程池提供的一个钩子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
/**
* 这里需要注意下
* completedAbruptly默认是true
* 只有当上面的循环正常退出
* 即队列任务执行完了,才会设为false
* 任务执行过程中抛异常,依然未true
*/
completedAbruptly = false;
} finally {
/**
* 注意这里,这个方法实现就是
* 文章开头问题的答案所在
* 如果线程执行任务过程中抛了异常
* 会发生什么
*/
processWorkerExit(w, completedAbruptly);
}
}
runWorker()方法比较简单,关键在于去队列拿任务的getTask()方法,和线程异常关闭处理的 processWorkerExit方法,下面分别进行阐述:
getTask()方法:
private Runnable getTask() {
// 判断最后的poll是否要超时,
// 用于考量当前的线程个数是否富余
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**
* 如果rs=shutdown,queue为空:
* 减少workerCount并返回null
* 如果rs>=stop
* 同样减少workerCount并返回null
*/
if (rs >= SHUTDOWN && (rs >= STOP
|| workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/**
* 允许核心线程超时退出 或者
* 当前worker个数 > corePoolSize时
* timed = true, 表示当前线程允许退出
*/
boolean timed = allowCoreThreadTimeOut
|| wc > corePoolSize;
/**
* 这里关键在于分析if里面什么时候为true
* (1): wc > maximumPoolSize, 为true,
* 减少线程数,返回null
* (2): 1 < wc <= maximumPoolSize,
* 里面简化为 timed && timedOut
* 允许线程退出,并且当前线程富余
* 才减少线程
* (3): wc <= 1, 里面简化为:
* timed && timedOut && workQueue.isEmpty()
* 此时只有一个工作线程,只要队列不为空
* 就得一直处理,不能减小线程数
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/**
* 1. timed为true,当前线程允许退出,
* 通过poll从队列拿
* 拿到直接返回任务,如果超时时间内没拿到,
* 说明任务不多,但线程多
* 即worker有富余,因此timedOu 置为 true
* 上面就能减少线程数了
* 2. timed为false,不允许当前线程退出
* 拿不到任务一直阻塞
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime,
TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit()方法:
private void processWorkerExit(Worker w,
boolean completedAbruptly) {
// 如果异常退出,workerCount先减一
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 任务抛异常了,线程池也认为
// 此线程完成了它的工作
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
// 线程异常退出,不会进if,直接addWorker
if (!completedAbruptly) {
/**
* 这里面处理的是线程正常退出逻辑
* 如果允许核心线程超时退出,
* 并且队列不为空,有任务需要
* 处理,则 min = 1
* 如果不允许核心线程超时退出
* 则min = corePoolSize
*
* 如果当前的线程数小于min
* 则新建一个线程进行补位
*/
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
五: 总结
至此,关于线程池原理就介绍完了,下面进行简单的总结。
线程池,其实说白了就是一个 Set<Worker>,Worker是线程池的内部类,它实现了Runnable接口,维护了线程和任务,当新建一个Worker时,它的构造方法内会新创建一个线程用来执行任务,他首先条件入参传进来的任务,执行完后,就不管的从workQueue中不断的通过getTask()从队列获取任务来执行。
以上是关于详解Java线程池运行机制(结合源码)的主要内容,如果未能解决你的问题,请参考以下文章