知其然而知其所以然~线程池深入源码分析-手把手debug源码系列
Posted 张子行的博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了知其然而知其所以然~线程池深入源码分析-手把手debug源码系列相关的知识,希望对你有一定的参考价值。
本文将从为什么使用线程池 ?—> 线程池的使用方式----> 线程池源码逐步分析线程池
为什么使用线程池?
- 降低资源开销:通过重复利用已创建的线程降低创建销毁线程带来的开销
- 提高响应速度:当有任务到达时,任务可以不需要等待线程创建就可以立即执行
- 提高线程的可管理性:使用线程池可以对线程进行统一的监控管理
线程池的常见创建形式有哪几种?
- 通过Executors对象创建我们线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
- 通过实例化ThreadPoolExecutor创建我们的线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10,
20,
1,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
通过ThreadPoolExecutor这种方式创建线程池可能稍微有点繁琐,但是对比通过Executors创建的线程池,可控性会提高很多。推荐使用ThreadPoolExecutor()
构建ThreadPoolExecutor参数释义
- 核心线程数(corePoolSize)
- 最大线程数(maximumPoolSize)
- 空闲线程活跃时间(keepAliveTime)
- 阻塞队列(queue)
- 饱和策略(policy)
核心线程数~最大线程数~阻塞队列之间的关系图解
- 如果线程数小于核心线程数那么正常执行任务
- 如果线程数大于核心线程数且线程池状态为Running,且阻塞队列未满,那么将当前线程添加到阻塞队列。如果此时线程池状态不是运行状态,移除任务执行饱和策略,反之开启一个新的work,从阻塞队列中拿取任务然后运行
- 如果阻塞队列已经满了,当前任务添加到阻塞队列失败,执行reject(饱和策略)
创建好的线程池后,我们直接将任务submit()或者execute()就好了。
submit与execute有什么区别?
- submit用于执行有返回值的任务(CallAble)
- execute用于执行没有返回值的任务(RunnAble)
好了前言就到这吧,书写如下demo开始debug线程池源码
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10,
20,
1,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(5),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
FutureTask<Integer> result = new FutureTask<Integer>(new task2());
/**
* execute用于执行没有返回值的任务
*/
threadPoolExecutor.execute(new task());
/**
* submit用于执行有返回值的任务。
*/
Future<?> submit = threadPoolExecutor.submit(new task2());
System.out.println(submit.get());
threadPoolExecutor.shutdown();
}
static class task implements Runnable {
@Override
public void run() {
System.out.println("task runnable");
}
}
static class task2 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("task callable");
return 1;
}
}
上述demo运行结果图
直接在 threadPoolExecutor.execute(new task()) 这断点,看线程池是如何执行任务的?
线程池是如何执行任务的?
public void execute(Runnable command) {
/**
* 任务为null直接抛出异常
*/
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果线程数量小于核心线程数,
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
ThreadPoolExecute变量是如何设计的?
俺看了一眼上述的代码execute()直呼瑟瑟发抖。先别急研究上面的代码,先来分析一下ThreadPoolExecutor类中的各个变量的含义如下图
从上图可知线程池的状态有那几种?
- 运行状态:RUNNING = -1 << COUNT_BITS;
- 关闭状态:SHUTDOWN = 0 << COUNT_BITS;
- 停止状态:STOP = 1 << COUNT_BITS;
- 过度状态:TIDYING = 2 << COUNT_BITS;
- 结束状态:TERMINATED = 3 << COUNT_BITS;
COUNT_BITS = 32 - 3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
-1 的补码为:11111111111111111111111111111111
RUNNING = -1 << COUNT_BITS = -1 << 29 = 11100000000000000000000000000000
所以 RUNNING = 11100000000000000000000000000000 = -536870912
同理可得 SHUTDOWN = 00000000000000000000000000000000 = 0
同理可得 STOP = 00100000000000000000000000000000 = 536870912
同理可得 TIDYING = 01000000000000000000000000000000 = 1073741824
同理可得 TERMINATED = 01100000000000000000000000000000 = 1610612736
小结:观察每个线程状态的高三位都是独一无二的,高三位记录着线程运行状态,而后29位记录着对应状态的线程数(work数量)。(初始状态后29位组成全是0)
如何确定线程池的状态?
runStateOf:确定线程池的状态的方法
COUNT_BITS = 32 - 3 = 29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static int runStateOf(int c) { return c & ~CAPACITY; }
CAPACITY = 1<<29 -1 = 00011111111111111111111111111111(线程的最大容量)
~CAPACITY = 11100000000000000000000000000000
c & ~CAPACITY 计算出来的结果永远只与 c 的高三位有关,因此通过 c&~CAPACITY 操作可以确定线程池的运行状态
不论是运行状态还是什么别的状态,它们的前三位都是独一无二的,也就是说高三位是记录线程池的运行状态的
如何得到线程池中线程数量?
workerCountOf:可以确定线程池中的线程数量(work对象的数量)
private static int workerCountOf(int c) { return c & CAPACITY; }
- CAPACITY = 1<<29 -1 = 00011111111111111111111111111111
由于 CAPACITY 高三位全是0 ,所以c & CAPACITY的结果只与低29位有关。
例如:假设线程数为1 , 1转换成补码为 00000000000000000000000000000001 与 CAPACITY做 & 运算,得到的结果为 1。
如何初始化线程池状态、线程初始数量?
ctlOf:初始化线程池状态、数量的方法
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
拿ctlOf(RUNNING, 0)的返回值举例 RUNNING|0 = 11100000000000000000000000000000 | 0 = 11100000000000000000000000000000。
即:初始化了线程池状态为RUNNING 且初始线程数量是 0 。
开始真正分析线程池执行流程(execute)
- 如果线程数小于核心线程数那么正常执行任务
- 如果线程数大于核心线程数且线程池状态为Running,且阻塞队列未满,那么将当前线程添加到阻塞队列。如果此时线程池状态不是运行状态,移除任务执行饱和策略,反之从阻塞队列中拿取任务然后运行
- 如果阻塞队列已经满了,当前任务添加到阻塞队列失败,执行reject(饱和策略)
public void execute(Runnable command) {
/**
* 任务为null直接抛出异常
*/
if (command == null)
throw new NullPointerException();
/**
* 获取线程状态、数量。默认线程状态为为运行状态
*/
int c = ctl.get();
//如果当前工作线程数量 < 核心线程数量
if (workerCountOf(c) < corePoolSize) {
//将任务添加到works(hashset)中,同时尝试运行当前任务和阻塞队列中的任务
//如果线程池中线程数量大于核心线程数那么会拒绝运行此任务
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果线程池状态为运行状态,此时工作线程数量大于核心线程数量,如果此时阻塞队列没有满,那么将任务尝试放入阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//重新检查线程池状态如果此时是非运行状态,移除任务,同时执行
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
//开启一个work,运行阻塞队列中的任务
//如果当前线程池数量大于最大线程数量那么添加任务失败
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
addWorker
addWorker(Runnable firstTask, boolean core) 第二个参数:如果core 为
true,那么表示如果线程数大于核心线程数将会拒绝任务的启动,如果为false,表示如果大于最大线程数,将会拒绝任务的启动。
此方法关系到线程的启动,先过一遍代码注释,下面有总结。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//下面判断条件等价于:rs >= SHUTDOWN && rs != SHUTDOWN || rs >= SHUTDOWN && firstTask != null || rs >= SHUTDOWN && workQueue.isEmpty()
//1.线程池状态不是运行状态,是STOP、TIDYING、TERMINATED中的一种拒绝执行任务
//2.线程池状态不是运行状态,是其他状态中的一种,任务不为null,将会拒绝任务
//3.线程池状态不是运行状态,是其他状态中的一种,,任务队列为空,将会拒绝任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//死循环
for (;;) {
//线程数
int wc = workerCountOf(c);
//1.线程数超过了最大容量将会拒绝任务
//2.线程数大于核心线程数,或者大于最大最大线程数,将会拒绝任务
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS操作,对work数量加一
if (compareAndIncrementWorkerCount(c))
break retry;
//其他CAS操作失败的线程一直自旋
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//将任务包装成一个Worker对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
//1.线程池是运行状态,将任务添加至工作集合
//2.任务为null且线程池是SHUTDOWN状态,也将任务添加进工作集合
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//判断线程是否存活
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//works为一个hashSet
workers.add(w);
//工作集合的大小
int s = workers.size();
if (s > largestPoolSize)
//更新最大线程池的大小
largestPoolSize = s;
//标记任务添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//开启任务
t.start();
//标记任务启动成功
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
拒绝任务的情况分析(addWorker失败)
rs >= SHUTDOWN && rs != SHUTDOWN || rs >= SHUTDOWN && firstTask != null || rs >= SHUTDOWN && workQueue.isEmpty()
上面的代码和下面的代码可以等价替换
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
- 线程池状态不是运行状态,是STOP、TIDYING、TERMINATED中的一种直接拒绝任务
rs >= SHUTDOWN && rs != SHUTDOWN
- 要想进入if分支必然只可能是第一点中的 rs != SHUTDOWN 为 false ,也就是说线程池状态不是运行状态,且任务不为null,将会直接拒绝任务
rs >= SHUTDOWN && firstTask != null
- 线程池状态不是运行状态,阻塞队列为空,将会直接拒绝任务
rs >= SHUTDOWN && workQueue.isEmpty()
- 线程数超过了最大容量将会直接拒绝任务
- 线程数大于核心线程数,或者大于最大最大线程数,也会直接拒绝任务
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
添加至工作集合(workers)情况分析
- 线程池是运行状态,将任务添加至工作集合
- 任务为null且线程池是SHUTDOWN状态,也将任务添加进工作集合
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null))
添加成功后,将会启动对应Work中的Thread
线程启动流程(start)
运行到 t.start(); 将会来到Work内部的run方法
runWorker
一直循环如果当前任务不为null且阻塞队列存在任务就一直执行任务,即运行work中的task.run(),里面比较重要的一个点就是
如果线程池正在停止,请确保线程被中断;如果没有,请确保线程不被中断。这需要在第二种情况下重新检查以处理shutdownNow这种情况,同时清除中断。
runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()||( (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) && !wt.isInterrupted() )
- 如果线程池状态为STOP、TIDYING、TERMINATED中的状态之一且任务线程没有被中断,将会中断当前任务线程
- 如果线程池状态为RUNNING或SHUTDOWN,但是当前线程已经中断,重新检查线程池状态如果为STOP、TIDYING、TERMINATED中的状态之一,将会中断当前任务线程。不论线程是否中断task.run()都会运行
这里设置线程中断的目的就是:无论线程池是处于什么状态,我们在task.run()期间都能获取到合理的线程状态
while (task != null || (task = getTask()) != null) {
}
上面这段代码task == null 出现的情况是 addWork(null,false),才会出现task == null,此时将会直接处理阻塞队列中的任务
runWorker源代码
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//拿到Work对象中的任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//task == null 的情况即为addWork(null,false)
//如果当前任务不为null或者阻塞队列中能获取到任务
while (task != null || (task = getTask()) != null) {
//加锁,与shutdown有关
w.lock();
//条件可以拆分runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()||( (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)) && !wt.isInterrupted() )
//1.如果线程池处于STOP、TIDYING、TERMINATED中的状态之一 ,且当前线程没有被中断,中断当前线程
//2.如果第一点不满足,只可能!wt.isInterrupted() 为 true ,runStateAtLeast(ctl.get(), STOP) 为 false。那么当前线程池的状态只可能处于RUNNING或SHUTDOWN状态 如果当前线程(可以理解为第一个任务线程)中断了,此时再重新检查一下线程池状态,如果线程池还处于STOP、TIDYING、TERMINATED中的状态之一,那么中断work中的线程(任务线程)
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//空方法实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
//运行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//空方法实现
afterExecute(task, thrown);
}
} finally {
//滞空任务
task = null;
//记录完成任务的个数
w.completedTasks++;
//解锁
w.unlock();
}
}
深入浅出Java并发编程指南「源码分析篇」透析ThreadPoolExecutor线程池运作机制和源码体系