通过实例来透彻分析线程池的源码---ThreadPoolExecutor
Posted 小猪快跑22
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过实例来透彻分析线程池的源码---ThreadPoolExecutor相关的知识,希望对你有一定的参考价值。
前言
其实大概1年前就想把线程池的源码完整的撸一遍了,但是看的时候太注重细节了,比如其中的CAS操作、AQS以及ReentrantLock的lock、tryLock等,结果就是跑偏了;所幸今年年初的时候有时间,就把AQS的源码、CAS的使用以及ReentrantLock等一些并发编程的源码撸了一遍,再来看线程池的源码,感觉还是非常的舒爽,哈哈哈。共勉。
文章中设计到的 CAS操作,以及AQS操作可以参考我之前的博文:
讲线程池的原理之前,先得了解一下线程池中几个重要的概念。
-
核心线程数 (corePoolSize):核心线程的数量;它的作用可以这样理解:向线程池中添加任务,如果线程池中的线程数量小于 corePoolSize,那么直接新建线程执行任务;如果线程池中的线程数量大于corePoolSize,那么就会往 阻塞队列workQueue中添加任务,此时如果阻塞队列满了且线程池中的线程数量小于最大线程数 maximumPoolSize,那么也会新建一个线程执行任务;如果阻塞队列满且线程数量大于最大线程数maximumPoolSize,那么会执行饱和策略,默认的策略是抛弃要加入的任务。
-
最大线程数 (maximumPoolSize):如果阻塞队列满了,则判断线程池中的线程数量是否小于 maximumPoolSize,是则直接新建一个线程来处理任务,否则执行饱和策略。
-
阻塞队列**(workQueue)**:线程池中的线程数量大于核心线程的数量,则将新建的任务加入到阻塞队列。
-
空闲线程的存活时间 (keepAliveTime):线程空闲下来之后,线程的存活时间,超过这个时间还没有任务执行,则结束该线程。注意,这个回收只是回收非核心线程,比方说核心线程数是2,最大线程数是6,假设任务非常多,最后创建了6个线程来执行任务,最后后回收4个非核心线程,而核心线程不会回收,除非你任务设置要回收核心线程。
-
饱和策略 (RejectedExecutionHandler):当等待队列已满,线程数也达到最大线程数时,线程池会根据饱和策略来执行后续操作,默认的策略是抛弃要加入的任务。
下面来一张图来说明下:
一、线程池的几种状态:
- RUNNING: 运行状态,能够接受新的任务且会处理阻塞队列中的任务。
- SHUTDOWN:关闭状态,不接受新任务,但是会处理阻塞队列中的任务,执行线程池的 shutDown()对应的就是此状态。
- STOP: 停止状态,不接受新的任务,也不会处理等待队列中的任务并且会中断正在执行的任务。调用线程池的 shutDownNow()对应的是此状态
- TIDYING: 整理,即所有的任务都停止了,线程池中线程数量等于0,会调用 terminated()如果你自己实现线程池的话。
- TERMINATED: 结束状态,terminated()方法执行完了。
下面是一些重要的变量注释:
//CAS, 它的高三位表示线程池的状态,低29位表示线程池中现有的线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//表示线程池线程数的bit数
private static final int COUNT_BITS = Integer.SIZE - 3;
//最大的线程数量,数量是完全够用了 0001 1111 1111 1111 1111 1111 1111 1111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//1110 0000 0000 0000 0000 0000 0000 0000
private static final int RUNNING = -1 << COUNT_BITS;
//0000 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS;
//0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS;
//0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
//获取线程池的状态
private static int runStateOf(int c) return c & ~CAPACITY;
//获取线程池中已创建线程的数量
private static int workerCountOf(int c) return c & CAPACITY;
//组装状态和数量,成为ctl
private static int ctlOf(int rs, int wc) return rs | wc;
private static boolean runStateLessThan(int c, int s)
return c < s;
private static boolean runStateAtLeast(int c, int s)
return c >= s;
//判断线程是否在运行
private static boolean isRunning(int c)
return c < SHUTDOWN;
二、线程池的添加任务的源码解析:
我这里为了能将的清楚,我将线程池的主要代码都Copy了,加了些注释,其他的没变。
为了说明例子,我这里的线程池定义如下:
ExecutorService executorService = new ThreadPoolExecutor(1, 2, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
其中 核心线程数等于1,最大线程数等于2,空闲线程的存活时间为100毫秒,阻塞队列是大小为1的数组类型的队列,此队列只能存放一个任务且不会自动扩容。
下面我new了3个任务放进线程池,如下:
Runnable runnable1 = () ->
Log.e("test thread pool", "r1 start");
try
Thread.sleep(3_000);
catch (InterruptedException e)
e.printStackTrace();
Log.e("test thread pool", "r1 end");
;
Runnable runnable2 = () ->
Log.e("test thread pool", "r2 start");
try
Thread.sleep(3_000);
catch (InterruptedException e)
e.printStackTrace();
Log.e("test thread pool", "r2 end");
;
Runnable runnable3 = () ->
Log.e("test thread pool", "r3 start");
try
Thread.sleep(3_000);
catch (InterruptedException e)
e.printStackTrace();
Log.e("test thread pool", "r3 end");
;
// 直线线程池的添加任务:
executorService.execute(runnable1);
executorService.execute(runnable2);
executorService.execute(runnable3);
按照上面讲的线程池原理那么执行的流程应该是这样:
先创建一个线程来执行Runnable1,此时核心线程数等于1,那么会把Runnable2放到阻塞队列中去,由于阻塞对列只能存放1个任务,且最大线程数等于2,那么会新建一个线程来执行Runnable3。
注意,我这里的3个任务我都sleep(3_000),我为什么这里要让你们注意这里呢?
因为,如果我的任务1的任务很简单就是打印一行日志的话,那么这个任务很快就会执行完,那么可能在执行任务3的时候,任务1已经执行完,那么执行任务1的线程就会去阻塞队列中将任务2出队且执行,那么任务3就会被加入到阻塞队列中。
execute 方法:
public void execute(Runnable command)
if (command == null)
throw new NullPointerException();
int c = ctl.get();
int wtc = workerCountOf(c); // 计算线程池中当前线程的数量
Log.e("test thread pool", "c = " + c + ", wtc = " + wtc);
// 如果线程的数量小于核心线程数,那么直接提交任务,并且创建线程来执行任务
if (wtc < corePoolSize)
if (addWorker(command, true))
return;
// 如果提交任务失败,可能是线程池执行了shunDown或shutDownNow操作,
// 那么重新获取ctl的值,执行下面的流程
c = ctl.get();
Log.e("test thread pool", "addWorker failed c = " + c);
// 如果线程池是运行状态,那么将任务添加到阻塞队列,执行到这里只有2个条件:
// 条件1:核心线程数已满
// 条件2:线程池执行了shunDown或shutDownNow操作
if (isRunning(c) && workQueue.offer(command))
int recheck = ctl.get();
Log.e("test thread pool", "recheck = " + recheck);
// 如果当前线程池不是运行状态,那么将任务从阻塞队列中移除并执行拒绝策略
if (!isRunning(recheck) && remove(command))
Log.e("test thread pool", "...reject...");
reject(command);
else if (workerCountOf(recheck) == 0) // 如果线程池中线程数量等于0,那么就添加一个空任务,目的就是继续执行阻塞队列中的任务
Log.e("test thread pool", "...addWorker a null task...");
addWorker(null, false);
// 如果核心线程数已满且阻塞队列已满,那么就开启一个新线程来执行任务,
// 如果添加失败则执行抛弃策略
// 这里面的失败的条件,一般是执行下面addWorker(command, false)的时候,
// 另外一个线程执行了线程池的shutDown()操作,这种情况基本不会出现,
//因为线程池的操作如extcute或shutDown一般都是主线程中的,
// 所以 addWorker和shutdown都是顺序执行的,不会出现失败的情况。
else if (!addWorker(command, false))
Log.e("test thread pool", "...reject..2....");
reject(command);
按照之前自定义的线程池executorService = new ThreadPoolExecutor(1, 2, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1))
执行的结果如下:
只看我用绿框框出来的即可,其他log是我跟踪分析的时候用的。
结果分析:
开始时因为线程池中线程个数是0,而且核心线程数是1,所以直接创建一个线程来执行任务1;
添加任务2的时候,此时线程池中线程的个数等于1了,即核心线程已满,所以将任务2添加到阻塞队列中;
添加任务3的时候,核心线程已满且阻塞队列已满(这里我队列的大小设置为1,即只能存放1个任务),但是线程池中的线程数小于最大的线程数(2个),所以会新建一个线程执行任务3。从上面的日志可以看出,执行任务1和任务3的线程分别是 :pool-1-thread-1 和 pool-1-thread-2
日志还可以看出,执行任务1和任务2的线程都是 pool-1-thread-1,这里面涉及到的就是线程池中线程的复用,到底是怎么实现的呢?
那就得看看 之前的addWorker方法啦:
private boolean addWorker(Runnable firstTask, boolean core)
retry:
for (; ; )
int c = ctl.get();
int rs = runStateOf(c); // 获取线程池的状态
Log.e("test thread pool", "addWorker rs = " + rs + ", firstTask = " + firstTask + ", ....... core = " + core);
//如果线程池的状态到了SHUTDOWN或者之上的状态时候,只有一种情况还需要继续添加线程,
//那就是线程池已经SHUTDOWN,但是队列中还有任务在排队,而且不接受新任务(所以firstTask必须为null)
//这里还继续添加线程的目的是,尽快完成阻塞队列中的任务
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (; ; )
// 获取线程个数
int wc = workerCountOf(c);
// 如果线程数大于CAPACITY 或者线程数大于等于核心线程数或者最大线程数
// 表示添加任务失败
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 线程池中线程的个数加1,如果成功的话直接跳出最外层的for循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 检测当前线程状态如果发生了变化,则继续回到retry,重新开始循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try
// 用Worker类包装任务,真正的执行任务就是在Worker中
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 加锁
try
int rs = runStateOf(ctl.get()); // 获取线程池的状态
// rs < SHUTDOWN 表示线程池是运行状态
// (rs == SHUTDOWN && firstTask == null) 表示线程池执行了shutDown,
// 且阻塞队列中还有任务,这时候需要添加一个空的任务,即创建新的线程来加速阻塞队列中的任务尽快完成
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null))
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
finally
mainLock.unlock();
if (workerAdded)
Log.e("test thread pool", "addWorker start");
// 这里真正的在线程中处理任务啦
t.start();
workerStarted = true;
finally
// 添加任务出错的机制
if (!workerStarted)
addWorkerFailed(w);
return workerStarted;
addWorker的代码注释的很清楚,代码也比较简单,下面主要看看里面真正的任务执行类 Worker,和 添加任务失败的方法 addWorkerFailed
真正的任务执行类 Worker
Worker 的代码很简单,主要就是继承了AQS来加、解锁,以及创建线程来执行任务,构造函数如下:
Worker(Runnable firstTask)
setState(-1); // 这里面就是设置AQS中state的值为-1,用途是调用shutDown时根据状态来响应中断操作的,
// 要执行的 Runnable任务
this.firstTask = firstTask;
// 通过线程工厂方法来创建新的线程
this.thread = getThreadFactory().newThread(this);
真正执行的任务方法 runWorker:
final void runWorker(Worker w)
Log.e("test thread pool", "runWorker begin task = " + w.firstTask);
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try
// 如果task你为空,或者去阻塞队列中去取任务不为空,这里的getTask 如果阻塞队列中任务为空 会阻塞当前线程
// 这里就是线程复用的核心,比方说当这个程执行完当前任务后,就去队列中取任务来执行,这就完成了线程的复用
while (task != null || (task = getTask()) != null)
Log.e("test thread pool", "runWorker while task = " + task);
w.lock();
// 第一个条件只要调用了shutDownNow才会成立,如果调用了shutDownNow 那么就会执行线程的中断即中断正在执行的任务
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
Log.e("test thread pool", "runWorker interrupt");
wt.interrupt(); // 中断操作
try
beforeExecute(wt, task);
Throwable thrown = null;
try
task.run(); // 开始执行任务
catch (RuntimeException x)
thrown = x;
throw x;
catch (Error x)
thrown = x;
throw x;
catch (Throwable x)
thrown = x;
throw new Error(x);
finally
afterExecute(task, thrown);
finally
task = null;
w.completedTasks++;
w.unlock();
completedAbruptly = false;
finally
Log.e("test thread pool", "runWorker processWorkerExit");
// 处理线程完成所有任务后的操作,注意,这里可能执行不到,为什么?
// 因为上面注释说了,getTask取队列中的任务时,
// 如果队列为空,那么就会阻塞住当前线程,所有这里就执行不到了;
// 但是调用shutDown就能执行到这里了,这也是为什么我们经常看到线程池的例子都是excute(r)后调用shutDown()
processWorkerExit(w, completedAbruptly);
下面再看看 getTask 方法
getTask 方法 就是从阻塞队列中获取待执行的任务,按照先进先出的原则取任务。
private Runnable getTask()
boolean timedOut = false; // Did the last poll() time out?
for (; ; )
int c = ctl.get();
int rs = runStateOf(c); // 获取线程池状态
Log.e("test thread pool", "--getTask ----- c = " + c + ", rs = " + rs);
// 线程池的状态是大于等于 SHUTDOWN 且 状态大于等于STOP或者阻塞队列为空
// 这里满足的条件有2种:
// 1. 调用shutDown 后直到阻塞队列中的任务都执行完
// 2. 调用 shutDownNow() 后线程池的状态就变成STOP了
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) // **注1**
decrementWorkerCount(); // 线程池中的线程数减1
Log.e("test thread pool", "--getTask c = " + ctl.get());
// retur以上是关于通过实例来透彻分析线程池的源码---ThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章
线程池的使用及ThreadPoolExecutor的execute和addWorker源码分析
通过ThreadPoolExecutor源码分析线程池实现原理
Java 线程池 ThreadPoolExecutor源码简析