线程池那些事之ScheduledThreadPoolExecutor
Posted Java后端笔记
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池那些事之ScheduledThreadPoolExecutor相关的知识,希望对你有一定的参考价值。
前言
ScheduledThreadPoolExecutor在ThreadPoolExecutor的基础上增加了schedule方法,支持了延迟,固定间隔触发执行任务。与ThreadPoolExecutor的任务不同的是,这些任务可以重复执行。
源码解析
ScheduledThreadPoolExecutor在ThreadPoolExecutor的基础上做了一些修改
新增任务类型ScheduledFutureTask,继承于FutureTask
workqueue的实现为延迟队列DelayedWorkQueue
增加了shutdown的一些配置参数,continueExistingPeriodicTasksAfterShutdown和executeExistingDelayedTasksAfterShutdown
提供包装方法decorateTask让子类扩展自定义任务类型
ScheduledExecutorService实现
相对ThreadPoolExecutor,ScheduledThreadPoolExecutor额外实现了ScheduledExecutorService接口。 ScheduledExecutorService定义了这些定时任务的方法。
//延迟任务
//delay时间后执行任务
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
//延迟任务
//delay时间后执行任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
//固定频率任务
//第一次initialDelay后执行任务,之后按照固定period执行任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//固定延迟任务
//第一次initialDelay后执行任务,之后延迟period时间执行任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
调度任务分为两种,周期任务和延迟任务。周期任务又分为固定频率和固定延迟。
固定频率和固定延迟的区别在于,initialDelay为0的情况下,如果一个任务耗时2秒,固定5秒频率任务的时间为,0,5,10,15...,而固定5秒延迟任务为0,7,14,21...
shedule方法的逻辑基本一致,我们看下scheduleAtFixedRate实现
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//创建ScheduledFutureTask
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
//进行再次包装,用于子类扩展
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//放入延迟队列
delayedExecute(t);
return t;
}
delayedExecute封装了向workqueue放任务的逻辑
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果已经shutdown,拒绝任务
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
//再次check线程池状态
//如果已经shutdown那么取消该任务
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//判断工作线程是否足够,不够增加
ensurePrestart();
}
}
相对ThreadPoolExecutor的submit方法,schdule新增了一种任务类型ScheduledFutureTask,用于执行调度任务,接下来我们看下ScheduledFutureTask的实现
ScheduledFutureTask
ScheduledFutureTask相对FutureTask增加了Delayed接口,用于对任务进行排序。以及RunnableScheduledFuture中的isPeriodic方法,判断当前任务是否是周期性的。
新增属性
在ScheduledFutureTask新增了几个属性。
属性 | 作用 |
---|---|
sequenceNumber | 用于任务排序 |
time | 任务执行的绝对时间 |
period | 大于0代表固定频率周期,小于0代表固定延迟周期,0代表不是周期任务 |
heapIndex | 用于快速定位在DelayedWorkQueue的位置,方便删除 |
run方法重载
ScheduledFutureTask的调度逻辑主要在run方法,增加了reset的逻辑,让周期性任务能够重复执行。
public void run() {
boolean periodic = isPeriodic();
//当前线程池状态能否执行任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
//如果是普通任务,执行用父类run方法执行
else if (!periodic)
ScheduledFutureTask.super.run();
//如果是调度任务,执行完,重新设置初始状态
else if (ScheduledFutureTask.super.runAndReset()) {
//设置下次生效的时间
setNextRunTime();
//把任务重新放到workQueue中去
reExecutePeriodic(outerTask);
}
}
runAndReset直接使用FutureTask的实现,和run方法的区别是,在执行完任务后,不会修改任务的状态(不调用set方法设置结果),还是为NEW。
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
//在出异常的情况下,会被设置为EXCEPTION状态,那么这个任务就执行不了了
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
setNextRunTime用于设置下次生效时间
private void setNextRunTime() {
long p = period;
if (p > 0)
//固定频率,直接加period得到下次运行时间
time += p;
else
//固定延迟,在triggerTime用的是当前时间+p
time = triggerTime(-p);
}
long triggerTime(long delay) {
//注意是当前时间,其他的位移逻辑就当是把负数变成正数把
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
reExecutePeriodic用于重新把任务放入队列
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
//如果线程池当前状态不能执行任务,那么取消该任务
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
cancel方法重载
下面来看下cancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
//取消成功后,主动从workqueue删除该任务
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
这边一个优化点是,会主动删除任务,一般任务被取消了,工作线程还是会去获取的,但是不实际执行该任务。
Comparable接口实现
ScheduledFutureTask同时实现了Comparable
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
//根据绝对时间比较
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
//时间相同,使用任务号码比较,先加入的任务号码小
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
//使用getDelay接口实现比较
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
下面来介绍延迟队列的实现,为什么用延迟队列,延迟队列按照执行时间顺序排,我们只需要关注最前面的任务,获取有效任务更快。
延迟队列DelayedWorkQueue
DelayedWorkQueue内部使用了小根堆算法,保证时间最小也就是最近执行的任务放在队头。并且只有任务的生效时间到了,才能被工作线程获取到,不然一直阻塞在那边。
DelayedWorkQueue内部通过一个数组存储ScheduledFutureTask
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
这个数组是一个小根堆,根节点总是延迟最小的任务。
如何构造这个小根堆暂时不讲,但是它的算法还是挺有意思的。 我们关注它的take方法,因为DelayedWorkQueue是无界的,因此ScheduledThreadPoolExecutor内只有核心线程,所以在getTask的时候使用take获取任务
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
...
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
看下take方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
//如果workQueue第一个任务为空,直接await
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
//如果第一个任务可以获取,那么直接获取
return finishPoll(first);
first = null; // don't retain ref while waiting
//在第一个元素不满足获取条件的情况下
//只能有一个线程能获取该元素,这个线程保存在leader中
if (leader != null)
//如果leader已经被别的线程占有,那么await
available.await();
else {
//leader为空,那么占有它
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//等待直到这个任务有效
available.awaitNanos(delay);
} finally {
//放弃leader锁
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//通知其他等待的线程
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
take方法的大致逻辑是 如果workQueue为空,那么直接等待 如果不为空,并且第一个任务有效,那么直接获取 如果第一个任务延迟时间还没到,第一个到达的线程通过leader锁先锁定这个任务,消费这个任务后,使用signal唤醒其他等待的工作线程来获取其他任务。 做这个优化,我觉得是为了防止过多线程竞争任务,浪费CPU资源。因为一个任务最终只能被一个线程消费。
onShutdown钩子
ScheduledThreadPoolExecutor中重载了onShutdown的钩子方法,会在shutdown方法内被调用。
Override void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();
//如果keepDelayed和keepPeriodic都为false,cancel所有ScheduledFutureTask
if (!keepDelayed && !keepPeriodic) {
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {
// Traverse snapshot to avoid iterator exceptions
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>)e;
//根据配置以及任务类型对任务进行取消
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) { // also remove if already cancelled
if (q.remove(t))
t.cancel(false);
}
}
}
}
tryTerminate();
}
重载这个方法用于cancel那些可以被cancel的ScheduledFutureTask任务。 continueExistingPeriodicTasksAfterShutdown和executeExistingDelayedTasksAfterShutdown用于控制shutdown后是否执行周期任务(固定延迟和固定频率)和延迟任务。哪一个配置为false,那种类型的任务会被cancel。
测试
延迟任务
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
System.out.println(new Date());
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println(new Date());
}
},2, TimeUnit.SECONDS);
延迟任务只会执行一次
周期定时延迟任务
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
System.out.println(new Date());
scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println(new Date());
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},2,5, TimeUnit.SECONDS);
周期定时频率
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
System.out.println(new Date());
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(new Date());
try {
//这边把2000修改为6000后,其实频率会变为6s,任务会继续放到延迟队列,时间生成是对的,但是放的时间比原来时间拖了1s
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},2,5, TimeUnit.SECONDS);
总结
对调度线程池更加熟悉了,这种对子类的设计开放扩展点的方式很任性化。
以上是关于线程池那些事之ScheduledThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章
线程池那些事之ScheduledThreadPoolExecutor