Executor Framework分析 ThreadPoolExecutor部分函数分析
Posted ZhangJianIsAStark
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Executor Framework分析 ThreadPoolExecutor部分函数分析相关的知识,希望对你有一定的参考价值。
前一篇博客中,我们分析了ThreadPoolExecutor主要参数的含义。
本篇博客,继续分析下ThreadPoolExecutor中的函数。
前言
在分析代码前,我们先了解下ThreadPoolExecutor定义的生存周期。
ThreadPoolExecutor初始时处于RUNNING状态,在该状态下,
线程池可以接收新的任务,同时可以处理WorkQueue中积攒的任务。
当调用了shutdown函数后,ThreadPoolExecutor就变为SHUTDOWN状态了。
在该状态下,线程池不再接收新的任务了,但可以处理WorkQueue中积攒的任务。
当调用了shutdownNow函数后,ThreadPoolExecutor就变为STOP状态了。
在该状态下,线程池不再接收新的任务了,也不处理WorkQueue中积攒的任务,
并且会中断正在执行的任务。
若所有的任务全部执行完毕,且线程池中的线程数量为0时,
ThreadPoolExecutor就变为TIDYING状态。
到这个状态后,线程池的生命周期基本走到了尽头,
此时回调terminated函数,最后进入TERMINATED状态。
一、 shutdown
了解ThreadPoolExecutor的生命周期后,我们先看看shutdown函数:
public void shutdown()
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
//判断是否有对应的权限
checkShutdownAccess();
//将线程池的状态改为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断空闲的线程
interruptIdleWorkers();
//这个是回调的Hook函数
onShutdown(); // hook for ScheduledThreadPoolExecutor
finally
mainLock.unlock();
//尝试结束线程,后面再分析
tryTerminate();
调用shutdown后,对线程池的影响主要体现在:
1、对新到来任务的处理;
2、对已有线程的操作。
接下来我们分析下这两部分相关的代码。
1.1 新任务的处理
为了分析shutdown后对新任务的影响,我们来看看execute相关的代码:
public void execute(Runnable command)
.............
int c = ctl.get();
//线程数小于corePoolSize时,添加线程
if (workerCountOf(c) < corePoolSize)
//shutdown后,addWorker将会失败,等下分析
if (addWorker(command, true))
return;
c = ctl.get();
//线程数大于corePoolSize时,isRunning失败
//不会将任务加入队列中
if (isRunning(c) && workQueue.offer(command))
............
//addWorker失败,直接拒绝任务
else if (!addWorker(command, false))
//从之前的博客我们知道
//如果RejectedExecutionHandler要重新处理command
//一定会先判断线程池的状态,因此任务最后不会被处理
reject(command);
从上面的代码,不难得出结论:
一旦shutdown后,线程池不会为新的任务创建线程,
不会将新的任务加入到WorkQueue中,
RejectedExecutionHandler也不会再处理任务。
为了验证说法的正确性,我们确认下addWorker的代码:
private boolean addWorker(Runnable firstTask, boolean core)
retry:
for (;;)
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//由于新任务到来调用addWorker时,firstTask不为null
//因此一旦SHUTDOWN后,将会return false, 不会再创建Worker
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
.....
1.2 已有线程的处理
shutdown函数中,改变已有线程工作行为的主要是interruptIdleWorkers函数。
我们跟进一下该函数:
private void interruptIdleWorkers()
interruptIdleWorkers(false);
//onlyOne为false,表示尝试中断所有idle workers
private void interruptIdleWorkers(boolean onlyOne)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
for (Worker w : workers)
Thread t = w.thread;
//尝试获取worker中的lock,非阻塞操作,底层使用compareAndSwap
//然后调用worker对应线程的interrupt函数
if (!t.isInterrupted() && w.tryLock())
try
t.interrupt();
catch (SecurityException ignore)
finally
w.unlock();
//从这里看出,当onlyOne为true时,
//只尝试中断一个线程
if (onlyOne)
break;
finally
mainLock.unlock();
为了更好地理解interruptIdleWorkers函数,
我们需要对比看看执行任务对应的runWorker函数:
1.2.1 runWorker
final void runWorker(Worker w)
............
try
//从workQueue中获取消息
while (task != null || (task = getTask()) != null)
//获取消息成功,worker就会lock
w.lock();
......
try
......
finally
task = null;
w.completedTasks++;
//任务执行完毕才会unlock
w.unlock();
.............
finally
processWorkerExit(w, completedAbruptly);
看完runWorker的函数后,我们就能理解interruptIdleWorkers函数如此命名的原因了。
如果一个Worker已经开始处理任务了,那么Worker将会持有锁,
interruptIdleWorkers函数无法中断该Worker。
因此,interruptIdleWorkers函数只能打断处于空闲态的Worker。
1.2.2 getTask
最后,我们看看getTask中相关的代码:
private Runnable getTask()
............
for (;;)
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 调用shutdown后, rs为SHUTDOWN
//只有workQueue为空才会返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty()))
//减少计数
decrementWorkerCount();
return null;
.....
.....
至此,我们可以看出一旦shutdown后,线程池不会处理新来的任务,同时会停止空闲的线程;
不过在shutdown时,已经从workQueue中取出任务的线程,将会执行完当前的任务。
正在工作的线程执行完当前任务后,会继续从workQueue中取出任务,
直到workQueue为空,才会被真正地移除。
由于shutdown靠”锁”机制来判定一个线程是否空闲,因此可能出现这种情况:
即workQueue中明明还有任务,但所有线程刚好完成任务,
在取出下一个任务执行前,就被shutdown了,导致workQueue中的任务未被处理完毕。
此时,就要靠runWorker中的processWorkerExit函数,来处理workQueue中遗留的任务了。
我们在后文分析processWorkerExit函数。
二、shutdownNow
了解完shutdown函数后,我们对比分析下shutdownNow函数:
public List<Runnable> shutdownNow()
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
checkShutdownAccess();
//不同的是shutdownNow直接让状态变为STOP
//即SHUTDOWN后面的一个状态
advanceRunState(STOP);
//这里调用的是interruptWorkers
//shutdown调用的是interruptIdleWorkers
interruptWorkers();
//多了调用drainQueue的步骤
tasks = drainQueue();
finally
mainLock.unlock();
//同样调用了tryTerminate
tryTerminate();
return tasks;
调用shutdown和shutdownNow后,线程池均不会再处理新到来的任务,
这一点表现一致,不再过多地分析了。
对于已有线程的处理,shutdownNow的表现要更加强势一下。
我们先来看看interruptWorkers函数:
private void interruptWorkers()
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
//不尝试获取Worker的锁,直接调用interruptIfStarted
//即不care worker是否正在处理任务,直接interrupt
for (Worker w : workers)
w.interruptIfStarted();
finally
mainLock.unlock();
跟进下Worker的interruptIfStarted函数:
void interruptIfStarted()
Thread t;
//Worker启动后,state就>=0
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted())
try
//直接interrupt worker的线程
t.interrupt();
catch (SecurityException ignore)
根据上述代码,我们可以知道,一旦调用了shutdownNow,
线程池中所有运行的线程都将被interrupt,即使此时还有任务正在执行。
根据runWorker的代码不难看出,
运行的线程被interrupt后,也会调用processWorkerExit函数。
同样,后文再分析processWorkerExit函数。
在这部分的最后,跟进下drainQueue函数:
private List<Runnable> drainQueue()
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<>();
//利用drainTo函数,移除所有workQueue中的任务,加入到taskList中
q.drainTo(taskList);
//这里是处理移除失败的场景
if (!q.isEmpty())
//一个个移除
for (Runnable r : q.toArray(new Runnable[0]))
if (q.remove(r))
taskList.add(r);
//返回被移除的task
return taskList;
至此,shutdownNow函数分析完毕。
根据上述代码,我们发现调用shutdownNow后,
线程池不会再处理新到来的任务,同时中断所有已经运行的线程,
并移除WorkQueue中所有积攒的任务。
三、processWorkerExit
现在我们来分析下,前文中多次提及的processWorkerExit函数。
//completedAbruptly表示因异常结束worker
private void processWorkerExit(Worker w, boolean completedAbruptly)
//此处是处理异常
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
//正常结束时,在getTask中就会调用decrementWorkerCount
//此处为异常结束时,校正计数
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
completedTaskCount += w.completedTasks;
//移除对应的worker
workers.remove(w);
finally
mainLock.unlock();
//移除线程时,也会调用tryTerminate
tryTerminate();
int c = ctl.get();
//相当于判断是否调用了shutdownNow
//如果没有的话,则做进一步处理
if (runStateLessThan(c, STOP))
//非异常结束时,进一步判断
if (!completedAbruptly)
//确定最小允许的线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//没有调用shutdownNow时,workQueue中的任务仍需处理
//至少要保留一个线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//当前存活的线程数足够,无需在做添加
if (workerCountOf(c) >= min)
return; // replacement not needed
//线程异常结束,或线程数不够时
//移除线程后,要重新增加一个
//此时第一个参数为null,使得即使shutdown时,所有线程都被中断,
//只要workQueue还有任务,就会重新创建个线程处理任务
//因此,调用shutdown时,workQueue中积攒的任务应该可以被全部执行完
addWorker(null, false);
从上面的代码来看,processWorkerExit主要负责清除参数中的Worker。
不过清除Worker时,processWorkerExit还需要根据情况,判断是否需要创建一个新的Worker。
该机制确保出现异常或shutdown后,WorkQueue中积攒的任务能被处理完毕。
四、tryTerminate
在这部分,我们来看看tryTerminate函数。
无论是调用shutdown、shutdownNow及线程超时被移除时,
都最终会触发tryTerminate函数。
final void tryTerminate()
//说实话这个for循环看的我很慌....
for (;;)
int c = ctl.get();
//生命周期状态小于SHUTDOWN
if (isRunning(c) ||
//生命周期状态大于TIDYING
runStateAtLeast(c, TIDYING) ||
//生命周期状态为SHUTDOWN, 但workQueue未处理完
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
//不结束
return;
//所有的Worker都被移除时,才能进行后续操作
if (workerCountOf(c) != 0) // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try
//将生命周期状态置为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0)))
try
//Hook函数
terminated();
finally
//最后状态置为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//与awaitTermination函数对应,通知该函数返回
termination.signalAll();
return;
finally
mainLock.unlock();
tryTerminate的逻辑比较简单,就是判断是否满足结束条件。
若满足条件,就将线程池的状态置为TIDYING,然后回调terminated函数。
最后将线程池的状态置为TERMINATED,并通知潜在的等待线程池结束的对象。
五、Hook函数的套路
除了前文提接的onShutdown和terminated函数外,
ThreadPoolExecutor在执行任务时,也提供了一些Hook函数。
我们还是来看看runWorker函数:
final void runWorker(Worker w)
...........
try
while (task != null || (task = getTask()) != null)
...........
try
//Hook函数,在执行每个task前调用
beforeExecute(wt, task);
...............
try
task.run();
........
finally
//Hook函数,在执行每个task后调用
afterExecute(task, thrown);
finally
..............
finally
processWorkerExit(w, completedAbruptly);
上面的代码中提及了beforeExecute、afterExecute两个Hook函数。
利用这两个Hook函数,我们可以玩出许多套路,
例如官方给出的具有暂停能力的ThreadPoolExecutor。
class PausableThreadPoolExecutor extends ThreadPoolExecutor
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...)
super(...);
//执行任务前,判断是否暂停
protected void beforeExecute(Thread t, Runnable r)
super.beforeExecute(t, r);
pauseLock.lock();
try
//暂停就阻塞
while (isPaused) unpaused.await();
catch (InterruptedException ie)
t.interrupt();
finally
pauseLock.unlock();
//暂停
public void pause()
pauseLock.lock();
try
isPaused = true;
finally
pauseLock.unlock();
//恢复并通知
public void resume()
pauseLock.lock();
try
isPaused = false;
unpaused.signalAll();
finally
pauseLock.unlock();
六、总结
至此,ThreadPoolExecutor中比较重要的函数分析完毕。
剩余部分的源码比较简单,一读便知。
以上是关于Executor Framework分析 ThreadPoolExecutor部分函数分析的主要内容,如果未能解决你的问题,请参考以下文章
Executor Framework分析 ScheduledThreadPoolExecutor
Executor Framework分析 ScheduledThreadPoolExecutor
Executor Framework分析 ThreadPoolExecutor部分函数分析
Executor Framework分析 ThreadPoolExecutor部分函数分析