Java 线程池 ThreadPoolExecutor源码简析
Posted 郭梧悠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 线程池 ThreadPoolExecutor源码简析相关的知识,希望对你有一定的参考价值。
如果你对线程池的概念还不甚了解,那么Java 线程池 Executor浅入浅出这篇文章可以让你一步步从0到1了解线程池的搭建过程。关于线程池的更详细的说明,《Java并发编程实战》这本书上讲的很透彻,强烈推荐阅读。
在Executors
中为我们提供了多种静态工厂方法来创建各种特性的线程池,其中大多数是返回ThreadPoolExecutor
对象。因此本篇博文直接对ThreadPoolExecutor
的原理进行剖析,加深对线程池设计的理解。ThreadPoolExecutor
有如下两个重要参数:
corePoolSize
线程池核心线程的数量。如果线程池里得线程个数小于corePoolSize
,那么就会立即创建一个新线程对象来执行任务。如果任务数大于核心线程数怎么办呢?那么就将任务放在队列中,等待执行。
比如我们去银行办理业务,有若干个人工服务窗口,当来办理业务的客户比较少的时候,可以不用排队,直接去空闲的窗口去办理业务。如果客户比较多了,那么就需要就需要领号排队了。但是服务窗口有限,来办理业务的人太多了,怎么办?maximumPoolSize就派上用场了。
我们需要注意的是,初始化ThreadPoolExecutor的时候,线程Thread不会立即创建和启动,而是等到提交任务的时候才会创建启动,当然调用了prestartAllCoreThreads的话,是可以创建并启动所有Thread.
/**
*启动所有核心线程,让他们处于idly状态等待任务的到来。
* @return 返回启动的线程的数量
*/
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
另外ThreadPoolExecutor提供了getPoolSize()方法,该方法并不是返回初始化的时候设置的corePoolSize的大小,而是获取线程中实际存在的线程的数量,如果线程池处于TIDYING状态,则返回0:
//存储正在工作的线程
private final HashSet<Worker> workers = new HashSet<>();
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//如果线程处于TIDYING状态,说明线程池里的线程为0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
maximumPoolSize
既然有核心线程,那么也有非核心线程,但是这个线程池这个池子不能无限大,maximumPoolSize
就限定了线程池中允许的最大线程数。
在线程池阻塞队列(非无界队列)装满了任务之后,如果还有任务加入,如果当前的线程数小于maximumPoolSize
,则会新建线程来执行任务。否则就会拒绝执行。
比如去银行排队办理业务的客户越来越多,银行服务大厅已经装不下了。此时如果除了服务窗口之外,银行还有额外的资源可以办理客户的业务,那么就使用额外的资源来处理任务。如果额外的资源也已经满负荷服务客户,对于后面新来的客户,只能“欢迎下次光临”了。
workQueue
用来保存等待执行任务的队列,等待的任务必须实现Runnable
接口,线程池与工作队列WorkQueue
密切相关,因为并不是一个线程负责处理一个任务,而是一个线程负责处理好多任务。WorkQueue负责保存所有等待执行的任务,而线程池里的线程从工作队列中获取一个任务,然后执行,执行完毕后等待下一个任务。我们先来看看工作线程(下文简称WorkThread
)怎么执行任务的,具体的在ThreadPoolExecutorl
类的runWorker
方法里面,该方法的主要作用就是不断的从WorkQueue
中获取任务并执行之:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 释放锁,允许中断
boolean completedAbruptly = true;
try {
//开启一个循环,从WorkQueue中获取任务并执行之
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//空方法:任务执行之前的回调
beforeExecute(wt, task);
Throwable thrown = null;
//执行任务
task.run();
//空方法:任务执行完毕之后的回调
afterExecute(task, thrown);
}
} finally {
task = null;
//记录当前Worker已经完成任务的数量
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//使得工作线程退出
processWorkerExit(w, completedAbruptly);
}
}
注意Worker 是一个Runnable,runWorker方法就是在Worker的run方法中执行的。
Worker对象简单说明
Worker
是线程池里一个重要的对象,该类还是一个Runnable
,该类持有一个Thread
,代表着Worker
对象在哪个线程中执行,在初始化Worker
对象的时候就创建了线程。.
{
//由ThreadFactory创建,代表着Worker对象在哪个线程中执行
final Thread thread;
/** Initial task to run. Possibly null.
Worker对象第一个执行的任务,可能是null,后面在执行任务的时候就要从WorkerQueue里面获取 */
Runnable firstTask;
/** 记录每个线程完成工作的数量*/
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//创建线程
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
另外Worker
提供了CAS
算法进行自旋,提供了lock
和unlock
等方法,其中1代表着处于locked
状态,而0除以非locked状态.关于自旋CAS
的基础知识,感兴趣的可以阅读博主的《java线程知识点拾遗(CAS)》。
execute方法
ThreadPoolExecutor
提供了execute
方法,主要是用新的线程或者线程池里已经有的线程,在未来某个时期执行一个任务,当线程池shutDown
或者达到最大容量的时候,任务就不会被执行,此时线程池会执行拒绝策略RejectedExecutionHandler
。下面再来大致分析下该方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果线程池里的线程小于核心数量
if (workerCountOf(c) < corePoolSize) {
//创建一个新线程,并将command作为线程执行的第一个任务
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果线程池处于Running状态,且将任务command成功加入WorkerQueue中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次校验状态,如果线程池不是处于Running状态,则从队列中移除并执行拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)//注意这是一个很细节的点
//线程池没有活动线程了,但是队列里还有任务没执行这种特殊情况,则重新添加一个线程执行。
addWorker(null, false);
}
//队列已经满了,则重新创建线程执行,如果失败则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
在这里有一个值得注意的细节,调用workQueue.offer(command)
将任务添加到队列之后,如果线程中有正在执行任务的线程,则这个任务会在未来某个时间内执行。但是如果任务入队之后,恰好线程池中没有活跃的线程了,怎么办呢,就再次addWorker
,启动新线程执行。(举个不恰当的例子,就好像去即将打烊的饭店去吃饭,你已经扫码付好钱(入队列),然而厨师恰巧都已经准备下班了,不得不重新给你做饭一样),注意此时addWorker
的第一个参数是null,因为我们已经将command
通过offer
添加到队列中了,具体代码段如下:
else if (workerCountOf(recheck) == 0)//注意这是一个很细节的点
//线程池没有活动线程了,但是队列里还有任务没执行这种特殊情况,则重新添加一个线程执行。
addWorker(null, false)
可以用如下流程图来大致表示下线程池的工作流程。
addWorker方法简析
addWorker的主要作用就是,在线程池当前的状态和既有的线程边界(corePoolSize,maximumPoolSize)下,新的任务是否能添加成功。
*
* @param firstTask 当Worker被创建的时候(此时线程池里的线程数量小于核心线程),firstTask将作为Worker对象的第一个任务来执行。
* 此时firstTask并不会添加到队列中。 或者当队列已经满的情况下,也是直接创建Worker,但是fistTask也不会添加到WorkQueue中去。
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {// 外层自旋
int c = ctl.get();
//获取线程状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//如果线程池处于关闭状态,并且WorkerQueue中还有任务在等待执行,则返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {//内层自旋
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//如果线程池内的线程达到最大值,比如达到corePoolSize或者maximunPoolSize,则返回false
return false;
if (compareAndIncrementWorkerCount(c))
// 以CAS的方式尝试把线程数加1
// 注意这里只是把线程池中的线程数加1,并没有在线程池中真正的创建线程
break retry;
c = ctl.get(); // Re-read ctl
//如果此时线程池的状态和上次获取的不一样,则继续执行外层自旋
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}//end inner for
}//end outter for
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建Worker,同时也就创建了Thread对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将Worker添加到HashSet种
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动线程执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker工作的大致流程如下:
1、判断线程池状态,如果需要创建Worker对象并同时创建工作线程
2、将此worker添加进workers集合中。
3、启动worker对应的线程,运行worker的run,最终执行了runWorker方法。
4、如果失败了回滚worker的创建动作(addWorkerFailed 方法),即将worker从workers集合中删除,并原子性的减少workerCount。
keepAliveTime的使用原理
该变量表示线程空闲的时间。线程的创建和销毁是需要代价的。线程执行完任务后不会立即销毁,而是继续存活一段时间,存活的时间就是keepAliveTime。默认情况下,该参数只有在线程数大于corePoolSize时才会生效.该变量在从WorkQueue里获取任务的时候会被使用,在前文runWorker里就调用了从workQueue里面获取任务的方法:getTask():
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {//开启一个循环
int c = ctl.get();
int rs = runStateOf(c);
//当线程处于SHUTDOWN 状态时,并且workQueue请求队列为空,线程数量的表示-1,并返回一个null。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取当前工作线程数
int wc = workerCountOf(c);
//如果allowCoreThreadTimeOut设置为true,或者线程数量大于核心线程数量,keepAliveTime才起作用
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//减少工作线程
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//从阻塞队列中取任务,根据情况选择一直阻塞等待还是超时等待
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//成功从队列中获取到了任务,并返回之
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
可以看出,keepAliveTime
是在 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
方法中起作用的。在runWorker方法中,开始while循环获取任务,如果超时了就可以跳出while循环,从而Thread完成了其任务而被销毁。:
final void runWorker(Worker w) {
//省略部分代码
while (task != null || (task = getTask()) != null) {
//省略部分代码
}
//省略部分代码
}
以上是关于Java 线程池 ThreadPoolExecutor源码简析的主要内容,如果未能解决你的问题,请参考以下文章