Java Executor源码解析—ThreadPoolExecutor线程池execute核心方法源码一万字
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java Executor源码解析—ThreadPoolExecutor线程池execute核心方法源码一万字相关的知识,希望对你有一定的参考价值。
基于JDK1.8详细介绍了ThreadPoolExecutor线程池的execute方法源码!
上一篇文章中,我们介绍了:Java Executor源码解析(2)—ThreadPoolExecutor线程池的介绍和基本属性。这一篇文章,我们将会介绍ThreadPoolExecutor线程池的execute方法源码,该方法是线程池的核心方法,非常重要。
文章目录
1 execute核心提交方法
public void execute(Runnable command)
传递一个Runnable任务对象,然后由线程池对它进行异步执行。没有办法检查Runnable是否执行完毕。如果无法接收要执行的任务,则由 RejectedExecutionHandler 决定是否抛出RejectedExecutionException;如果任务为null,那么抛出NullPointerException。
execute方法是线程池的绝对核心方法,很多其他内部方法都是为该方法服务的,涉及到的流程和代码非常复杂。
execute方法的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- 首先就是command任务的null校验,如果为null直接抛出NullPointerException;
- 调用workerCountOf计算运行线程数量,如果小于corePoolSize,即目前在行线程数小于核心线程数:
- 调用addWorker方法尝试启动新线程去执行command任务,使用corePoolSize作为线程数量上限。如果任务提交给新线程成功,那么直接返回true。
- 到这一步,肯定是addWorker方法返回false。表示可能线程池被关闭了,或者线程数量达到了corePoolSize,或者任务队列满了且工作线程数量达到了最大值maximumPoolSize等等情况。
- 到这一步,可能线程池被关闭了,或者线程数量达到了corePoolSize,或者任务队列满了且工作线程数量达到了最大值maximumPoolSize等等情况。判断线程池是否是RUNNING状态,以及尝试加入任务队列。如果线程池还是RUNNING状态并且成功加入任务队列:
- 再次检查线程池是否不是RUNNING状态,以及尝试将任务移除任务队列。若果线程池不是RUNNING状态,并且将任务移除任务队列成功,那么对任务执行拒绝策略。
- 否则,表示线程池还是RUNNING状态,或者线程池不是RUNNING状态,但是任务移除队列失败。判断当前工作线程数是否为0,如果是,尝试启动新线程,从队列中获取任务,使用maximumPoolSize作为线程数量上限,这一步为了维持线程池的活性。 因为有可能在新任务offer加入队列的时候,其他工作线程都因为超时或者SHOTDOWN而被清理,此时仍然需要新开线程对加入进来的任务进行完成。
- 否则,表示线程池不是RUNNING状态,或者线程池是RUNNING状态但是加入任务队列失败,即任务队列满了。尝试启动新线程去执行command任务,使用maximumPoolSize作为线程数量上限。如果失败,则表示线程池不是RUNNING状态,或者线程池是RUNNING状态但是任务队列满了且工作线程数量达到了最大值maximumPoolSize,那么执行拒绝策略。
上面的判断都没有加锁,因此状态可能都是瞬时性的判断,不保证一直有效。
/**
* 传递一个Runnable任务对象,然后由线程池对它进行异步执行。没有办法检查Runnable是否执行完毕。
* <p>
* 如果无法接收要执行的任务,则由 RejectedExecutionHandler 决定是否抛出 RejectedExecutionException;
* 如果任务为null,那么抛出NullPointerException。
*
* @param command 要执行的任务
* @throws RejectedExecutionException 如果无法接收要执行的任务,则由 RejectedExecutionHandler 决定是否抛出 RejectedExecutionException
* @throws NullPointerException 如果任务为null
*/
public void execute(Runnable command)
//首先就是command任务的null校验,如果为null直接抛出NullPointerException
if (command == null)
throw new NullPointerException();
//获取ctl的值c
int c = ctl.get();
/*1 调用workerCountOf计算线程数量,如果小于corePoolSize*/
if (workerCountOf(c) < corePoolSize)
//尝试启动新线程去执行command任务,使用corePoolSize作为线程数量上限
if (addWorker(command, true))
return;
//到这里还没有返回,那么表示addWorker失败。
//可能线程池被关闭了,或者线程数量达到了corePoolSize,或者任务队列满了且工作线程数量达到了最大值maximumPoolSize等情况
//重新获取ctl的值c
c = ctl.get();
/*
* 到这一步,可能线程池被关闭了,或者线程数量达到了corePoolSize,
* 或者任务队列满了且工作线程数量达到了最大值maximumPoolSize等等情况。
*
* 2 如果线程池还是RUNNING状态,并且成功加入任务队列
*/
if (isRunning(c) && workQueue.offer(command))
//获取ctl的值c
int recheck = ctl.get();
/*再次检查,如果线程池不是RUNNING状态,并且将任务移除队列成功*/
if (!isRunning(recheck) && remove(command))
//执行拒绝策略
reject(command);
/*
* 否则,表示线程池还是RUNNING状态,或者线程池不是RUNNING状态,但是任务移除队列失败。
* 判断如果当前工作线程数为0
*/
else if (workerCountOf(recheck) == 0)
//尝试启动新线程,从队列中获取任务,使用maximumPoolSize作为线程数量上限,无论成功还是失败
//这一步为了维持线程池的活性。
addWorker(null, false);
/*
* 3 否则,表示线程池不是RUNNING状态,或者线程池是RUNNING状态但是加入任务队列失败,即任务队列满了
*
* 尝试启动新线程去执行command任务,使用maximumPoolSize作为线程数量上限
* 如果失败,则表示线程池不是RUNNING状态,或者线程池是RUNNING状态但是任务队列满了且工作线程数量达到了最大值maximumPoolSize
*/
else if (!addWorker(command, false))
//执行拒绝策略
reject(command);
/**
1. 该方法用于执行拒绝策略
*/
final void reject(Runnable command)
handler.rejectedExecution(command, this);
2 addWorker尝试添加新线程
addWorker尝试新建一个工作线程(Worker)并启动去执行任务,同样作为线程池的核心方法。大概步骤为:检查当前线程池状态是否为运行态RUNNING以及线程数小于给定的最大线程数边界值,如果不满足,那么返回false,如果满足,那么表示可以新增Worker,首先CAS的增加ctl的线程计数。随后尝试新增一个Worker,通过线程工厂创建一个线程,将参数中的任务作为第一个任务执行并返回true。如果线程工厂无法创建线程或者返回null,那么将会回滚此前的操作,比如减少线程计数,移除创建的Worker等,最后返回false。
addWorker的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- 开启一个死循环,相当于自旋,使用retry标记。检查当前线程池状态是否为运行态RUNNING以及线程数小于给定的最大线程数边界值,如果不满足,那么返回false,如果满足,那么表示可以新增Worker,那么首先CAS的增加ctl的线程计数,成功后退出循环。
- 获取此时最新的ctl值c,通过c获取此时线程池的状态rs。
- 线程池状态校验:如果线程状态的值大于等于SHUTDOWN,那么表示线程池不是运行状态,并且(rs等于SHUTDOWN 并且 firstTask为null 并且 workQueue不为空)这三个条件有一个成立,那么直接返回false,adderWorker失败。即只有RUNNING状态,或者SHUTDOWN状态并且不是新添加任务的请求并且任务队列不为空,满足这两种情况的一种,才进行下一步;
- 到这一步,说明通过判断线程池状态校验通过。内部再开启一个死循环:首先校验线程数量,如果不符合规则,那么直接返回false。随后尝试预先CAS的将WorkerCount线程数量自增1,成功之后退出整个双层大循环,继续下一步。失败之后会重新获取ctl的值,然后判断线程状态是否改变,如果状态改变了那么进入下一次外层循环,再次进行状态校验如果状态没有改变,那么那么进入下一次内层循环,不断地循环重试CAS操作即可。
- 获取此时最新的线程数量wc。
- 线程数量校验:如果wc大于等于CAPACITY(最大线程数),或者wc大于等于匹配最大值边界(如果core参数为true就是corePoolSize,否则就是maximumPoolSize)。满足两种条件的一个,那么直接返回false,表示线程数量不符合要求。
- 到这一步,表示满足线程数量要求。那么尝试预先CAS的将WorkerCount线程数量自增1,即ctl值自增1,CAS操作成功之后,直接break retry跳出外层循环,这一步就算完成了,进入下一步尝试新增Worker。
- 到这一步返回跳出循环,表示CAS失败,重新获取最新的ctl值c。
- 继续判断如果线程池运行状态不等于rs,线程池的状态改变了,我们知道线程池的状态是不可逆的,那么continue retry结束内层循环,结束本次外层循环,继续下一次外层循环,将可能在下一次的外层循环中因为线程状态的原因而返回false 并退出。
- 到这一步还没跳出循环,表示CAS失败,并且线程池状态也没有改变,那么重新开始下一次内层循环重试CAS即可。
- 到这一步,表示线程池状态和线程数量校验通过,并且已经预先新增了WorkerCount线程数量的值,下面是尝试新增Worker并启动线程的逻辑。
- workerStarted表示新增的工作线程是否已启动的标志位,初始化为false,表示未启动。workerAdded表示新增的工作线程是否已加入workers集合,初始化为false,表示未加入。w表示需要新增的Worker对象,初始化为null。
- 开启一个try代码块:
- 新建一个Worker赋给w,传入firstTask参数,作为将要执行的第一个任务,在构造器中还会通过线程工厂初始化一个新线程。
- 获取w内部的新线程t。如果t不为null:
- 下面的步骤涉及到workers集合的改动以及新线程的执行,甚至其他参数比如largestPoolSize的改动,需要获取mainLock锁,成功之后进行下一步并开启一个try代码块:
- 获取此时的线程池运行状态值rs。
- 首先进行的是再次的检查线程池状态,因为可能在最上面的循环之后-获取锁之前,线程池的状态发生改变,比如被停止。,如果rs小于SHUTDOWN,即属于RUNNING状态,或者rs属于SHUTDOWN状态,并且firstTask为null(不是新增任务的请求)满足这两个条件中的一个,才可以真正的开启线程:
- 继续校验线程t是否是活动状态,因为如果线程已经处于活动状态,表示已经执行了start()方法,即已经开始执行了run方法。那么这个线程就不能再执行新任务,不符合要求,由调用方直接抛出IllegalThreadStateException异常。
- 上面的校验通过,那么将新建的Worker加入workers的set集合,这是blockingQueue自己的方法。
- 获取workers的数量s,就表示目前工作线程数量,如果s大于largestPoolSize,即大于历史最大线程数量,那么largestPoolSize更新为s。
- workerAdded置为true,表示新增的工作线程已加入workers集合。
- 最终,无论上面的代码是成功了还是发生了异常,都需要在finally中解锁mainLock。
- 解锁成功之后的代码。如果workerAdded为true,即新增的工作线程已加入workers集合,说明操作完全没问题,一切正常:
- 那么启动线程t,线程t将会执行Worker中的run方法。workerStarted置为true,表示新增的工作线程已启动。
- 下面的步骤涉及到workers集合的改动以及新线程的执行,甚至其他参数比如largestPoolSize的改动,需要获取mainLock锁,成功之后进行下一步并开启一个try代码块:
- 最终,无论上面的try代码是成功了还是发生了异常,都走finally语句:
- 如果workerStarted为false,即新增的工作线程最终没能启动成功,那么调用addWorkerFailed对此前可能做出的改变进行“回滚”。否则finally中什么也不做。
- 返回新增的工作线程已启动的标志workerStarted,如果已启动线程,表示新增工作线程成功,否则表示新增工作线程失败。
/**
1. 检查当前线程池状态是否为运行态RUNNING以及线程数小于给定的最大线程数边界值,如果不满足,那么返回false,如果满足,那么表示可以新增Worker,
2. 首先CAS的增加ctl的线程计数。随后尝试新增一个Worker,通过线程工厂创建一个线程,将参数中的任务作为第一个任务执行并返回true。
3. 如果线程工厂无法创建线程或者返回null,那么将会回滚此前的操作,比如减少线程计数,移除创建的Worker等,最后返回false。
4. 5. @param firstTask 新线程应首先运行的任务,如果没有,则为null,将会从队列中获取任务
6. 当线程数量少于corePoolSize时,总会启动一个线程来执行新添加的任务,或者当队列已满时,同样可能需要新启动要给线程来执行新添加的任务
7. @param core 如果为true,那么使用corePoolSize作为线程数量边界大小,否则使用maximumPoolSize作为线程数量边界大小
8. 这里不是用具体的参数值而是boolean类型,是因为,比较大小的时候需要随时获取最新值
9. @return 添加Worker成功,返回true;否则返回false
*/
private boolean addWorker(Runnable firstTask, boolean core)
/*
* 1 开启一个死循环,相当于自旋
* 检查当前线程池状态是否为运行态RUNNING以及线程数小于给定的最大线程数边界值,
* 如果不满足,那么返回false,如果满足,那么表示可以新增Worker,那么首先CAS的增加ctl的线程计数,成功后退出循环
*/
retry:
for (; ; )
//获取此时的ctl值
int c = ctl.get();
//通过ctl获取此时线程池的状态rs
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 如果线程状态的值大于等于SHUTDOWN,那么表示线程池不是运行态
* 并且 !(rs等于SHUTDOWN 并且 firstTask为null 并且 workQueue不为空),即这三个中至少有一个不成立
* 那么直接返回false,adderWorker失败
*
* 分析一下:
* 1 首先是要线程状态不是RUNNING运行态了,那么就是后面的状态比如SHUTDOWN、STOP等,此时才有可能直接返回,运行态是不会在这里返回的
* 2 随后,如果线程状态rs等于SHUTDOWN,那么不一定返回,因为此时任务队列可能存在任务,此时可以新增线程去执行;如果大于SHUTDOWN,那么一定返回,即所有放弃任务
* 继续,如果firstTask为null,那么不一定返回,因为此时可能存在新添加到队列的任务,需要去执行;如果不为null,那么一定是新添加的任务,那么不接受该任务,直接返回
* 继续,如果workQueue.isEmpty()返回false,那么不一定返回,因为此时可能存在新添加到队列的任务,需要去执行;如果为空,那么同样直接返回,因为线程池停止且没有任务可执行了
*
* 即,如果非RUNNING状态,并且是SHUTDOWN状态且firstTask为null(不是新添的任务)并且任务队列不为空
* 那么对于SHUTDOWN状态,此时需要线程去执行已经添加到任务队列中的任务,因此不会在这里返回;如果是其他状态,那么直接返回false即可
*/
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
/*
* 到这一步,说明通过判断线程状态不需要返回,但是这里的判断没有加锁,因此这是不一定的,继续校验
*
* 内部再开启一个循环:
* 首先校验线程数量,如果不符合规则,那么直接返回false
* 随后尝试预先CAS的将WorkerCount线程数量自增1,成功之后退出整个双层大循环,继续下一步
* 失败之后会重新获取ctl的值,然后判断线程状态是否改变,如果状态改变了那么进入下一次外层循环,再次进行状态校验
* 如果状态没有改变,那么那么进入下一次内层循环,不断地循环重试CAS操作即可
*/
for (; ; )
//获取此时的线程数量wc
int wc = workerCountOf(c);
/*
* 如果wc大于等于CAPACITY最大线程数
* 或者 wc大于等于匹配最大值边界(如果core参数为true就是corePoolSize,否则就是maximumPoolSize)
* 那么直接返回false,表示线程数量不符合要求
*
* 分析一下:
* 1 如果线程数量大于等于最大线程数量,那么肯定直接返回false,这表示线程数量达到了最大值,不能够添加新线程了
* 2 如果线程数量大于等于边界线程数量,那么同样直接返回false,这表示不符合最初调用addWorker的情景条件,不能够添加新线程了
* 在调用addWorker方法新增线程之前,如果线程数小于corePoolSize,那么core参数为true;如果如果线程数大于等于corePoolSize,那么core参数为false
* 如果core参数为true,通常希望在新增线程之后线程数要小于等于corePoolSize;如果core参数为false,那么要求在新增线程之后线程数要小于等于maximumPoolSize
* 用于将两种情况区分开来,同时在其他地方,比如核心线程预启动方法中非常有用
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//到这一步,表示满足线程数量要求
//那么尝试预先CAS的将WorkerCount线程数量自增1,即ctl值自增1
if (compareAndIncrementWorkerCount(c))
//CAS操作成功之后,直接break retry跳出外层循环,进入下一步,新增Worker
break retry;
//到这里,表示CAS失败,重新获取最新的ctl值c
c = ctl.get(); // Re-read ctl
//如果线程池运行状态不等于rs,线程池的状态改变了,我们直到线程池的状态是不可逆的,那么
//continue retry结束内层循环,结束本次外层循环,继续下一次外层循环,将可能在下一次的外层循环中因为线程状态的原因而返回false 并退出
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
//到这里,表示CAS失败,并且线程池状态也没有改变,那么重新开始下一次内层循环重试CAS即可
//到这一步,表示线程池状态和线程数量校验通过,并且已经预先新增了WorkerCount线程数量的值,下面是尝试新增Worker的逻辑
//workerStarted表示新增的工作线程是否已启动的标志位,初始化为false,表示未启动
boolean workerStarted = false;
//workerAdded表示新增的工作线程是否已加入workers集合,初始化为false,表示未加入
boolean workerAdded = false;
//w表示需要新增的Worker对象,初始化为null
Worker w = null;
try
//新建一个Worker,传入firstTask参数,作为将要执行的第一个任务
//在构造器中还会通过线程工厂初始化一个新线程
w = new Worker(firstTask);
//获取w内部的新线程t
final Thread t = w.thread;
//如果t不为null
if (t != null)
final ReentrantLock mainLock = this.mainLock;
//下面的步骤涉及到workers集合的改动以及新线程的执行,甚至其他参数比如largestPoolSize的改动,需要加锁
//获取mainLock锁
mainLock.lock();
try
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//获取锁之后首先进行的是再次的检查,因为可能在最上面的循环之后-获取锁之前,线程池的状态发生改变,比如被停止
//获取此时的线程池运行状态值rs
int rs = runStateOf(ctl.get());
/*
* 如果rs小于SHUTDOWN,即属于RUNNING状态
* 或者 rs属于SHUTDOWN状态,并且firstTask为null(不是新增任务的请求)
*
* 满足这两个条件中的一个,才可以真正的开启线程
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null))
/*
* 继续校验线程t是否是活动状态,因为如果线程已经处于活动状态,表示已经执行了start()方法,即已经开始执行了run方法
* 那么这个线程就不能再执行新任务,不符合要求,由调用方直接抛出IllegalThreadStateException异常
* 这里也要求线程工厂返回的线程,仅仅是一个Thread对象即可,不能够start启动而帮倒忙!
*/
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//上面的校验通过,那么将新建的Worker加入workers的set集合,这是blockingQueue自己的方法
workers.add(w);
//获取workers的数量s,就表示目前工作线程数量
int s = workers.size();
//如果s大于largestPoolSize,即大于历史最大线程数量
if (s > largestPoolSize)
//那么largestPoolSize更新为s
largestPoolSize = s;
//workerAdded置为true,表示新增的工作线程已加入workers集合
workerAdded = true;
finally
//最终,无论上面的代码是成功了还是发生了异常,都需要解锁
mainLock.unlock();
//解锁成功之后,如果新增的工作线程已加入workers集合,说明操作完全没问题,一切正常
if (workerAdded)
//那么启动线程t,这是线程t将会执行Worker中的run方法
t.start();
//workerStarted置为true,表示新增的工作线程已启动
workerStarted = true;
finally
//最终,无论上面的代码是成功了还是发生了异常,都走finally语句块
//如果workerStarted为false,即新增的工作线程最终没能启动成功
if (!workerStarted)
//那么调用addWorkerFailed对此前可能做出的改变进行“回滚”
addWorkerFailed(w);
//返回新增的工作线程已启动的标志,如果已启动,表示新增工作线程成功,否则表示新增工作线程失败
return workerStarted;
2.1 addWorkerFailed添加Worker失败处理
addWorkerFailed方法仅仅在addWorker方法中被调用,表示添加Worker失败时的回滚操作,传递的参数就是代表新增Worker的w变量。
addWorkerFailed的详细步骤:
- 获取mainLock锁;
- 如果w不为null,即Worker被添加到了workers中,那么从workers中移除w;
- 调用decrementWorkerCount,循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。
- 此时线程池可能不是RUNNING状态,那么调用tryTerminate方法尝试将线程池状态设置为TERMINATED彻底结束线程池。
- 最终解锁。
/**
1. 仅仅在addWorker方法中被调用,处理添加Worker失败时的回滚:
2. 1 如果Worker被添加到了workers中,那么移除Worker
3. 2 workerCount自减1
4. 3 此时线程池可能不是RUNNING或者SHUTDOWN状态,那么尝试将线程池状态设置为TERMINATED彻底结束线程池
5. 6. @param w 新建的Worker
*/
private void addWorkerFailed(Worker w)
final ReentrantLock mainLock = this.mainLock;
//获取mainLock锁
mainLock.lock();
try
//如果w不为null
if (w != null)
//从workers中移除w,这是BlockingQueue的方法
workers.remove(w);
//循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。
decrementWorkerCount();
//此时线程池可能不是RUNNING状态,那么尝试将线程池状态设置为TERMINATED彻底结束线程池
tryTerminate();
finally
mainLock.unlock();
2.1.1 tryTerminate尝试终止线程池
tryTerminate方法在ThreadPoolExecutor的shutdown、shutdownNow、addWorkerFailed、remove、purge这几个方法中被调用,这几个方法被调用时,一般都是涉及到线程池状态改变、移除workers集合中的线程、移除任务队列中的任务的情况,该方法用于尝试终止线程池(将线程池状态将尝试转换为TERMINATED)。
tryTerminate的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- 开启一个死循环,尝试终止线程池:
- 获取此时最新的ctl值c;
- 线程池状态校验:如果处于RUNNING状态,不能终止线程池;或者如果运行状态值大于等于TIDYING,不需要该请求去终止线程池;或者如果运行状态值等于SHUTDOWN,并且workQueue任务队列不为空,不能终止线程池。以上三个条件满足一种,即可立即返回。
- 到这里,表示:线程池位于STOP状态,或者线程池位于SHUTDOWN状态且workQueue任务队列为空,上面两种状态满足一种,表示都可以尝试关闭线程池,但是也不一定,需要继续判断。
- 如果workerCount线程计数不为0,那么不能终止线程池。此这里可能是SHUTDOWN状态,因为调用shutdown方法仅仅在interruptIdleWorkers方法中中断空闲的线程,可能存在正在运行的线程没有被中断;当然也有可能是STOP状态,即如果还有线程还没有启动(state为-1),那么该线程也不会在shutdownNow的interruptWorkers方法中被中断。
- 执行tryTerminate方法时,运行状态的线程(获取w锁)可能处于空闲状态了(释放w锁),此时主要是需要调用interruptIdleWorkers (ONLY_ONE)尝试对于空闲状态的线程传播SHUTDOWN、STOP状态,让它们都被中断并移除。随后返回。
- 到这里表示工作线程数为0,那么表示可以尝试更改线程池状态,进一步停止线程池。首先获取mainLock锁,因为下面需要唤醒在termination上等待的外部线程。
- 开启一个try代码块:
- 尝试将ctl的值CAS的从c更改为TIDYING状态的值,即转换为TERMINATED状态。如果CAS成功:
- 在另一个try块中执行terminated()钩子方法,该方法是空实现,可以由子类重写。
- 无论terminated是否抛出异常,最终都执行finally块:
- 将ctl的值CAS的从TIDYING更改为TERMINATED状态的值,即转换为TERMINATED状态,表示线程池被彻底停止。
- 唤醒所有调用awaitTermination并且还是处于等待状态的外部线程,通知线程池已经彻底关闭。
- 方法返回。
- 尝试将ctl的值CAS的从c更改为TIDYING状态的值,即转换为TERMINATED状态。如果CAS成功:
- 最终需要在finally块中解锁。
- 到这一步还没有返回,表示CAS操作更改TIDYING失败了,可能是其他线程操作成功了,因此继续下一次循环。如果其他线程操作成功,那么本次的tryTerminate操作将会在下一次循环中,因为判断线程池状态不满足而退出。这里我们能够看出来,线程池状态转换为TIDYING以及TERMINATED的过程是连续的,只会在一个线程的同一次tryTerminate方法调用中完成。
/**
* 以下两种情况,线程池状态将尝试转换为TERMINATED
* 1 SHUTDOWN状态的线程,当任务队列为空并且线程池工作线程数workerCount为0时
* 2 STOP状态的线程池,线程池中工作线程数workerCount为0时
* <p>
* 该方法不是私有的,因为子类ScheduledThreadPoolExecutor也会调用该方法
*/
final void tryTerminate()
/*开启一个死循环,尝试终止线程池*/
for (; ; )
//获取此时最新的ctl值c
int c = ctl.get();
/*
* 如果处于RUNNING状态,不能终止线程池
* 或者 如果运行状态值大于等于TIDYING,不需要该请求去终止线程池
* 或者 如果运行状态值等于SHUTDOWN,并且workQueue任务队列不为空,不能终止线程池
*
* 以上三个条件满足一种,即可立即返回
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
/*
* 到这里,表示:
* 1 线程池位于STOP状态
* 2 或者线程池位于SHUTDOWN状态,且workQueue任务队列为空
*
* 上面两种状态满足一种,表示都可以尝试关闭线程池,但是也不一定,需要继续判断
*
* 如果线程数不为0,那么不能终止线程池
* 这里可能是SHUTDOWN状态,因为调用shutdown方法仅仅在interruptIdleWorkers方法中中断空闲的线程,可能存在正在运行的线程没有被中断
* 当然也有可能是STOP状态,即如果还有线程还没有启动(state为-1),那么该线程也不会在shutdownNow的interruptWorkers方法中被中断
* 执行tryTerminate方法时,运行状态的线程(获取w锁)可能处于空闲状态了(释放w锁),此时主要是需要对于空闲状态的线程传播SHUTDOWN、STOP状态,让它们中断
*/
if (workerCountOf(c) != 0) // Eligible to terminate
/*
* 那么调用interruptIdleWorkers(ONLY_ONE)尝试中断最多一个线程
*
* tryTerminate在ThreadPoolExecutor的shutdown、shutdownNow、addWorkerFailed、remove、purge这几个方法中被调用
* 这几个方法被调用时,一般都是需要改变线程池状态、移除workers集合的线程、移除任务队列中的任务的情况
* 这里是补偿式的尝试中断此时又处于空闲状态的线程,用来方便的传播SHUTDOWN、STOP信号,因为被中断的线程会在processWorkerExit方法中继续调用该方法
* 防止当前所有线程都在等待任务,中断任何任意一个的线程就还可以确保自SHUTDOWN状态开始以来新到达的Worker最终也会退出。
* 为了保证所有线程中断,仅需要中断一个空闲的线程即可,后续中断状态会进行传播式的调用
*/
interruptIdleWorkers(ONLY_ONE);
//返回
return;
final ReentrantLock mainLock = this.mainLock;
//到这里表示工作线程数为0,那么表示可以更改线程池状态,进一步停止线程池。
//首先获取mainLock锁,因为下面需要唤醒在termination上等待的外部线程
mainLock.lock();
try
//尝试将ctl的值CAS的从c更改为TIDYING状态的值
//即删除全部wc工作线程计数部分的值,转换为TERMINATED状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0)))
//如果成功
try
//执行terminated()钩子方法,该方法是空实现,可以由子类重写
terminated();
finally
//无论terminated是否抛出异常,最终,都将ctl的值CAS的从TIDYING更改为TERMINATED状态的值
//即转换为TERMINATED状态
ctl.set(ctlOf(TERMINATED, 0));
//唤醒因为调用awaitTermination并且还是处于等待状态的外部线程,通知线程池已经彻底关闭
termination.signalAll();
//返回
return;
finally
//解锁
mainLock.unlock();
// else retry on failed CAS
/*
* 到这一步,表示CAS操作ctl.compareAndSet(c, ctlOf(TIDYING, 0)失败,可能是其他线程操作成功了,因此继续下一次循环
* 如果其他线程操作成功,那么本次的tryTerminate操作将会在下一次循环中,因为判断线程池状态不满足而退出
* 这里我们能够看出来,线程池状态转换为TIDYING以及TERMINATED的过程是连续的,只会在一个线程的同一次tryTerminate方法调用中完成
*/
2.1.1.1 interruptIdleWorkers中断空闲线程
尝试中断空闲的线程,即没有获取Worker锁的线程,比如在等待任务的线程,以便它们在后续运行中可以检测到线程池停止状态(比如shutdown()或者shutdownNow()),或者配置被更改的信息(比如setMaximumPoolSize())。
shutdown()方法或者设置线程池信息的方法比如setCorePoolSize()、setMaximumPoolSize()或者tryTerminate等方法中会调用。
代码很简单,主要是理解onlyOne参数的意思,如果为true,那么最多中断一个Woker;如果为false,那么尝试中断所有空闲线程。代码中循环workers集合,如果当前Worker没有被中断,并且尝试获取Worker锁成功(此前没有获取Worker锁),那么就是中断该线程,随后根据onlyOne参数确定是否跳出集合。
/**
* 尝试中断空闲的线程,即没有获取Worker锁的线程,比如在等待任务的线程
* 以便它们在后续运行中可以检测到线程池停止状态(比如shutdown()或者shutdownNow())
* 或者配置被更改的信息(比如setMaximumPoolSize())
*
* @param onlyOne 如果为true,那么最多中断一个Woker。
* 在tryTerminate方法中启动了终止状态但是还存在Worker的时候会调用并传递true
* 补偿式的尝试中断此时处于空闲状态的线程,用来方便的传播SHUTDOWN、STOP信号,
* 因为被中断的线程会在processWorkerExit方法中继续调用该方法
* 防止当前所有线程都在等待任务,中断任何任意一个的线程就还可以确保自SHUTDOWN状态开始以来新到达的Worker最终也会退出。
* 为了保证所有线程中断,仅需要中断一个空闲的线程即可,后续中断状态会进行传播式的调用
* <p>
* 如果为false,那么尝试中断所有空闲线程
* 在shutdown()或者设置线程池信息的方法比如setCorePoolSize()、setMaximumPoolSize()等方法中会调用
*/
private void interruptIdleWorkersJava Executor源码解析—Executors线程池工厂以及四大内置线程池
Java Executor源码解析—ThreadPoolExecutor线程池submit方法以及FutureTask源码一万字
Java Executor源码解析—ThreadPoolExecutor线程池execute核心方法源码一万字