Java并发编程(十九):ScheduledThreadPoolExecutor总结与源码分析
Posted 黄智霖-blog
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程(十九):ScheduledThreadPoolExecutor总结与源码分析相关的知识,希望对你有一定的参考价值。
目录
前言
ScheduledThreadPoolExecutor主要用于处理延时任务或者定时任务,在平时的工作场景中也有被广泛应用,而我们有了前文关于ThreadPoolExecutor源码的深入理解,这里理解ScheduledThreadPoolExecutor便会顺畅不少。
使用
ScheduledThreadPoolExecutor的使用还是比较简单,总体来说主要有三种方式提交任务:
- schedule:延时执行一个任务
schedule(runnable, 3, TimeUnit.SECONDS);
表示延时3秒执行runnable任务, 同时schedule还支持提交一个Callable任务,通过返回的ScheduledFuture可以在后续的流程中获取任务的执行结果。
- scheduleAtFixedRate:相对于schedule方法,多了一个period参数,表示每隔多久执行一次任务,可以使用它实现周期调度任务的目的。
scheduleAtFixedRate(runnable, 1, 2, TimeUnit.SECONDS);
表示延时1秒开始执行runnable,并且并且之后每隔2秒执行一次runnable。要注意,如果任务的执行时间超过了间隔时间(period),那么在任务执行完成后会立即执行下一次runnable,也就是说严格按照period间隔时间计算,只是到了period指定的间隔,由于任务还在执行,所以等待任务执行完成后会立即调度下一次执行。在一个任务执行完成后才会插入下一次需要执行的任务,所以不会存在任务堆积的情况。
- scheduleWithFixedDelay:用法和scheduleAtFixedRate类似,相对于schedule方法多了一个delay参数,也用于提交周期任务。和scheduleAtFixedRate的区别是,如果任务执行时间超过了delay参数,那么任务执行完后还是会再等待delay指定的时间才运行下一次任务,也就是说delay从任务结束的时间开始算。
scheduleWithFixedDelay(runnable, 3, 2, TimeUnit.SECONDS);
注:同时也支持execute和submit方法提交任务,实现就是通过schedule提交一个延时为0的任务。
源码分析
从类继承关系图中可以看出,ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以它具备ThreadPoolExecutor的能力,同时实现了ScheduledExecutorService接口,该接口定义了4个方法:
这几个接口使得ScheduledThreadPoolExecutor具备了延时执行和定时调度的能力。
首先来看构造方法,从源码看来,ScheduledThreadPoolExecutor提供了四个构造方法:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
构造器主要就是三个参数:
- corePoolSize:线程池大小
- threadFactory:线程工厂
- handler:拒绝策略
ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,但是可以从构造函数中看到,指定的maximumPoolSize都是Integer.MAX_VALUE,并且阻塞队列使用的是DelayedWorkQueue,keepAliveTime设置为0。从这些参数设置中我们就能发现,对于ScheduledThreadPoolExecutor来说,没有了核心线程池的概念,因为队列DelayedWorkQueue是一个无界队列,所以线程数不可能超过corePoolSize的大小。
schedule实现
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
//计算任务的触发时间,然后将任务command包装为一个ScheduledFutureTask,然后调用decorateTask方法进行进一步包装
//decorateTask方法在这里没有做任何包装操作,如果需要可以继承然后自己实现
RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
//提交任务
delayedExecute(t);
return t;
}
调用方法很简单,首先会根据delay和unit计算任务的触发时间,计算逻辑就是当前时间加上延时时间,这个代码这里就不贴了。然后将触发时间和任务包装成一个ScheduledFutureTask,再然后会调用decorateTask进行进一步包装,包装成一个RunnableScheduledFuture,这个decorateTask方法在ScheduledThreadPoolExecutor中没有做什么其它操作,直接返回的是上一步创建的ScheduledFutureTask,子类可以重写该方法对任务进行进一步的包装。
注:ScheduledFutureTask是ScheduledThreadPoolExecutor的子类
从ScheduledFutureTask的继承关系中可以看出,它实现了Comparable接口,具备比较大小的功能;实现了Future接口,具备获取执行结果的功能;实现了Runnable,可以作为任务被运行。
但是Runnable是不具备返回值的,这里怎么返回的ScheduledFutureTask呢?可以来看看ScheduledFutureTask关于Runnable的构造函数:
ScheduledFutureTask(Runnable r, V result, long ns) {
//result表示任务的返回值
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
该构造函数需要一个任务返回值,在前面schedule(runnable)的方法中传入的是null,表示一个空的返回,然后到父类FutureTask中看看:
public FutureTask(Runnable runnable, V result) {
//将runnable包装横一个Callable
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
在父类FutureTask中,Runnable被转换成了Callable,这是通过Executors.callable实现的:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
//适配器
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
这里运用了适配器模式,将一个Runnable和指定的返回值适配成了一个Callable。所以说通过schedule方法提交一个Runnable任务,也会通过适配器模式将其包装成一个Callable,最终生成一个ScheduledFutureTask。相对来说,如果schedule方法提交的是一个Callable任务,就不用适配了,这个代码就不贴了。
现在对schedule方法做一个简单总结:提交的任务如果是一个Runnable,那么会通过适配器模式将其转换成一个返回值为null的Callable,如果任务本身就是一个Callable则不用包装,最终被包装成一个ScheduledFutureTask,这里还提供了一个decorateTask方法提供给用户实现,以支持对这个任务进行进一步包装,包装完成后通过delayedExecute方法提交任务,然后返回ScheduledFuture。
那么接下来我们进入delayedExecute方法看看任务是如何提交的:
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
//如果线程池不是RUNNING状态,那么执行拒绝策略
reject(task);
else {
//将任务添加到队列中
super.getQueue().add(task);
//再次检查线程池状态和参数配置,可能需要取消该任务
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
//退出任务(状态修改为CANCELLED),不中断线程
task.cancel(false);
else
//确定有线程去执行任务,可能需要新建一个工作线程
ensurePrestart();
}
}
如果线程池不是RUNNING状态,那么会执行拒绝策略,否则会将任务直接放到阻塞队列中,这里使用的队列写死在ScheduledThreadPoolExecutor的构造函数中,是DelayedWorkQueue,这是一个延时队列 。任务放到队列中之后会再次检查线程池状态,如果线程池被SHUTDOWN了,那么就表示现在的情况是先提交了任务,再关闭线程池,针对这种情况,对于周期任务和普通延时任务有两个参数来控制行为,分别是:
- continueExistingPeriodicTasksAfterShutdown:针对周期任务,true表示不会取消任务,false表示会取消任务
- executeExistingDelayedTasksAfterShutdown:针对非周期任务,true表示不会取消任务,false表示会取消任务
这两个参数都有对应的set方法可以更改,而对于这个逻辑控制主要是**canRunInCurrentRunState(task.isPeriodic())**方法调用实现的:
boolean canRunInCurrentRunState(boolean periodic) {
//periodic表示是否为周期任务
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
//如果状态RUNNING或者是SHUTDOWN并且传入的shutdownOK为true
//那么这里返回true,那么canRunInCurrentRunState返回true,取反就是false,就不会移除并且停止任务
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
当然,正常情况下的逻辑不会来到这个分支,而是直接进入ensurePrestart方法:
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
//如果workerCount小于corePoolSize,那么判断创建一个工作线程
addWorker(null, true);
else if (wc == 0)
//到这里说明wc>=corePoolSize,而wc==0,但是corePoolSize不能为-1
//说明corePoolSize==0
addWorker(null, false);
//如果workerCount>=corePoolSize,并且corePoolSize不为0,那么什么也不做
}
ensurePrestart方法主要做的事儿就是创建工作线程,ScheduledThreadPoolExecutor和ThreadPoolExecutor不一样,不会在提交任务的时候顺带就创建线程,而是这直接把任务添加到队列中。同时,如果corePoolSize配置的是0,就意味着ScheduledThreadPoolExecutor中的工作线程数量不会超过1个,也就是说corePoolSize设置为1和0效果是一样的,区别就是addWorker的方式不一样。
到这里,ScheduledThreadPoolExecutor中schedule方法的逻辑已经分析完了~ 剩余的就是ThreadPoolExecutor的逻辑:工作线程不断的从任务队列中获取任务来执行,包括runWorker、getTask等方法的调用逻辑都是一样的,具体可以参考ThreadPoolExecutor总结与源码深度分析。但是ScheduledThreadPoolExecutor最核心的内容我们还没有提到,那就是在构造函数中写死的阻塞队列:DelayedWorkQueue,先来看看它的继承关系:
DelayedWorkQueue
DelayedWorkQueue实现的是一个优先队列,这里底层则是通过二叉堆实现的,二叉堆本质上是一个完全二叉树或近似完全二叉树,分为最大堆和最小堆两种方式,通常由数组作为基础数据结构进行实现。
- 最大堆:父节点的总是大于或等于子节点的值,每一个子树都是一个最大堆
- 最小堆:父节点的值总是小于或等于子节点的值,每一个子树都是一个最小堆
DelayedWorkQueue中存储的是RunnableScheduledFuture,实际上是ScheduledFutureTask类型,在创建ScheduledFutureTask的时候会设置任务触发的时间保存到time属性中,如果是周期任务,那么周期时间会存储在period属性中。
通过最小堆来实现DelayedWorkQueue,排序主要依赖time字段(time相同使用sequenceNumber),就能保证堆顶一直是最近需要执行的任务。线程池使用阻塞队列主要就是调用定义在阻塞队列BlockingQueue中的三个方法:
- add:将任务添加到队列中,实际调用offer方法
- take:从队列中获取并移除队头元素,如果队列为空,会阻塞直到队列加入新元素
- poll:和take一样,只是在阻塞的时候会有一个超时时间,如果达到超时时间还没有获取到元素,那么会返回,可能抛出InterruptedException
- peek:从队列获取队头元素,但是不移除
注:最小堆的插入只需要在数组末尾插入元素,然后比较其和父节点大小,如果小于父节点则交换,然后继续向上调整,直到调整到根节点(自底向上调整);而堆顶元素移除操作(最小堆就是移除最小元素,最大堆就是移除最大元素)则是直接用数组最后一个元素代替被移除的元素,然后自顶向下调整,对于最小堆而言就是和两个子节点的最小值比较,如果比这个值大则交换,直到调整到尾节点。二叉堆相对来说比较简单,更多信息这里就不赘述了~
现在我们知道了DelayedWorkQueue是由最小堆实现的,那么又是如何实现任务延时执行的呢?在ScheduledThreadPoolExecutor中,一个工作线程被创建后会调用父类ThreadPoolExecutor的getTask方法获取任务,而getTask方法会根据是否需要超时获取任务来调用阻塞队列的take方法或poll方法,这里就来看看DelayedWorkQueue的take方法实现:
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//自旋
for (;;) {
//获取队头元素
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
//如果为空表示队列为空,那么线程进入条件队列阻塞
available.await();
else {
//队头元素不为空,获取其距可执行还剩余的时间
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
//如果delay<=0,说明任务可以执行了
//完成任务获取,返回任务
return finishPoll(first);
//否则表示队头任务还没有到该执行的时候,而队头元素一定是最近需要执行的任务
//也就表明队列中没有一个任务需要现在执行
first = null; // don't retain ref while waiting
if (leader != null)
//leader是一个线程,如果不为空,那么进入条件队列阻塞,一直等待
available.await();
else {
//将当前线程设置为leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//然后进入条件队列阻塞,但是超时时间是队头任务还需剩余的触发时间
//超时之后再次循环到上面去获取队头元素,判断是否需要触发,理论上这里超时了就代表队头元素
//到了该执行的时间
available.awaitNanos(delay);
} finally {
//这里leader线程可能发生了变更
if (leader == thisThread)
//又将leader置空
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
take()方法的逻辑很简单,就是从最小堆中获取堆顶元素,这个元素对应的任务一定是任务列表中需要最先执行的(因为最小堆根据任务触发时间排序),获取到之后判断当前是否到了该任务需要触发的时间,如果达到了则返回该任务,返回的时候也会移除该节点,然后最小堆自动调整,堆顶元素又成了剩余任务列表中需要最先执行的任务;而如果没有到达执行时间,则需要阻塞等待,在阻塞被唤醒后又会进入循环再次获取堆顶元素,判断触发时间。如果任务达到了触发时间,那么通过finishPoll方法返回任务。
在finishPoll方法中,就是将任务数量(size属性)减1,然后重新调整堆,使剩余任务中需要最先触发的任务排在堆顶位置:
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
//调整堆
siftDown(0, x);
setIndex(f, -1);
return f;
}
任务返回后会调用其run方法运行任务,这个是ThreadPoolExecutor的逻辑,任务是ScheduledFutureTask类型:
其间接实现了Runnable接口,继承自FutureTask,在ScheduledFutureTask中找到run方法实现:
public void run() {
//判断是否为周期任务
boolean periodic = isPeriodic();
//判断在当前线程池状态下是否能够执行该任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
//不是周期任务,调用父类FutureTask的run方法
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
//如果是周期任务,调用父类的runAndReset方法
//设置任务下次运行时间
setNextRunTime();
//outerTask任务重新入队,outerTask在提交周期任务的时候会设置
reExecutePeriodic(outerTask);
}
}
从这个run方法中我们能够看出来,任务执行的时候会再次判断线程池状态,根据是否为周期任务和具体的参数来决定是否能够执行任务。如果任务不是周期任务,那么调用父类的run方法执行,否则调用父类的runAndReset方法。runAndReset和run方法的主要区别是runAndReset不会保存任务执行结果,并且会将任务状态重置为NEW(关于FutureTask在后面的文章中再细说~)。而如果是周期任务,在任务执行完成并且重置任务状态后,会计算任务的下次运行时间,然后将任务重新入队,等待下次执行(下文细说)。
但是take方法中出现的leader线程是什么意思呢?这里就涉及到一个Leader-Follower模式。
Leader-Follower线程模型
在这个模型中,线程分为Leader和Follwer角色,还有一个processing状态。在线程池中只有一个Leader线程负责等待接收请求(事件),其它的都是Follwer线程,Leader接收请求后会从Follower中选拔一个新的Leader线程等待下一个请求,而自己转入processing状态,处理完成后又成为Follower,这种模式在基于事件分发的线程池场景下减少了线程上下文切换。
注:ReentrantLock创建的Condition在await的时候会调用fullRelease方法释放互斥锁(state),所以即使take方法通过lockInterruptibly施加了互斥锁,在线程await进入条件队列阻塞后,其它线程也能获取互斥锁,从而进入for循环
在DelayedWorkQueue的场景中,从队列中获取一个任务后,如果任务还没有到触发时间,那么该任务就不能被执行,这可以通过阻塞线程实现:只需要将获取到任务的线程阻塞距离任务运行的剩余时间,线程醒来后然后再在循环中重新从队列获取任务,再次判断触发时间即可。前面也提到了,DelayedWorkQueue底层是通过数组实现的一个最小二叉堆,那么队头的任务(queue[0])一定是需要最先执行的任务,假设线程1获取到互斥锁进入for循环,从队头获取任务,发现任务运行还需要t秒,那么进入条件队列进行超时(t秒)阻塞,同时释放state;接着线程2获取到互斥锁进入for循环,从队头获取任务,发现任务运行还需要tt秒,那么也需要进入条件队列进行阻塞,但是线程2的这个阻塞就不用使用超时(tt秒)阻塞了;如果还有线程3,也同理。
leader线程的引入就是为了在这种情况下减少不必要的超时阻塞,借鉴的就是Leader-Follower线程模型,对于队头任务,只有一个leader线程负责对其进行超时阻塞等待,其它线程都进行"永久"等待,leader线程从队列中取走任务之后(queue[0])则唤醒一个follower线程成为新的leader。
到现在我们已经了解到了队列DelayedWorkQueue中的任务是如何被消费的,那么回过头来看看delayedExecute方法中任务入队的逻辑,也就是DelayedWorkQueue的add方法:
public boolean add(Runnable e) {
return offer(e);
}
间接调用的offer方法:
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
//元素数量达到了队列长度(默认为16),则需要扩容
grow();
//任务数量加1,任务出队的时候减一
size = i + 1;
if (i == 0) {
//直接设置队头元素
queue[0] = e;
setIndex(e, 0);
} else {
//任务入队(最小二叉堆),并且自动调增堆,使触发时间最小的排在堆顶位置
siftUp(i, e);
}
if (queue[0] == e) {
//如果天添加的任务位于堆顶,那么说明队列之前为空,或者任务入队,堆调整
//后排在堆顶位置
//那么置空leader,唤醒一个线程,被唤醒的线程在下一次for循环中可以重新设置leader线程
//有可能发生leader线程的变更
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
/*队列扩容*/
private void grow() {
int oldCapacity = queue.length;
//增加50%的容量
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity < 0)
//溢出,使用int最大值
newCapacity = Integer.MAX_VALUE;
//直接拷贝数组
queue = Arrays.copyOf(queue, newCapacity);
}
offer入队的逻辑比较清晰,先判断元素是否达到了队列的长度,如果达到了则需要扩容50%的容量,代码中是通过oldCapacity + (oldCapacity >> 1)实现的,表示的含义就是oldCapacity+oldCapacity/2==oldCapacity*1.5,也就是扩容50%。接着如果任务入队后排在堆顶位置,那么就可能有两种情况:
- 之前堆为空,新入队任务作为第一个任务排在堆顶位置
- 之前堆不为空,但是任务入队后,经过调整被移动到了堆顶位置,也就是新加入的任务需要最先触发
对于第二种情况,将leader置null,然后唤醒一个阻塞的线程,阻塞线程被唤醒后再重新设置一个leader线程。到这里我们对schedule方法做一个总结: 以上是关于Java并发编程(十九):ScheduledThreadPoolExecutor总结与源码分析的主要内容,如果未能解决你的问题,请参考以下文章 转:Java并发编程之十九:并发新特性—Executor框架与线程池(含代码) Java并发编程(十九):ScheduledThreadPoolExecutor总结与源码分析 Java并发编程(十九):ScheduledThreadPoolExecutor总结与源码分析
schedule方法返回的是一个ScheduledFuture,如果方法传入的是一个Callable,那么将其和触发时间封装成一个ScheduledFutureTask添加到队列中;如果传入的是一个Runnable,那么会先通过适配器将其封装成一个Callbale。在ScheduledFutureTask入队之前还提供了一个空方法decorateTask供子类去重写,以实现对ScheduledFutureTask进行进一步的装饰。ScheduledThreadPoolExecutor存储任务使用的队列是DelayedWorkQueue,其本质上是用数组实现的一个最小