超详细的java线程池源码解析
Posted IT农厂
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了超详细的java线程池源码解析相关的知识,希望对你有一定的参考价值。
线程池的继承关系是这样的ThreadPoolExecutor继承了AbstractExecutorService,AbstractExecutorService是一个抽象类,它实现了ExecutorService接口,ExecutorService又是继承了Executor接口。
继承关系:
ThreadPoolExecutor->AbstractExecutorService->ExecutorService->Executor
线程池的核心方法是execute(Runnable command) 和submit(Runnable task) 而submit方法也是调用execute(Runnable command)完成,所以重点来看execute(Runnable command)的源码
[java] view plain copy
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//如果当前线程数小于核心线程数大小执行addWorker()方法,增加一个线程执行
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//成功执行addWorker()就返回
if (addWorker(command, true))
return;
//没有成功执行获取最新的当前线程数
c = ctl.get();
}
//如果是运行状态,并且加入等待队列成功执行if块(额外含义:线程池是运行状态已经达到核心线程数,优先放入队列)
if (isRunning(c) && workQueue.offer(command)) {//1
//先获取最新的线程数
int recheck = ctl.get();
//再次判断如果线程池不是运行态了并且移除本次提交任务成功,执行拒绝操作
if (! isRunning(recheck) && remove(command))
reject(command);
//如果是运行状态,或者线程不是运行态但是移除任务队列失败,
//则检查是否有工作线程在消费队列,如果有则什么都不做(可以确保刚提交进队列的任务被完成),
//如果没有需要建立一个消费线程用来消费刚刚提交的任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);//2
}
//如果不是运行态或者加入队列失败那么尝试执行提交过来的任务,如果执行失败,走拒绝操作(额外含义:核心线程数满了,队列也满了,尝试建立新的线程消费,新线程数要小于最大线程数)
else if (!addWorker(command, false))
reject(command);
}
总结概述:
来了新任务,如果工作线程数还没有达到线程池的核心线程数尝试创建新的线程执行(addWork方法里)。
如果已经达到核心线程数或者开启新线程失败,检查线程池是否为运行态,是的话加入等待队列。
如果线程池是已经不再运行态或者加入等待队列失败,尝试开启一个线程执行刚提交的任务,开线程失败执行拒绝流程。
如果是运行态并且也加入到等待队列成功,检查线程池是否还是运行(可能被其他线程停止),如果不是运行态,执行移除操作,然后执行拒绝策略,
如果是运行态或者不是运行态但移除任务失败检查还有没有线程在消费任务,没有的话尝试建立一个消费线程消费刚提交到等待队列里的任务
消费任务的重要方法是addWorker(Runnable firstTask, boolean core);
其有四种组合:
一、addWorker(Runnable,true)小于核心线程数使用
二、addWorker(Runnable,false)大于核心线程数,并且等待队列也满了情况使用
三、addWorker(null,true)没有任务创建一个线程等待任务到来使用(小于核心线程数的情况)
四、addWorker(null,false)没有任务创建一个线程等待任务到来使用(小于最大线程数的情况)
[java] view plain copy
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池状态RUNNING= -1;SHUTDOWN=0;STOP=1;TIDYING=2;TERMINATED=3
// 如果线程池状态是shutdown及以后的任意一种状态,说明调用了关闭线程池的方法,
//并且不符合[rs等于shutdown,并且传进来的任务是空,并且工作队列不等于空],
//这个判断条件是为了处理上个方法代码2处的情况,
//即线程池已经不是运行态(仅仅调用了shutdown方法),并且弹出队列失败,
//这种情况需要保证提交上来的任务得到执行,因此传过来一个null的任务,
//目的是为了让线程池启动一个线程执行刚提交的任务,
//(隐含shutdown状态添加到队列中的任务(移除失败的)还是会被执行),
//如果已经不只是SHUTDOWN证明掉用过shutdownnow方法,直接返回false,
//或者仅调用shutdown后又来的新任务也返回false拒绝执行,
//或者是刚添加到队列的任务已经被其他线程消费过了,也返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//检查工作线程数,如果大于线程池最大上限CAPACITY(即使用int低29位可以容纳的最大值)
//或者跟边界值比较已经到达边界值都返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//如果增加工作数成功跳出循环往下执行
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//如果增加工作线程数失败(可能调用了shutdown方法),
//如果两次状态不一致则跳转到retry处重新尝试执行
if (runStateOf(c) != rs)
continue retry;
// 都没发生循环执行
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//把传进来的任务包装成worker对象
w = new Worker(firstTask);
//实际上t就是worker对象,只不过有名字等相关信息
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 再次检查线程池状态
int rs = runStateOf(ctl.get());
//如果是运行态直接执行,或如果是shutdown状态但传进来是个null,即前边说的移除队列失败情况
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 检查这个对象是否被其他线程执行过
throw new IllegalThreadStateException();
//加入到workers中
workers.add(w);
int s = workers.size();
//如果大于曾经执行过的最大线程数则最大线程数加1
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果增加成功启动新线程执行
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果启动失败从workers中移除
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
总结概述:这个方法功能是保证在线程池为运行状态下或者虽然不是运行状态但是强制要求把已经添加到任务队列的线程执行完,执行的过程是创建一个新线程执行
从上方代码看出Worker是执行线程的核心,那么看下这个内部类是怎样的,首先它实现了Runable接口,并且继承了AbstractQueuedSynchronizer类
[java] view plain copy
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
//指向提交过来的任务
this.firstTask = firstTask;
//指向自己
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
没有太多特别的不多解释
run方法调用的是runWorker()方法这个是运行的核心
[java] view plain copy
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();//当前线程
Runnable task = w.firstTask;//提交上来的任务
w.firstTask = null;
w.unlock(); // 调用Worker类的tryRelease()方法,将state置为0,
//而interruptIfStarted()中只有state>=0才允许调用中断
boolean completedAbruptly = true;
try {
//先执行提交上来的任务,完成后循环从队列中取任务执行
while (task != null || (task = getTask()) != null) {
w.lock();//加锁保证调用中断后运行的任务可以正常完成
//执行新任务前要做以下判断
//1如果线程池状态是大于等于stop(调用shutdownnow方法了),
//直接查看当前线程符合未设置中断位 则直接调用wt.interrupt()方法设置
//2如果线程池不是大于等于stop状态,则调用Thread.interrupted()清除interrupt位,
//这时如果程池为大于stop状态(有其他线程调用线程池的stopnow方法),
//再查看当前线程符合未设置中断位,如果没设置调用wt.interrupt()方法设置
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
//线程池是运行态不会走到这
wt.interrupt();//尝试终止正在执行的任务,这里仅仅设置一个标志位
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//直接调用run方法,在当前线程中执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
总结概述:此方法特别在执行线程直接在当前线程中调用线程队列中的run方法,而没有新建线程,确保了线程的重复利用
线程执行完当前任务会循环读取队列中等待的任务,下边看看如何取队列中的任务
[java] view plain copy
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//线程池状态RUNNING= -1;SHUTDOWN=0;STOP=1;TIDYING=2;TERMINATED=3
// 如果线程池大于等于SHUTDOWN(调用过shutdown方法),
//判断是否是stop(调用shutdownnow)之后的状态或者等待队列已经为空
//言外之意调用过shutdownnow将停止执行等待队列中的任务,
//还有只掉用过shutdown方法会保证工作队列中的任务会被执行完
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//已经调用shutdown或者等待队列中的任务已经执行完,如果调用shutdownnow队列中的任务还没执行完那就放弃执行
//减少工作线程数
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 工作线程数大于核心线程数或者核心线程超时时间为真(默认为false)
//allowCoreThreadTimeOut为true超时会关闭线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//1工作线程数大于最大线程数或【超时关闭标志位真且真的超时了】
//2 上个条件成立(言外之意工作线程数大于最大线程数或者已经查过空闲时间没任务,
//此时可能需要关闭一个线程了),并且确实有线程在工作(有工作线程才需要关闭),
//或者任务队列没工作任务了(没任务了对应的是超时那种情况)
//可能情况:1.wc > maximumPoolSize成立,wc > 1成立
//:大于核心线程数,有线程在运行,关闭一个线程
// 2.wc > maximumPoolSize成立,workQueue.isEmpty() 成立
//:大于核心线程数,队列中已经没有任务可执行,关闭一个线程
// 3.(timed && timedOut)成立,wc > 1 成立
//:线程空闲超时,有线程在运行,关闭一个线程
// 4.(timed && timedOut)成立,workQueue.isEmpty()成立
// :线程空闲超时,队列中没有可执行的任务
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//工作数量减一并返回null 返回null上层方法就会结束当前线程
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果上述情况不满足则正常取任务执行
Runnable r = timed ?
//没有任务会挂起指定时间(言外之意已经大于核心数或者有超时时间的不能永久的阻塞下去)
workQueue.take();//没有任务会阻塞直到有任务来
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
总结概述:只要线程池没有调用shutDown就尝试取任务消费,已调用shutdown但队列还有任务没执行完,尝试取执行。大于核心线程数或者已经超时队列中没任务可执行,则尝试关闭当前线程。
整体总结:有新任务来到,如果没有达到核心线程数,则启动新线程执行,已经达到核心线程数尝试放到队列,核心线程数和队列都满但核心线程数没有达到最大线程数再建立一个线程执行,如果都满了就拒绝执行。
执行过程中要重复不断的检查线程池的状态,如果只调用过shutDown,但线程池中还有等待执行的队列则取执行完等待的任务,并拒绝新到的任务(抛出异常),如果调用shutDownNow方法则放弃执行队列中的任务,并尝试终止正则执行的任务。
如果工作线程数大于核心线程数或者线程空闲时间大于设置时间,那么尝试终止当前线程。如果没有设置超时终止则没有任务执行时线程阻塞。
以上是关于超详细的java线程池源码解析的主要内容,如果未能解决你的问题,请参考以下文章
Java Executor源码解析—Executors线程池工厂以及四大内置线程池
Java Executor源码解析—ThreadPoolExecutor线程池execute核心方法源码一万字
Java Executor源码解析—ThreadPoolExecutor线程池submit方法以及FutureTask源码一万字