Java 并发包之线程池综述
Posted roman kickCode
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 并发包之线程池综述相关的知识,希望对你有一定的参考价值。
■ 线程池的创建
在Java中,您可以通过调整-Xss参数来调节每个线程栈的大小(64bit系统默认1024KB),当减小该值时意味着可以创建更多的线程数,但问题是JVM资源是有限的,线程不能无限创建!
从笔者开发经验来看,线程池应该是并发包中使用频率和运用场景最多的并发框架,几乎所有并发/异步执行任务的需求都需要用到线程池,线程复用,以内部线程池的形式对外提供管理任务执行,线程调度,线程池管理等等服务。合理的使用线程池可以带来如下三个好处:
1.降低资源消耗:通过重用已创建的线程来降低线程创建和销毁的消耗
2.提高响应速度:任务到达时不需要等待线程创建就可以立即执行
3.提高线程的可管理性:线程池可以统一管理、分配、调优和监控
■ ThreadPoolExecutor —— 线程池最核心的类
- 类定义: 实现了 AbstractExecutorService 类,ExecutorService,Executor 接口
public class ThreadPoolExecutor extends AbstractExecutorService implements ExecutorService,Executor {
- 构造器:通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作
/** * 线程工厂默认为DefaultThreadFactory * 饱和策略默认为AbortPolicy */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } /** * 线程工厂可配置 * 饱和策略默认为AbortPolicy */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } /** * 线程工厂默认为DefaultThreadFactory * 饱和策略可配置 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } /** * 线程工厂可配置 * 饱和策略可配置 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
- 重要变量
//线程池控制器 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //任务队列 private final BlockingQueue<Runnable> workQueue; //全局锁 private final ReentrantLock mainLock = new ReentrantLock(); //工作线程集合 private final HashSet<Worker> workers = new HashSet<Worker>(); //终止条件 - 用于等待任务完成后才终止线程池 private final Condition termination = mainLock.newCondition(); //曾创建过的最大线程数 private int largestPoolSize; //线程池已完成总任务数 private long completedTaskCount; //工作线程创建工厂 private volatile ThreadFactory threadFactory; //饱和拒绝策略执行器 private volatile RejectedExecutionHandler handler; //工作线程活动保持时间(超时后会被回收) - 纳秒 private volatile long keepAliveTime; /** * 允许核心工作线程响应超时回收 * false:核心工作线程即使空闲超时依旧存活 * true:核心工作线程一旦超过keepAliveTime仍然空闲就被回收 */ private volatile boolean allowCoreThreadTimeOut; //核心工作线程数 private volatile int corePoolSize; //最大工作线程数 private volatile int maximumPoolSize; //默认饱和策略执行器 - AbortPolicy -> 直接抛出异常 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
■ ThreadPoolExecutor 的使用
- 创建线城池实际上就是实例化一个线程池对象,这里我们使用最完整的构造器来描述最完整的创建过程:
1. corePoolSize(核心工作线程数):无任务时,线程池允许(维护)的最小空闲线程池数;当一个任务被提交到线程池就新建一个工作线程来执行任务(即使此时有空闲的核心工作线程)直到(实际工作线程数 >= 核心工作线程数)为止;调用 prestartAllCoreThreads()方法会提前创建并启动所有核心工作线程
2. maximumPoolSize(最大工作线程数):线程池允许创建的最大工作线程数;当(队列已满 && 实际工作线程数 < 最大工作线程数)时,线程池会创建新的工作线程(即使此时仍有空闲的工作线程)执行任务直到最大工作线程数为止;设置无界队列时该参数其实无效
3. keepAliveTime(工作线程最大空闲时间):单位纳秒,满足超时条件且空闲的工作线程会被回收;超时的非核心工作线程会被回收,核心工作线程不会被回收;当allowCoreThreadTimeOut=true 时,则超时的核心工作线程也会被回收;若该值没有设置则线程会永远存活;建议当场景为任务短而多时,可以调高时间以提高线程利用率
4. unit(线程活动保持时间单位): 线程活动保持时间单位,可选的包括NANOSECONDS纳秒、MICROSECONDS微秒、MILLISECONDS毫秒、SECONDS秒、MINUTES分、HOURS时、DAYS天
5. workQueue(任务队列): 用来保存等待执行的任务的阻塞队列;当 (实际工作线程数 >= 核心工作线程数) && (任务数 < 任务队列长度)时,任务会offer()入队等待;关于任务队列详见下文的任务队列与排队策略
6. threadFactory(线程创建工厂): 顾名思义,就是用于创建线程的工厂,允许自定义创建工厂,可以线程进行初始化配置,比如名字、守护线程、异常处理等等
7. handler(饱和策略执行器): 当线程池和队列都已满,此时说明线程已无力再接收更多的任务,即任务数饱和,没法接单了;此时需要使用一种饱和策略处理新提交的任务,默认是Abort(直抛Reject异常),还包括Discard(LIFO规则丢弃)、DiscardOldest(LRU规则丢弃) 以及 CallerRuns(调用者线程执行),允许自定义执行器
从上面给出的 ThreadPoolExecutor 类的代码可以知道,ThreadPoolExecutor 继承了 AbstractExecutorService,我们来看一下 AbstractExecutorService 的实现:
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { }; protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {}; public <T> Future<T> submit(Runnable task, T result) { }; public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { };
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { }; public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { }; }
AbstractExecutorService 是一个抽象类,它实现了ExecutorService 接口:
public interface ExecutorService extends Executor { void shutdown(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
而ExecutorService 又是继承了Executor 顶层接口:
public interface Executor { void execute(Runnable command); }
- 提交、执行和关闭任务 (重要方法)
1. execute(): 适用于提交无须返回值的任务
- 该方法是无法判断任务是否被线程池执行成功
2. submit(): 适用于提交需要返回值的任务
-可以通过返回的Future对象得知任务是否已经执行成功
-get() 方法会阻塞当前线程直到任务完成,但要注意防范无限阻塞!!!
-使用 get(long timeout,TimeUnit unit) 方法会阻塞当前线程直到任务完成或超时,不会有无限阻塞的发生但需要注意超时后任务可能还没完成!!!
3. shutdown() : 有序地关闭线程池,已提交的任务会被执行(包含正在执行和任务队列中的),但会拒绝新任务
shutdownNow(): 立即(尝试)停止执行所有任务(包含正在执行和任务队列中的),并返回待执行任务列表
■ ThreadPoolExecutor 实现原理
- 流程图
- 线程池的状态
线程状态的流转遵循如下顺序,即由小到大顺序排列:
RUNNING -> SHUTDOWN -> STOP -> TIDYING -> TERMINATED
* 补充:数值的变迁感觉就好比我们的年龄,越大离上帝就越近 = =
//线程池状态控制器,用于保证线程池状态和工作线程数 ps:低29位为工作线程数量,高3位为线程池状态 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //设定偏移量 Integer.SIZE = 32 -> 即COUNT_BITS = 29 private static final int COUNT_BITS = Integer.SIZE - 3; //确定最大的容量2^29-1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //获取线程池状态,取高3位 private static int runStateOf(int c) { return c & ~CAPACITY; } //获取工作线程数量,取低29位 private static int workerCountOf(int c) { return c & CAPACITY; } /** * 获取线程池状态控制器 * @param rs 表示runState 线程池状态 * @param wc 表示workerCount 工作线程数量 */ private static int ctlOf(int rs, int wc) { return rs | wc; }
这里补充一点二进制运算符基础知识方便忘却的读者理解一下:
&:与运算符,同位都为1才为1,否则为0
|:或运算符,同位有一个为1即为1,否则为0
~:非运算符,0和1互换,即若是0变成1,1则变成0
^:异或运算符,同位相同则为0,不同则为1
- 工人生产(生产者与消费者模式)
之前每个变量的作用都已经标明出来了,这里通过实例展示其作用:
/**
假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。
因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;
当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;
如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;
然后就将任务也分配给这4个临时工人做;
如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。
当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。
**/
那么我们知道其实线程就相当于工人,所以我们来看下线程池的内部类 Worker:
- 继承AQS类: 实现简单的不可重入互斥锁,以提供便捷的锁操作,目的用于处理中断情况
- 实现Runnable接口: "投机取巧"的设计,主要是借用Runnable接口的统一写法,好处是不用重新写一个同功能接口
- 工作线程: Worker会通过thread变量绑定一个真正执行任务的工作线程(一对一),初始化时就由线程工厂分配好,它会反复地获取和执行任务
- 任务: Worker每次都会将新任务赋值给firstTask变量,工作线程每次通过该变量处理新获取到的任务(初始化时该值允许为null,有特殊作用,下文会详述)
/**
Worker类封装了 ( 锁 + 线程 + 任务 ) 这三个部分,从而成为了一个多面手的存在
*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ /** 实际上真正的工作线程 - 幕后大佬,但可能因线程工厂创建失败而为null */ final Thread thread; /** 待执行任务,可能为null */ Runnable firstTask; /** 该工作线程已完成的任务数 -- 论KPI的重要性 */ volatile long completedTasks; Worker(Runnable firstTask) { //设置锁状态为-1,目的是为了阻止在runWorker()之前被中断 setState(-1); /** * 新任务,任务来源有两个: * 1.调用addWorker()方法新建线程时传入的第一个任务 * 2.调用runWorker()方法时内部循环调用getTask() -- 这就是线程复用的具现 */ this.firstTask = firstTask; /** * 创建一个新的线程 -> 这个是真正的工作线程 * 注意Worker本身就是个Runnable对象 * 因此newThread(this)中的this也是个Runnable对象 */ this.thread = getThreadFactory().newThread(this); } }
- 执行任务
/** * 工作线程运行 * runWorker方法内部会通过轮询的方式 * 不停地获取任务和执行任务直到线程被回收 */ public void run() { runWorker(this); }
(重点) 这里简单介绍一下线程在线程池执行任务的工作流程:
1.工作线程开始执行前,需先对worker加锁,任务完成解锁
2.任务执行前后分别执行beforeExecute()和afterExecute()方法
3.执行中遇到异常会向外抛出,线程是否死亡取决于您对于异常的处理
4.每个任务执行完后,当前工作线程任务完成数自增,同时会循环调用getTask()从任务队列中反复获取任务并执行,无任务可执行时线程会阻塞在该方法上
5.当工作线程因各种理由退出时,会执行processWorkerExit()回收线程(核心是将该worker从workers集合中移除,注意之前worker已经退出任务循环,因此已经不再做工了,从集合移除后就方便gc了)
- 锁方法
// Lock methods // The value 0 represents the unlocked state. 0表示未锁定 // The value 1 represents the locked state. 1表示已锁定 protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { //锁状态非0即1,即不可重入 //特殊情况:只有初始化时才为-1,目的是防止线程初始化阶段被中断 if (compareAndSetState(0, 1)) { //当前线程占有锁 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { //释放锁 setExclusiveOwnerThread(null); //状态恢复成未锁定状态 setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()){ try { t.interrupt(); } catch (SecurityException ignore) { } } }
- 动态控制
/** * 设置核心工作线程数 * 1.若新值<当前值时,将调用interruptIdleWorkers()处理超出部分线程 * 2.若新值>当前值时,新创建的线程(若有必要)直接会处理队列中的任务 */ public void setCorePoolSize(int corePoolSize) /** * 设置是否响应核心工作线程超时处理 * 1.设置false时,核心工作线程不会因为任务数不足(空闲)而被终止 * 2.设置true时,核心工作线程和非核心工作线程待遇一样,会因为超时而终止 * 注意:为了禁止出现持续性的线程替换,当设置true时,超时时间必须>0 * 注意:该方法通常应在线程池被使用之前调用 */ public void allowCoreThreadTimeOut(boolean value) /** * 设置最大工作线程数 * 1.若新值<当前值时,将调用interruptIdleWorkers()处理超出部分线程 * 注意:当新值>当前值时是无需做任何处理的,跟设置核心工作线程数不一样 */ public void setMaximumPoolSize(int maximumPoolSize) /** * 设置超时时间,超时后工作线程将被终止 * 注意:若实际工作线程数只剩一个,除非线程池被终止,否则无须响应超时 */ public void setKeepAliveTime(long time, TimeUnit unit)
■ 任务提交与执行
- execute() - 提交任务
/** * 在未来的某个时刻执行给定的任务 * 这个任务由一个新线程执行,或者用一个线程池中已经存在的线程执行 * 如果任务无法被提交执行,要么是因为这个Executor已经被shutdown关闭 * 要么是已经达到其容量上限,任务会被当前的RejectedExecutionHandler处理 */ public void execute(Runnable command) { //新任务不允许为空,空则抛出NPE if (command == null) throw new NullPointerException(); /** * 1.若实际工作线程数 < 核心工作线程数,会尝试创建一个工作线程去执行该 * 任务,即该command会作为该线程的第一个任务,即第一个firstTask * * 2.若任务入队成功,仍需要执行双重校验,原因有两点: * - 第一个是去确认是否需要新建一个工作线程,因为可能存在 * 在上次检查后已经死亡died的工作线程 * - 第二个是可能在进入该方法后线程池被关闭了, * 比如执行shutdown() * 因此需要再次检查state状态,并分别处理以上两种情况: * - 若线程池中已无可用工作线程了,则需要新建一个工作线程 * - 若线程池已被关闭,则需要回滚入队列(若有必要) * * 3.若任务入队失败(比如队列已满),则需要新建一个工作线程; * 若新建线程失败,说明线程池已停止或者已饱和,必须执行拒绝策略 */ int c = ctl.get(); /** * 情况一:当实际工作线程数 < 核心工作线程数时 * 执行方案:会创建一个新的工作线程去执行该任务 * 注意:此时即使有其他空闲的工作线程也还是会新增工作线程, * 直到达到核心工作线程数为止 */ if (workerCountOf(c) < corePoolSize) { /** * 新增工作线程,true表示要对比的是核心工作线程数 * 一旦新增成功就开始执行当前任务 * 期间也会通过自旋获取队列任务进行执行 */ if (addWorker(command, true)) return; /** * 需要重新获取控制器状态,说明新增线程失败 * 线程失败的原因可能有两种: * - 1.线程池已被关闭,非RUNNING状态的线程池是不允许接收新任务的 * - 2.并发时,假如都通过了workerCountOf(c) < corePoolSize校验,但其他线程 * 可能会在addWorker先创建出线程,导致workerCountOf(c) >= corePoolSize, * 即实际工作线程数 >= 核心工作线程数,此时需要进入情况二 */ c = ctl.get(); } /** * 情况二:当实际工作线程数>=核心线程数时,新提交任务需要入队 * 执行方案:一旦入队成功,仍需要处理线程池状态突变和工作线程死亡的情况 */ if (isRunning(c) && workQueue.offer(command)) { //双重校验 int recheck = ctl.get(); /** * recheck的目的是为了防止线程池状态的突变 - 即被关闭 * 一旦线程池非RUNNING状态时,除了从队列中移除该任务(回滚)外 * 还需要执行任务拒绝策略处理新提交的任务 */ if (!isRunning(recheck) && remove(command)) //执行任务拒绝策略 reject(command); /** * 若线程池还是RUNNING状态 或 队列移除失败(可能正好被一个工作线程拿到处理了) * 此时需要确保至少有一个工作线程还可以干活 * 补充一句:之所有无须与核心工作线程数或最大线程数相比,而只是比较0的原因是 * 只要保证有一个工作线程可以干活就行,它会自动去获取任务 */ else if (workerCountOf(recheck) == 0) /** * 若工作线程都已死亡,需要新增一个工作线程去干活 * 死亡原因可能是线程超时或者异常等等复杂情况 * * 第一个参数为null指的是传入一个空任务, * 目的是创建一个新工作线程去处理队列中的剩余任务 * 第二个参数为false目的是提示可以扩容到最大工作线程数 */ addWorker(null, false); } /** * 情况三:一旦线程池被关闭 或者 新任务入队失败(队列已满) * 执行方案:会尝试创建一个新的工作线程,并允许扩容到最大工作线程数 * 注意:一旦创建失败,比如超过最大工作线程数,需要执行任务拒绝策略 */ else if (!addWorker(command, false)) //执行任务拒绝策略 reject(command); }
- addWorker() - 新增工作线程
/** * 新增工作线程需要遵守线程池控制状态规定和边界限制 * * @param core core为true时允许扩容到核心工作线程数,否则为最大工作线程数 * @return 新增成功返回true,失败返回false */ private boolean addWorker(Runnable firstTask, boolean core) { //重试标签 retry: /*** * 外部自旋 -> 目的是确认是否能够新增工作线程 * 允许新增线程的条件有两个: * 1.满足线程池状态条件 -> 条件一 * 2.实际工作线程满足数量边界条件 -> 条件二 * 不满足条件时会直接返回false,表示新增工作线程失败 */ for (;;) { //读取原子控制量 - 包含workerCount(实际工作线程数)和runState(线程池状态) int c = ctl.get(); //读取线程池状态 int rs = runStateOf(c); /** * 条件一.判断是否满足线程池状态条件 * 1.只有两种情况允许新增线程: * 1.1 线程池状态==RUNNING * 1.2 线程池状态==SHUTDOWN且firstTask为null同时队列非空 * * 2.线程池状态>=SHUTDOWN时不允许接收新任务,具体如下: * 2.1 线程池状态>SHUTDOWN,即为STOP、TIDYING、TERMINATED * 2.2 线程池状态==SHUTDOWN,但firstTask非空 * 2.3 线程池状态==SHUTDOWN且firstTask为空,但队列为空 * 补充:针对1.2、2.2、2.3的情况具体请参加后面的"小问答"环节 */ if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; /*** * 内部自旋 -> 条件二.判断实际工作线程数是否满足数量边界条件 * -数量边界条件满足会对尝试workerCount实现CAS自增,否则新增失败 * -当CAS失败时会再次重新判断是否满足新增条件: * 1.若此期间线程池状态突变(被关闭),重新判断线程池状态条件和数量边界条件 * 2.若此期间线程池状态一致,则只需重新判断数量边界条件 */ for (;;) { //读取实际工作线程数 int wc = workerCountOf(c); /** * 新增工作线程会因两种实际工作线程数超标情况而失败: * 1.实际工作线程数 >= 最大容量 * 2.实际工作线程数 > 工作线程比较边界数(当前最大扩容数) * -若core = true,比较边界数 = 核心工作线程数 * -若core = false,比较边界数 = 最大工作线程数 */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; /** * 实际工作线程计数CAS自增: * 1.一旦成功直接退出整个retry循环,表明新增条件都满足 * 2.因并发竞争导致CAS更新失败的原因有三种: * 2.1 线程池刚好已新增一个工作线程 * -> 计数增加,只需重新判断数量边界条件 * 2.2 刚好其他工作线程运行期发生错误或因超时被回收 * -> 计数减少,只需重新判断数量边界条件 * 2.3 刚好线程池被关闭 * -> 计数减少,工作线程被回收, * 需重新判断线程池状态条件和数量边界条件 */ if (compareAndIncrementWorkerCount(c)) break retry; //重新读取原子控制量 -> 原因是在此期间可能线程池被关闭了 c = ctl.get(); /** * 快速检测是否发生线程池状态突变 * 1.若状态突变,重新判断线程池状态条件和数量边界条件 * 2.若状态一致,则只需重新判断数量边界条件 */ if (runStateOf(c) != rs) continue retry; } } /** * 这里是addWorker方法的一个分割线 * 前面的代码的作用是决定了线程池接受还是拒绝新增工作线程 * 后面的代码的作用是真正开始新增工作线程并封装成Worker接着执行后续操作 * PS:虽然笔者觉得这个方法其实可以拆分成两个方法的(在break retry的位置) */ //记录新增的工作线程是否开始工作 boolean workerStarted = false; //记录新增的worker是否成功添加到workers集合中 boolean workerAdded = false; Worker w = null; try { //将新提交的任务和当前线程封装成一个Worker w = new Worker(firstTask); //获取新创建的实际工作线程 final Thread t = w.thread; /** * 检测是否有可执行任务的线程,即是否成功创建了新的工作线程 * 1.若存在,则选择执行任务 * 2.若不存在,则需要执行addWorkerFailed()方法 */ if (t != null) { /** * 新增工作线程需要加全局锁 * 目的是为了确保安全更新workers集合和largestPoolSize */ final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { /** * 获得全局锁后,需再次检测当前线程池状态 * 原因在于预防两种非法情况: * 1.线程工厂创建线程失败 * 2.在锁被获取之前,线程池就被关闭了 */ int rs = runStateOf(ctl.get()); /** * 只有两种情况是允许添加work进入works集合的 * 也只有进入workers集合后才是真正的工作线程,并开始执行任务 * 1.线程池状态为RUNNING(即rs<SHUTDOWN) * 2.线程池状态为SHUTDOWN且传入一个空任务 * (理由参见:小问答之快速检测线程池状态?) */ if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { /** * 若线程处于活动状态时,说明线程已启动,需要立即抛出"线程状态非法异常" * 原因是线程是在后面才被start的,已被start的不允许再被添加到workers集合中 * 换句话说该方法新增线程时,而线程是新的,本身应该是初始状态(new) * 可能出现的场景:自定义线程工厂newThread有可能会提前启动线程 */ if (t.isAlive()) throw new IllegalThreadStateException(); //由于加锁,所以可以放心的加入集合 workers.add(w); int s = workers.size(); //更新最大工作线程数,由于持有锁,所以无需CAS if以上是关于Java 并发包之线程池综述的主要内容,如果未能解决你的问题,请参考以下文章