ThreadPoolExecutor源码学习
Posted ylyzty
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadPoolExecutor源码学习相关的知识,希望对你有一定的参考价值。
线程池ThreadPoolExecutor
ThreadPoolExecutor 继承结构
继承结构如图所示:ThreadPoolExecutor <- AbstractExecutorService <- ExecutorService <- Executor
public class ThreadPoolExecutor extends AbstractExecutorService
//...
/**
* 实现了部分 ExecutorService 方法
* 1. submit 方法
* 2. invokeAny 方法
* 3. invokeAll 方法
*/
public abstract class AbstractExecutorService implements ExecutorService
/**
* Callable -> FutureTask
* FutureTask<V> implements RunnableFuture<V>
* RunnableFuture<V> extends Future<V>, Runnable
*
* FutureTask Status:
* NEW(0): 初始状态, 任务刚被创建或者正在计算中
* COMPLETING(1): 中间状态, 任务计算完成正在对结果进行赋值,或者正在处理异常
* NORMAL(2): 终止状态, 任务计算完成, 结果已经完成赋值
* EXCEPTIONAL(3): 终止状态, 任务计算过程发生异常无法处理,线程中断
* CANCELLED(4): 终止状态, 任务计算过程被取消
* INTERRUPTING(5): 中间状态, 任务计算过程已开始并被中断,正在修改状态
* INTERRUPTED(6): 终止状态,任务计算过程已开始并被中断,且已经完全停止
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
return new FutureTask<T>(callable);
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
return new FutureTask<T>(runnable, value);
// 提交 callable 任务
public <T> Future<T> submit(Callable<T> task)
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
// 提交 runnable 任务,返回 null
public Future<?> submit(Runnable task)
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
// 提交 runnable 任务,返回 result
public <T> Future<T> submit(Runnable task, T result)
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
// invokeAll
// 为每一个任务创建对应的FutureTask, 并调用 execute 方法执行
// execute() 方法在 ThreadPoolExecutor 被实现
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try
for (Callable<T> t : tasks)
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
for (int i = 0, size = futures.size(); i < size; i++)
Future<T> f = futures.get(i);
// 如何任务此时还未执行完成,则阻塞获取对应的值
if (!f.isDone())
try
f.get();
catch (CancellationException ignore)
catch (ExecutionException ignore)
done = true;
return futures;
finally
// 执行过程抛出无法处理的异常
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
// 取消任务的执行,如果任务已经执行完成,则不受影响
futures.get(i).cancel(true);
// InvokeAny 方法逻辑待后续更新
/**
* 在 Executor 的基础上定义了一系列任务执行和线程池管理方法
*
* 1. submit: 提供方法执行带有返回值的任务
* 2. invokeAll: 提供方法执行指定的任务集合中的所有任务, 返回 List<Future<T>>
* 3. invokeAny: 提供方法执行指定的任务集合中的所有任务, 将第一个执行完成的任务的结果作为返回值, 并终止其他线程的执行
* 4. isShutDown/isTerminated: 判断线程池状态方法
* 5. shutdown: 不再接受新的任务, 待所有任务执行完毕后关闭线程池
* 6. shutdownNow: 不再接受新的任务,直接关闭线程池
*/
public interface extends Executor
// ...
/**
* 只定义了一个 execute 方法, 执行 Runnable 任务
*/
public interface Executor
void execute(Runnable command);
ThreadPoolExecutor 关键参数及核心方法
关键参数
线程池状态参数
public class ThreadPoolExecutor extends AbstractExecutorService
// 线程池状态,由两部分构造 runState | workerCount
// runState: 占2bit(29~30位)
// workerCount: 占29bit(0~28位)
// 符号位: 占1bit(最高位)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// workerCount 最大容量: 2^29 - 1
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 线程池状态
* RUNNING: 运行状态,接受新任务,处理阻塞队列中的任务
* SHUTDOWN: 关闭状态,拒绝新任务,处理阻塞队列中的任务
* STOP: 停止状态,拒绝新任务,并中断当前正在执行的任务,不处理阻塞队列中的任务直接关闭
* TIDYING: 过度状态,当前线程池中的活动线程数降为0时的状态
* TERMINATED: 销毁状态,线程池彻底终止
*/
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
线程池状态转移图如下所示
- RUNNING: 线程池创建后进入的状态
- SHUTDOWN: 调用
shutdown
方法进入该状态,该方法主要包含如下操作- 更新线程池状态为
SHUTDOWN
- 中断空闲线程
interruptIdleWorkers()
- 所以已经存在任务队列中的任务还是能被正常执行完成
- 执行完所有任务后,先清除所有的worker,然后调用
tryTerminate()
,进入TIDYING
状态
- 更新线程池状态为
- STOP: 调用
shutdownNow()
方法进入该状态,该方法主要包含如下操作- 更新线程池状态为
STOP
- 中断所有线程
interruptWorkers()
- 清空任务队列
drainQueue()
- 立即调用
tryTerminate()
进入TIDYING
状态
- 更新线程池状态为
- TIDYING: 调用
terminated()
方法 - TERMINATED: 执行完
terminated()
方法进入该状态- ctl.set(ctlOf(TERMINATED, 0))
线程池管理参数
public class ThreadPoolExecutor extends AbstractExecutorService
// 任务队列
private final BlockingQueue<Runnable> workQueue;
// 工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 线程池到达过的最大线程数量
private int largestPoolSize;
// 已完成任务数
private long completedTaskCount;
// 线程工厂,用于创建线程
private volatile ThreadFactory threadFactory;
// 拒绝策略处理类
private volatile RejectedExecutionHandler handler;
// 线程池中线程数量 > corePoolSize 情况下,空闲线程的最大存活时间
private volatile long keepAliveTime;
// true: 线程数量 <= corePoolSize 情况下,空闲线程的最大存活时间也设置为 keepAliveTime
// false(default): 线程数量 <= corePoolSize 情况下,空闲线程可以一直存活
private volatile boolean allowCoreThreadTimeOut;
// 设置线程池 —— 核心线程数
private volatile int corePoolSize;
// 设置线程池 —— 最大线程数
private volatile int maximumPoolSize;
// 默认任务拒绝策略: 抛出异常
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
核心方法
构造函数
// corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue 必须手动设置
// threadFactory, handler 可以使用默认设置
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
execute() 方法
public void execute(Runnable command)
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// workerCount < corePoolSize,则直接添加一个 worker 执行该任务
if (workerCountOf(c) < corePoolSize)
if (addWorker(command, true))
return;
c = ctl.get();
// workerCount >= corePoolSize, 则先尝试将任务添加到 workQueue
if (isRunning(c) && workQueue.offer(command))
int recheck = ctl.get();
// 任务添加到 workQueue 后,执行recheck
// 如果线程池未处于 Running 状态,则将刚刚添加的任务从阻塞队列中删除
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果线程池处于 Running 状态,则判断是否需要添加一个新的 worker
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
// workerCount >= corePoolSize, 并且任务队列已满,添加失败
// 则尝试增加一个新的 worker 执行该任务
// 如果添加失败,则调用拒绝策略处理类
else if (!addWorker(command, false))
reject(command);
execute
提交新任务的处理策略总结如下:
workerCount < corePoolSize
: 直接添加一个新的 worker 执行任务workerCount >= corePoolSize
: 尝试添加到任务队列- 添加成功则执行
recheck
; - 添加失败则尝试创建一个新的 worker 来执行该任务,创建worker失败则调用拒绝策略处理
- 添加成功则执行
addWorker() 方法
该方法用于添加一个新的 Worker 到线程池中,包括两个参数:
- firstTask(Runnable): 创建完成后第一个执行的任务
- core(boolean):
- true: 使用 corePoolSize 为最大线程数量
- false: 使用 maxPoolSize 为最大线程数量
private boolean addWorker(Runnable firstTask, boolean core)
// 循环标签,方便跳出
retry:
for (;;)
int c = ctl.get();
int rs = runStateOf(c);
/**
* 判断线程池状态:以下状态才能添加 worker
* 1. 线程池处于 RUNNING 状态
* 2. 线程池处于 SHUTDOWN 状态 且 firstTask 为 null 且 workQueue 不为空
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;)
int wc = workerCountOf(c);
// 判断当前 worker 数量是否还能继续添加
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS 更新 workerCount
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS 更新失败则自旋重试
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
// worker 启动标识
boolean workerStarted = false;
// worker 加入 HashSet 集合标识
boolean workerAdded = false;
Worker w = null;
try
// Worker构造方法调用 threadFactory 创建新的线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null)
final ReentrantLock mainLock = this.mainLock;
// 加锁,保证多个线程同时添加 worker 到集合中的安全性
mainLock.lock();
try
int rs = runStateOf(ctl.get());
//
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null))
if (t.isAlive()) // 判断该线程是否已经启动
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
finally
mainLock.unlock();
if (workerAdded)
t.start(); // 启动线程
workerStarted = true;
finally
if (! workerStarted)
// worker 启动失败,则做一些回退处理
// 从 workers 集合中删除 worker
// workCount 减少1
addWorkerFailed(w);
return workerStarted;
Worker
Worker
类实现了Runnable
接口,所以在创建线程中可以传入自己作为任务,然后线程启动时调用自己的run()
方法
Worker
类继承自AQS,所以其本身也是一把锁(不可重入锁),在执行任务时通过lock()
锁住自己,保证worker正在执行时不会去获取其他任务来执行
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
Worker(Runnable firstTask)
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 传入自己作为 Runnable 实例
// 线程启动时执行 Worker.run() 方法
this.thread = getThreadFactory().newThread(this);
// run() 则调用外部 ThreadPoolExecutor 的 runWorker 方法
public void run()
runWorker(this);
runWorker() 方法
final void runWorker(Worker w)
Thread wt = Thread.currentThread();
// 初始任务
Runnable task = w.firstTask;
// firstTask 执行过一次后被置为 null
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try
// 循环获取任务执行,复用已有线程
// getTask() 从任务队列获取task
while (task != null || (task = getTask()) != null)
w.lock();
// 若线程池处于 STOP 状态,但线程没有中断执行,则调用 interrupt() 方法完成中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try
// 钩子方法,任务执行前逻辑
// 默认实现为空,可自定义线程池扩展该功能
beforeExecute(wt, task);
Throwable thrown = null;
try
// 执行任务
task.run();
catch (RuntimeException x)
thrown = x; throw x;
catch (Error x)
thrown = x; throw x;
catch (Throwable x)
thrown = x; throw new Error(x);
finally
// 钩子方法,任务执行后逻辑
// 默认实现为空,可自定义线程池扩展该功能
afterExecute(task, thrown);
finally
task = null;
w.completedTasks++;
w.unlock();
completedAbruptly = false;
finally
// 删除 worker,线程执行完毕
processWorkerExit(w, completedAbruptly);
getTask() 方法
从
workQueue
中获取任务,返回 Runnable 任务或者 null
return Runnable
: worker正常执行return null
: 获取不到任务,进入 processWorkerExit 结束当前 worker
private Runnable getTask()
boolean timedOut = false; // Did the last poll() time out?
for (;;)
int c = ctl.get();
int rs = runStateOf(c);
/**
* 判断是否回收当前线程:
* 情况1. 线程池状态为 SHUTDOWN && workQueue 为空
* 情况2. 线程池状态为 STOP || TERMINATED
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))
decrementWorkerCount();
return null;
int wc = workerCountOf(c);
// true: poll()获取任务,阻塞获取,设置超时时间
// false: take()获取任务,阻塞获取
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 判断是否回收当前线程:
* 条件1. workerCount > maxPoolSize 或 当前线程获取任务超时
* 条件2. workerCount > 1 或 workQueue 为空
*
* 同时满足条件1和条件2,则CAS减少workerCount,并返回null
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty()))
if (compareAndDecrementWorkerCount(c))
return null;
continue;
// 不满足回收当前线程的条件,则执行后续获取任务的逻辑
try
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
catch (InterruptedException retry)
timedOut = false;
processWorkerExit() 方法
从 workers 工作线程集合中删除当前 worker,回收线程。
private void processWorkerExit(Worker w, boolean completedAbruptly)
// 如果是异常退出,则需要手动完成 workerCount 的更新
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
completedTaskCount += w.completedTasks;
workers.remove(w);
finally
mainLock.unlock();
// 尝试终止线程池
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP))
if (!completedAbruptly)
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
// 1.如果是异常退出则直接添加一个新的 worker
// 2.如果 workerCount < 最小线程数要求,则添加一个新的 worker
addWorker(null, false);
总结
创建线程池提交任务,整体执行流程如下图所示:
- execute(): 提交 Runnable Task
- submit(): 提交 Callable Task
- wc: workerCount, 线程数量
- rs: runState, 线程池运行状态
- reject: 执行任务拒绝策略
Java多线程 -- JUC包源码分析11 -- ThreadPoolExecutor源码分析
在JUC包中,线程池部分本身有很多组件,可以说是前面所分析的各种技术的一个综合应用。从本文开始,将综合前面的知识,逐个分析线程池的各个组件。
-Executor/Executors
-ThreadPoolExecutor使用介绍
-ThreadPoolExecutor实现原理
–ThreadPoolExecutor的中断与优雅关闭 shutdown + awaitTermination
–shutdown的一个误区
Executor/Executors
Executor是线程池框架最基本的几个接口:
public interface Executor {
void execute(Runnable command);
}
而Executors是线程池框架的一个工具类,利用它可以方便的创建不同策略的线程池:
//单线程线程池:corePoolSize = maxPoolSize = 1, 队列用的LinkedBlockingQueue
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//固定数目的线程池:corePoolSize = maxPoolSize = n, 队列用的LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
//1。CachedThreadPool,corePoolSize = 0, 队列为SynchronousQueue,maxPoolSize = Integer.MAX_VALUE(这也就意味着,每来一个任务,就创建一个线程。
//2。关于SynchronousQueue,后面会单独用一篇来分析。它是个特殊的队列,没本身没有容量,放进去一个,就得等有线程拿出来,才能解除阻塞
//3。从构造参数可以看出,空闲线程,60s没人用,回收
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//单线程的,具有周期调度功能的线程池
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
//多线程的,具有周期调度功能的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
从上面可以看出,Executors的各个工具函数,都用的ThreadPoolExecutor/ScheduledThreadPoolExecutor这2个类,下面做详细分析。
ThreadPoolExecutor
ThreadPoolExecutor构造函数详解
下面是ThreadPoolExecutor的参数最全的构造函数,搞清楚了每个参数的含义,也就明白了线程池的各种不同策略,也就明白了上述Executors工具类中的各个工具函数。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize: 线程池始终维护的线程个数
maxPoolSize: corePooSize满了,队列也满的情况下,扩充线程至这个值
keepAliveTime/TimeUnit: maxPoolSize中的空闲线程,过多长时间销毁,总线程数收缩回corePoolSize
blockingQueue: 线程池所用的队列类型
threadFactory: 线程创建工厂,可以自定义,也有一个缺省的
RejectedExecutionHandler: corePoolSize满了,队列满了,maxPoolSize满了,最后的拒绝策略。
ThreadPool任务处理流程
从上述构造函数解释,可以看出每次submit的任务,有如下的处理流程:
step1: 判断当前线程数 >= corePoolSize。如果小于,新建线程执行;如果大于,进入step2
step2: 判断队列是否已满。未满,放入;已满,进入step3
step3: 判断当前线程数 >= maxPoolSize。如果小于,新建线程执行;如果大于,进入step4
step4: 根据拒绝策略,拒绝任务
总结一下:先判断corePoolSize, 再判断blockingQueue,再判断maxPoolSize,最后使用拒绝策略
ThreadPool的4中拒绝策略
ThreadPoolExecutor的4个内部类,分别定义了4种策略。缺省是AbortPolicy
//策略1:让调用者直接在自己的线程里面执行,线程池不做处理
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
//策略2:线程池直接抛异常
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
//策略3:线程池直接把任务丢掉,当作什么也没发生
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
//策略4:把队列里面最老的任务删除掉,把该任务放入队列
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
ThreadPoolExecutor实现原理
一般都知道,ThreadPool的基本实现原理就是一个队列 + 一组worker线程,调用中不断往队列中放,worker线程不断去取。但在具体实现中,有不同的实现策略:
策略1: 阻塞队列 vs. 非阻塞队列
在ThreadPoolExecutor中,使用的是阻塞队列,即如下的BlockingQueue接口:
private final BlockingQueue<Runnable> workQueue;
这也就意味着,worker内部不需要自己设置wait/notify机制,它只管从队列中取,取的到执行,取不到,自动会阻塞。
也有使用非阻塞队列的,比如Tomcat 6里面的线程池实现(以后会源码详细分析),当没有请求处理时,worker内部自己实现阻塞,然后又新的请求进来,再通知woker。
策略2:新来的请求,是直接放入队列,还是先new一个新的thread?
ThreadPool的处理方式是优先new thread处理,thread count >= corePoolSize的时候,再考虑放入队列。
策略3: 无界队列 vs. 有界队列?
如果无界队列,意味着maxPoolSize的逻辑永远不会执行。这在上面的Executors中,FixedThreadPool已有所体现。
除此之外,还有诸多实现上的细节,下面代码详细分析
源码分析
//核心结构:一个BlockingQueue + 一个线程的Set + 一把锁(控制对workers, 各种threadCount的互斥访问)
public class ThreadPoolExecutor extends AbstractExecutorService {
。。。
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();
。。。
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //小于corePoolSize的判断
if (runState == RUNNING && workQueue.offer(command)) { //入队列
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command); //进入队列之后,2次检测
}
else if (!addIfUnderMaximumPoolSize(command)) //小于maxPoolSize的判断
reject(command); // 大于maxPoolSize,拒绝请求
}
}
//poolSize < corePoolSize的时候,直接new Thread,加入hashSet
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
return t != null;
}
//队列满了,poolSize < maxPoolSize,再次new thread,加入hashSet
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
return t != null;
}
Worker的实现
private final class Worker implements Runnable {
。。。
private Runnable firstTask; //至所以有firstTask这个变量,是因为创建worker的时候,可以直接赋给它一个task执行;有可以不赋给task,让它自己到blockingQueue里面去循环取
Worker(Runnable firstTask) {
this.firstTask = firstTask;
}
//1个死循环,不断从blockingQueue中,取task执行。取不到,就会阻塞在getTask()里面
public void run() {
try {
hasRun = true;
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this); //worker线程退出
}
}
。。。
}
//getTask里面有个关键点:当poolSize <= corePoolSize时,是无限期阻塞下去,线程也就会一直存在,不会退出,死掉;当poolSize > corePoolSize或者允许coreThread也死去时,线程就只阻塞keepAliveTime的时间,时间到了,队列还是空的,没有请求,线程就退出,死掉了,同时poolSize--.
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN)
r = workQueue.poll(); //poll是非阻塞调用,没有直接返回null
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); //等待1个超时时间,默认就是构造函数里面传进去的那个60s
else
r = workQueue.take(); //take是阻塞调用,没有,一直阻塞
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN)
interruptIdleWorkers();
return null;
}
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
中断与优雅关闭
线程池状态切换图
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
初始处于RUNNING状态,当调用shutdown()之后,切换到SHUTDOWN状态;调用shutdownNow(),切换到STOP状态。
那shutdown与shutdownNow有什么区别吗?
shutdown(): 不会清空队列里面的任务,会等所有任务执行完毕。并且它只会中断那些 > corePoolSize的idle线程
shutdownNow(): 清空队列里面所有任务,同时向所有线程发送中断信号
当队列为空 && pool也为空时,线程池进入Terminated状态。
shutdown/shutdownNow源码解析
public void shutdown() {
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(shutdownPerm); //权限检查,check当前调用者,是否有权限关闭线程池。没有权限,抛出异常。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (security != null) {
for (Worker w : workers)
security.checkAccess(w.thread); //权限检查
}
int state = runState;
if (state < SHUTDOWN)
runState = SHUTDOWN; //从running切换到shutdown。不能从stop或者terminated切换到shutdown
try {
for (Worker w : workers) {
w.interruptIfIdle(); //遍历所有线程,向其发送信号
}
} catch (SecurityException se) {
runState = state;
throw se;
}
tryTerminate(); //试图终止线程池
} finally {
mainLock.unlock();
}
}
public List<Runnable> shutdownNow() {
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (security != null) {
for (Worker w : workers)
security.checkAccess(w.thread);
}
int state = runState;
if (state < STOP)
runState = STOP; //切换到stop状态
try {
for (Worker w : workers) {
w.interruptNow(); //变量所有线程,发中断信号,不管是否正在执行任务
}
} catch (SecurityException se) { // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
}
List<Runnable> tasks = drainQueue(); //清空队列请求
tryTerminate(); // 试图终止线程池
return tasks;
} finally {
mainLock.unlock();
}
}
从上面,可以看出,shutdown和shutdownNow的区别有3点:
(1)一个是切换到shutdown状态,一个是切换到stop状态
(2)遍历所有线程,一个调用的interruptIfIdle, 一个调用的interruptNow。
(3)shutdownNow会清空队列中的任务
那interruptIfIdle和interruptNow有什么区别呢?
private final class Worker implements Runnable {
。。。
private final ReentrantLock runLock = new ReentrantLock();
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) {
try {
if (hasRun && thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
void interruptNow() {
if (hasRun)
thread.interrupt();
}
public void run() {
try {
hasRun = true;
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) { //getTask内部,也有响应中断的逻辑
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
//每次从队列中拿出一个任务,执行之前,会加锁
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
if ((runState >= STOP ||
(Thread.interrupted() && runState >= STOP)) &&
hasRun)
thread.interrupt();
boolean ran = false;
beforeExecute(thread, task);
try {
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}
可以看出,interruptIfIdle和interuptNow的关键区别是:前者会加锁访问,这也就意味着,如果被中断的线程,正在执行runTask,则锁是拿不到的。此时shutdown会阻塞,直到woker执行完runTask。
shutdown的一个误区
根据上面分析,是不是shutdown一定会阻塞到队列中所有请求都执行完,再返回呢?或者说,shutdown返回的时候,是不是队列里面的请求就一定执行完了呢?
不一定!shutdown返回之后,线程池不一定立即关闭!为什么呢?
请看下面的getTask函数
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) //如果线程池是shutdown状态,就不阻塞了,不管是否能拿到,都是直接返回
r = workQueue.poll(); //关键点:如果是shutdown状态,会一直循环,直到拿空队列里面所有任务
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take(); //case1: 别的线程先置了中断标志位,然后当前线程调用take //case 2: 先调用take阻塞在这,然后别的线程置了中断标志位 //2种case,都会抛出异常,进入下面的InterruptedException
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) { //阻塞的时候,收到中断,不处理,再次循环检查
// On interruption, re-check runState
}
}
}
public void run() {
try {
hasRun = true;
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) { //getTask内部,也有响应中断的逻辑
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
//每次从队列中拿出一个任务,执行之前,会加锁
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
if ((runState >= STOP ||
(Thread.interrupted() && runState >= STOP)) &&
hasRun)
thread.interrupt();
boolean ran = false;
beforeExecute(thread, task);
try {
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}
总结一下:当执行shutdown的时候,woker线程可能处于以下几种情况:
情况1: 正在执行runTask,此时拿着runLock锁,调用者会阻塞在shutdown上面。
情况2: 正要进入getTask。runTask执行完了,锁释放了,正要去getTask。此时shutdown不会阻塞,状态切换到shutdown状态,就返回了。 而getTask里面,会调用blockingQueue.poll
情况3: 在getTask里面,阻塞在blockQueue.take上面。此时调用shutdown, getTask里面收到中断,再次开始for(;;)循环
情况2,情况3,shutdown不会阻塞,就返回了。
所以不管是shutdown, 还是shutdownNow(),结尾都调用了tryTeminate,下面看看这个函数:
private void tryTerminate() {
if (poolSize == 0) { //线程池里线程没了
int state = runState;
if (state < STOP && !workQueue.isEmpty()) {
state = RUNNING; //关键点:线程池里线程没了,状态是shutdown状态,队列还不为空,此时把状态切会到Running状态。并且重新创建线程,消化队列中的任务
addThread(null);
}
if (state == STOP || state == SHUTDOWN) {
runState = TERMINATED;
termination.signalAll(); //通知awaitTermination函数,不要再等了,线程池关闭
terminated();
}
}
}
所以,正确的使用shutdown的方式,应该是如下代码:
executor.shutdown();//只是不能再提交新任务,等待执行的任务不受影响
//调完shutdown,要循环调用awaitTermination,等待线程池真的终止
try {
boolean loop = true;
do { //等待所有任务完成
loop = !executor.awaitTermination(2, TimeUnit.SECONDS); //阻塞,直到线程池里所有任务结束
} while(loop);
} catch (InterruptedException e) {
e.printStackTrace();
}
总结:无论是shutdown,还是shutdownDown,都无法保证线程池立即关闭。他们的本质都只是切换了线程池的状态,发送了中断信号,然后等队列里面的任务为空了,所有线程自己销毁自己。
要让主线程等待线程池彻底终止,需要调用awaitTermination函数。
关于SechduledThreadPoolExecutor,会在接下来的篇章中,详细的单独阐述。
以上是关于ThreadPoolExecutor源码学习的主要内容,如果未能解决你的问题,请参考以下文章