深入理解ThreadPoolExecutor第三弹
Posted 风在哪
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解ThreadPoolExecutor第三弹相关的知识,希望对你有一定的参考价值。
从源头解析ThreadPoolExecutor第三弹—属性及execute方法详解
首先我们看看ThreadPoolExecutor都有哪些属性:
ctl属性是ThreadPoolExecutor中出现的第一个属性,所以我们首先来看看ctl属性的作用。
ctl属性
ctl是线程池中的重要标识,它主要有两个作用:
- 线程池的运行状态
- 线程池中工作线程数
这里为什么要用一个变量代表线程池中两个重要标识呢,我们可以从如下两点考虑:
- 首先,我们需要考虑的是多线程中线程安全的问题。在多线程环境中,Java中主要使用synchronized关键字或者CAS+volatile去解决多线程访问共享资源的问题。但是synchronized适合写多读少的场景,而CAS+volatile适合读多写少的场景。在ThreadPoolExecutor中对于ctl的访问正是读多写少的场景,所以这里使用了CAS+volatile的方法解决多线程问题。
- CAS保证了操作的原子性
- volatile保证了操作的可见性
- 当采用CAS+volatile操作时,不得不面对cas操作的缺点:
- cas在写多读少的情况下,会出现大量的循环,增加CPU的开销
- cas只能保证单一变量的原子性操作,所以这里使用一个变量标识两种状态。
- CAS无法解决ABA问题
了解了上述的知识以后,我们才能真正了解ctl的作用。
我们看看ThreadPoolExecutor源码是怎么实现用ctl表示线程池工作线程数量和线程池运行状态的。
以下是涉及上述ctl作用的源码。
// Integer.SIZE = 32 ==》 COUNT_BITS = 29;
private static final int COUNT_BITS = Integer.SIZE - 3;
// 1 << COUNT_BITS ==》 00100000000000000000000000000000
// (1 << COUNT_BITS) - 1 ==》 00011111 11111111 11111111 11111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/*
下面这些变量表示的就是线程池的运行状态
从他们的二进制表示可以看出来线程池的运行状态是通过高三位来表示的,其余的位都为0
*/
/*
-1:11111111 11111111 11111111 11111111
RUNNING ==》 -1 << 29:11100000 00000000 00000000 00000000
*/
private static final int RUNNING = -1 << COUNT_BITS;
/*
SHUTDOWN ==》 00000000 00000000 00000000 00000000
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/*
STOP ==》 00100000 00000000 00000000 00000000
*/
private static final int STOP = 1 << COUNT_BITS;
/*
TIDYING ==》01000000 00000000 00000000 00000000
*/
private static final int TIDYING = 2 << COUNT_BITS;
/*
TERMINATED ==》10100000 00000000 00000000 00000000
*/
private static final int TERMINATED = 3 << COUNT_BITS;
// 通过下面的方法对ctl进行拆箱和装箱
/*
获取线程池的运行状态,对ctl进行拆箱工作,ctl的高三位表示的正是线程池的运行状态
我们看看为什么是这样
~CAPACITY:11100000 00000000 00000000 00000000
ctl & ~CAPACITY操作就保留了ctl高三位的信息,这高三位就代表了线程池的运行状态
*/
private static int runStateOf(int c) { return c & ~CAPACITY; }
/*
获取线程池中工作线程的数量,对ctl进行拆箱工作
CAPACITY:00011111 11111111 11111111 11111111
ctl & CAPACITY操作就清除了ctl的高三位信息,用剩余的位数的信息表示正在运行的工作线程的数量
*/
private static int workerCountOf(int c) { return c & CAPACITY; }
// 根据rs和wc产生ctl的初始值
private static int ctlOf(int rs, int wc) { return rs | wc; }
我们来看看ctl的初始化操作:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl就是根据运行状态running和工作线程数0来初始化的。
看看线程池的各种状态的含义:
状态 | 解释 |
---|---|
RUNNING | 运行态,可处理新任务并执行队列中的任务 |
SHUTDOW | 关闭态,不接受新任务,但处理队列中的任务 |
STOP | 停止态,不接受新任务,不处理队列中任务,且打断运行中任务 |
TIDYING | 整理态,所有任务已经结束,workerCount = 0 ,将执行terminated()方法 |
TERMINATED | 结束态,terminated() 方法已完成 |
状态转换流程:
其他属性
/*
用于保存任务并将其传递给工作线程的队列
是一种阻塞队列
*/
private final BlockingQueue<Runnable> workQueue;
/*
通过可重入锁控制对Workers的访问,这里是为了防止多线程并发时c
*/
private final ReentrantLock mainLock = new ReentrantLock();
/*
存储线程池的工作线程,只有持有mainLock锁才能访问workers
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/*
用于等待终止的条件
*/
private final Condition termination = mainLock.newCondition();
/*
当前线程池中包含的工作线程的数量,持有mainLock才能访问
*/
private int largestPoolSize;
/*
已完成任务的计数器,仅在终止工作线程时更新,持有mainLock才能访问
*/
private long completedTaskCount;
/*
用于创建线程的工厂,保证了多线程之间的可见性
*/
private volatile ThreadFactory threadFactory;
/*
线程池的决绝策略
*/
private volatile RejectedExecutionHandler handler;
/*
闲置线程的最大存活时间,主要用于销毁超过核心线程数量的那部分线程
当allowCoreThreadTimeOut为false时核心线程不受此参数的影响,否则核心线程也会被销毁
*/
private volatile long keepAliveTime;
/*
控制核心线程是否被超时时间影响的变量
*/
private volatile boolean allowCoreThreadTimeOut;
/*
核心线程数量
*/
private volatile int corePoolSize;
/*
最大线程数量
*/
private volatile int maximumPoolSize;
构造方法
ThreadPoolExecutor有四种构造方法,它们都是重载的构造方法,最终都调用了参数最全面的构造方法.
// 如果我们不提供拒绝策略,这里默认的拒绝策略就是拒绝执行任务并抛出异常
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize:核心线程数量,当提交任务时,如果工作线程数量没有达到核心线程数量,就直接创建新的工作线程,否则就直接加入阻塞队列
maximumPoolSize:最大线程数量,线程池允许存在的最大线程数量,也就是当阻塞队列满时,且工作线程数量大于corePoolSize小于maximumPoolSize时,可以直接创建新的工作线程用于执行这个新任务
keepAliveTime:闲置线程的最大生存事件,当非核心线程的闲置时间大于keepAliveTime时,就回收这个线程
unit:时间的单位
workQueue:阻塞队列,当工作线程数量大于corePoolSize时,直接将提交的任务加入阻塞队列
threadFactory:线程工厂,用于生产线程的工厂,如果我们不提供的话,默认通过Executors.defaultThreadFactory()获得线程工厂
handler:拒绝策略,当工作线程数量达到maximumPoolSize且阻塞队列已满时,丢弃任务的策略
execute(Runnable command)方法
该方法用于异步执行提交的任务。提交的任务可能在新的线程中执行也可能在线程池中存在的线程执行。如果要执行的任务无法提交,可能是因为线程池关闭或者线程池到达了最大容量,此时会采用我们定义的拒绝策略来拒绝新提交的任务。
首先来看看execute方法的源码:
public void execute(Runnable command) {
// 如果新提交的任务为null,就抛出异常
if (command == null)
throw new NullPointerException();
/*
接下来将可能会有三个流程:
1、如果当前的工作线程数量小于corePoolSize定义的数量,就尝试启动一个新的工作线程去执行这个给定的任务
2、如果任务成功加入队列,我们需要进行检查是否需要添加一个线程或者进入该方法后线程池是否关闭
3、如果不能将新的任务加入队列中,我们尝试添加一个新的线程,如果不能启动新的线程,那么就拒绝执行任务
*/
int c = ctl.get();
// workerCountOf获取当前的工作线程数量,如果小于corePoolSize的话就执行addWork方法添加工作线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// isRunning判断线程池当前的状态,并将任务加入阻塞队列,如果成功加入就执行if的逻辑
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次检查线程池是否在执行,如果没有处于running状态就移除并拒绝任务
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池没有处于running状态,检查工作线程的数量是否为0,是的话就添加一个空的新阿成
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果不能成功加入就执行addWorker方法尝试增加工作线程,不成功就拒绝任务
else if (!addWorker(command, false))
reject(command);
}
execute就是用于提交任务的,提交任务的过程中会有以下三种情况:
1、如果当前的工作线程数量小于corePoolSize定义的数量,就尝试启动一个新的工作线程去执行这个给定的任务
2、如果任务成功加入队列,我们需要进行检查是否需要添加一个线程或者进入该方法后线程池是否关闭
3、如果不能将新的任务加入队列中,我们尝试添加一个新的线程,如果不能启动新的线程,那么就拒绝执行任务
addWorker方法
addWorker方法主要用于添加工作线程。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
// 获取线程池当前的运行状态
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池处于非running状态
// 且不满足(线程处于shutdown状态并且任务为空并且任务队列不为空)条件,就直接返回
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 自旋
for (;;) {
// 获取当前工作线程的数量
int wc = workerCountOf(c);
// 如果工作线程数量wc大于CAPACITY,或者根据core判断wc是否大于核心或者最大线程数量,满足任一条件就返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 增加工作线程的数量,并跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 工作线程启动或者添加完成的标志
boolean workerStarted = false;
boolean workerAdded = false;
// 新建一个空的工作线程
Worker w = null;
try {
// 将提交的任务包装成工作线程
w = new Worker(firstTask);
// 获取工作线程的线程
final Thread t = w.thread;
// 如果工作线程中的线程不为空就继续执行
// worker使用ThreadFactory创建线程有可能为空
if (t != null) {
// 获取线程池的mainLock锁
final ReentrantLock mainLock = this.mainLock;
// 加锁,获取共享资源的访问权限
// 这里为什么要加锁呢?因为我们要访问共享变量largestPoolSize
mainLock.lock();
try {
// 继续检查线程池状态
int rs = runStateOf(ctl.get());
// 如果线程池状态为running(因为只有running状态的值小于shutdown)
// 或者线程池处于中断状态且提交的任务为空则进入处理流程
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 检查工作线程包含的线程是否是可启动的
if (t.isAlive())
throw new IllegalThreadStateException();
// 如果是的话,就将工作线程加入workers中进行同一管理
workers.add(w);
// 如果工作线程的数量大于largestPoolSize,就更新largestPoolSize的值
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 并标志成功添加工作线程
workerAdded = true;
}
} finally {
// 解锁
mainLock.unlock();
}
// 如果成功添加工作线程,就调用当前工作线程的start方法启动,并标志工作线程的线程启动
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程没有成功启动证明添加工作线程的方法失败了,调用addWorkerFailed方法进行处理
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// 该方法主要用于在添加失败之后减少工作线程的数量
// 因为addWorker会在添加工作线程之前就增加其数量,所以添加失败后将相应工作线程数量减1
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 如果Worker对象不为空,则从workers中移除该对象
if (w != null)
workers.remove(w);
// 减少正在工作的线程数量
decrementWorkerCount();
// 尝试终止线程
tryTerminate();
} finally {
mainLock.unlock();
}
}
// 当线程池处于shutdown状态并且线程池和队列为空,或者处于stop状态且线程池为空时
// 将线程池状态改为terminated
final void tryTerminate() {
// 自旋地改变状态
for (;;) {
int c = ctl.get();
/*
如果线程池处于running状态,还在运行不需要终止
或者c处于tidying或terminated状态,此时已经终止了
或者线程池状态为shutdown并且阻塞队列非空,此时还有任务未处理完不能终止
直接返回,不需要终止线程池
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果工作线程的数量不为0,那就中断闲置的线程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// 获取mainLock锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 设置ctl的值为tidying,也就是当前线程池的状态为tidying,工作线程的数量为0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 这个方法为空的实现方法
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
// 中断闲置的线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历workers,中断worker对应的线程
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
protected void terminated() { }
从源码的分析中可以看出,addWorker方法首先判断线程池的状态,如果线程池处于运行状态,或者当线程池处于关闭状态时,提交的任务为空,阻塞队列不为空时,该方法才会继续向下执行.
然后会判断工作线程的数量,如果不大于核心线程或者最大线程数量,那就增加当前工作线程的数量.
然后会将提交的任务包装成worker对象添加到workers中,如果成功添加了的话,就直接启动该线程执行提交的任务.
添加失败的话就会调用addWorkerFailed方法.
总的来说,addWorker方法会执行两部分内容:
- 通过自旋操作,判断能否继续创建新的工作线程,只有通过这部分判断,才能继续下一步
- 创建并添加工作线程,此时还会启动工作线程执行任务
这些提交的任务何时才执行的,下一节将会继续讲解。
getTask方法
runWorker是线程池中执行worker的关键方法,这个方法是一个while循环,当提交的任务或者阻塞队列的任务不为空时,需要一直运行线程池中的任务。所以一个Worker一旦执行,调用自己的run方法,接着就会调用runWorker方法一直参与任务的执行。
我们首先看看runWorker使用的getTask方法。
getTask方法是为了从阻塞队列中获取任务,然后交给线程池执行。
private Runnable getTask() {
// 判断是否超时
boolean timedOut = false; // Did the last poll() time out?
// 自旋获取任务
for (;;) 深入理解ThreadPoolExecutor第一弹