关于线程池的这 8 个问题你都能答上来吗?
Posted 郭霖
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了关于线程池的这 8 个问题你都能答上来吗?相关的知识,希望对你有一定的参考价值。
/ 今日科技快讯 /
近日特斯拉工厂的主体已经建设完成,冲压车间、涂装车间、总装车间等具备了生产能力,首辆白车身已经下线,今年底前将可以投入生产。特斯拉工厂从开工建设到白车身下线总共用了8个月的时间。按其计划,到年底特斯拉将实现每周3000辆的量产目标。建成后的上海工厂生产的车型将包括Model 3和Model Y,这些车辆将只在中国销售,而Model X和Model S以及高端Model 3和Model Y依然会在美国工厂生产,再出口到中国。
/ 作者简介 /
时间过得就是如此之快,转眼之间又要到周末了,虽然只放一天,但是后面就是国庆长假啦!
本篇文章来自MxsQ的投稿,分享了对线程池源码的分析,希望对大家有所帮助!同时也感谢作者贡献的精彩文章。
https://www.jianshu.com/u/9cf1f31e1d09
/ 前言 /
如果有人问我:“你了解Java线程池吗”,我不打算回答Java中常用的几种线程池,也记不住。从线程池的上层API来看,再多种的线程池,无非是参数的不同,让它们呈现出了不同的特性,那么这些特性到底依赖什么样的原理实现,就更值得去深究,也是本文的目的。
试着回答以下几个问题:
-
线程池如何实现 -
非核心线程延迟死亡,如何做到 -
核心线程为什么不会死 -
如何释放核心线程 -
非核心线程能成为核心线程吗 -
Runnable在线程池里如何执行 -
线程数如何做选择 -
常见的不同类型的线程池的功效如何做到
构造参数与对象成员变量
-
corePoolSize:核心线程数,期望保持的并发状态 -
maximumPoolSize:最大线程数,允许超载,虽然期望将并发状态保持在一定范围,但是在任务过多时,增加非核心线程来处理任务。非核心线程数 = maximumPoolSize - corePoolSize -
workQueue:阻塞队列,存储线程任务Runnable -
keepAliveTime:在没有任务时,线程存活时间 -
threadFactory:用来构建线程 -
handler:当任务已满,并且无法再增加线程数时,或拒绝添加任务时,所执行的策略
Worker
ctl
-
RUNNABLE:运行状态,接受新任务,持续处理任务队列里的任务 -
SHUTDOWN:不再接受新任务,但要处理任务队列里的任务 -
STOP:不接受新任务,不再处理任务队列里的任务,中断正在进行中的任务 -
TIDYING:表示线程池正在停止运作,中止所有任务,销毁所有工作线程 -
TERMINATED:表示线程池已停止运作,所有工作线程已被销毁,所有任务已被清空或执行完毕
// 值为29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 高三位全为0,低29位全为1,因此线程数量的表示范围为 0 ~ 2^29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
因为ctl分位来表示状态和数量,下面几个状态仅看有效位的值
*/
// 有效值为 111
private static final int RUNNING = -1 << COUNT_BITS;
// 有效值为 000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 有效值为 001
private static final int STOP = 1 << COUNT_BITS;
// 有效值为 010
private static final int TIDYING = 2 << COUNT_BITS;
// 有效值为 011
private static final int TERMINATED = 3 << COUNT_BITS;
// 默认状态为RUNNING,线程数量为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
-
runStateOf(): 获取线程池状态 -
workerCountOf(): 获取工作线程数量
添加任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取当前ctl值
int c = ctl.get();
// 当前线程数少于最大核心线程数
if (workerCountOf(c) < corePoolSize) {
// 添加核心线程,添加线程任务
if (addWorker(command, true))
return;
// 上面的过程期间,ctl可能已被更改,获取最新值
c = ctl.get();
}
// 线程池状态为RUNNABLE,向工作队列添加任务
if (isRunning(c) && workQueue.offer(command)) {
// 再次检查用
int recheck = ctl.get();
// 线程不处于RUNNABLE状态,移除任务
if (! isRunning(recheck) && remove(command))
// 执行拒绝任务策略
reject(command);
else if (workerCountOf(recheck) == 0)
// 执行到这里说明已没有可用的工作线程,创建新的工作现线程
addWorker(null, false);
}
// 添加非核心队列来执行线程任务
else if (!addWorker(command, false))
// 说明线程池达到饱和,或者线程池shut down,执行拒绝策略
reject(command);
}
-
如果当前核心线程数量没达到最大值corePoolSize,创建新线程来执行此任务 -
如果当前核心线程到达最大,向阻塞队列添加任务 -
如果核心线程已满,阻塞队列已满,尝试开启非核心线程来执行任务 -
如果线程池不处于RUNNABLE状态,或者处于饱和状态,执行任务拒绝策略
private boolean addWorker(Runnable firstTask, boolean core) {
// 这个是类似 goto 的语法,代码有效片段是下面第一for循环
retry:
for (;;) {
int c = ctl.get();
// 获取程序状态
int rs = runStateOf(c);
/**
这一个条件需要仔细理解。
1. 当线程处于STOP、TIDYING、TERMINATED时,线程池是拒绝执行任务的
因此不需要任务,也不添加线程
2. 当线程处于SHUTDOWN状态时,线程池需要把任务处理完,才会到达后面的
TIDYING、TERMINATED状态。因此,如果阻塞队列还有任务的话,继续添加
线程来加快处理。
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程数
int wc = workerCountOf(c);
// 线程数超过或等于能表示的上限
// 或 比较 核心线程数达到上限,或比较线程池允许的最大线程数,取决于core
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS操作增加线程数,跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 上面的CAS操作没成功,检查线程池状态与开始是否一致,
// 如果一致,继续执行此for循环,否则重新执行retry代码块,
// 自旋以期CAS成功,后续才能添加线程
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 将线程任务加入Worker,新增了Worker,就是新增了线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 再次检查线程池状态
// 1. 处于RUNNABLE状态,继续添加线程执行任务
// 2. 处于SHUTDOWN状态,到这里说明队列里还有任务要执行
// 增加线程期望让任务执行快一点
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 这里说明发生了意外状况,新建的线程不可用
if (t.isAlive())
throw new IllegalThreadStateException();
// 添加worker进集合
workers.add(w);
int s = workers.size();
// largestPoolSize可以表示线程池达到的最大并发
if (s > largestPoolSize)
largestPoolSize = s;
// 添加线程成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动新添加的线程
t.start();
// 线程启动成功
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 线程启动失败,移除work,销毁线程
addWorkerFailed(w);
}
return workerStarted;
}
-
线程池处于 RUNNBALE 或者处于 SHUTDOWN 并在阻塞队列里还有任务时,需要添加新线程。自旋确保 CAS 成功,然后添加新线程 -
线程存于Worker,线程池存有Worker信息,就能访问线程 -
线程启动失败,则移除Worker,销毁线程
执行任务
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
Worker(Runnable firstTask) {
setState(-1);
// firstTask就是addWorker()带来的Runnable
this.firstTask = firstTask;
// 通过ThreadFactory创建线程,将自己作为Runnable提交
this.thread = getThreadFactory().newThread(this);
}
......
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// task一开始是firstTask, 后面就通过getTask()从阻塞队列里拿任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 线程池状态检查
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 这个方法子类可以重写,在任务执行前有回调
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 这个方法子类可以重写,在任务执行后有回调
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 线程池已没有任务了,工作线程达到了可退出的状态,Worker退出
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
// 超时标志
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检查线程池和阻塞队列状态
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 减少线程数
decrementWorkerCount();
return null;
}
// 获取线程数
int wc = workerCountOf(c);
// 线程等待方式标志位判断依据
// allowCoreThreadTimeOut代表核心线程是不是能退出,如果核心线程能退出,就更别说非核心线程了
// 另一个则是看是否存在非核心线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 超时,或者并且线程超标超标,返回null,让上一层函数退出线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果timed为true,则使用poll等待最多keepAliveTime时间获取任务
// 如果timed为false,使用take()获取任务,阻塞线程,直到可以从阻塞队列拿到任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 超时
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 到这里说明线程中断,先通过decrementWorkerCount()减少线程数值
// 否则,说明是线程没有从阻塞队列获取到线程
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// completedTaskCount记录线程池总共完成的任务
// w.completedTasks则是线程完成的任务数
completedTaskCount += w.completedTasks;
// 移除Worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 线程池状态改变,尝试中止线程池
tryTerminate();
int c = ctl.get();
// 检查线程池状态,线程池处于RUNNABLE或者SHUTDOWN则进入
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 线程池最小数量,取决于是否能释放核心线程
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果任务队列还有线程,最起码都要有一个线程来处理任务
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
// 因为线程中断,可能导致没有线程来执行阻塞队列里的任务
// 因此尝试创建线程去执行任务
addWorker(null, false);
}
}
-
CallerRunsPolicy: 如果线程池没有SHUTODOWN的话,直接执行任务 -
AbortPolicy: 抛出异常,说明当前情况的线程池不希望得到接收不了任务的状态 -
DiscardOldestPolicy:丢弃阻塞队列最旧的任务 -
DiscardPolicy:什么也不做
-
线程池倾向于使用核心线程来处理任务,从任务的添加策略可以看出,先考虑创建核心线程处理,再考虑放到阻塞队列,再考虑创建非核心线程处理。以上都不行,则使用任务拒绝策略 -
通过向阻塞队列取任务的不同操作,能确保线程的存活,take()保证核心线程不死,poll()保证非核心线程存活等待一定时间 -
线程池不区分核心线程和非核心线程,线程池是期望达到corePoolSize的并发状态,并允许在不得已情况下超载,达到corePoolSize ~ maximumPoolSize 的并发状态 -
线程池状态和线程数量用ctl表示,高三位为数量,低29位为当前线程池数量 -
线程池对状态的检测非常苛刻,几乎在所有稍微耗时或影响下一步操作正确性的代码前都校验ctl
总结就是这个问题的答案
通过阻塞队列poll(),让线程阻塞等待一段时间,如果没有取到任务,则线程死亡
通过阻塞队列take(),让线程一直等待,直到获取到任务
将allowCoreThreadTimeOut设置为true。可用下面代码实验
// 伪代码
{
// 允许释放核心线程,等待时间为100毫秒
es.allowCoreThreadTimeOut(true);
for(......){
// 向线程池里添加任务,任务内容为打印当前线程池线程数
Thread.currentThread().sleep(200);
}
}
线程池不区分核心线程于非核心线程,只是根据当前线程池容量状态做不同的处理来进行调整,因此看起来像是有核心线程于非核心线程,实际上是满足线程池期望达到的并发状态。
线程执行Worker,Worker不断从阻塞队列里获取任务来执行。如果任务加入线程池失败,则在拒绝策略里,还有处理机会。
这就要看任务类型是计算密集型任务还是IO密集型任务了,区别在于CPU占用率。计算密集型任务涉及内存数据的存取,CPU处于忙绿状态,因此并发数相应要低一些。而IO密集型任务,因为外部设备速度不匹配问题,CPU更多是处于等待状态,因此可以把时间片分给其他线程,因此并发数可以高一些。
-
CachedThreadPool:适合异步任务多,但周期短的场景 -
FixedThreadPool:适合有一定异步任务,周期较长的场景,能达到有效的并发状态 -
SingleThreadExecutor:适合任务串行的场景 -
ScheduledThreadPool:适合周期性执行任务的场景
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
https://blog.csdn.net/weixin_28760063/article/details/81266152
https://www.jianshu.com/p/9a8c81066201
https://www.jianshu.com/p/a5a21d48678a
https://www.jianshu.com/p/0da2939391cf
以上是关于关于线程池的这 8 个问题你都能答上来吗?的主要内容,如果未能解决你的问题,请参考以下文章