线程池那些事之ScheduledThreadPoolExecutor

Posted Java后端笔记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池那些事之ScheduledThreadPoolExecutor相关的知识,希望对你有一定的参考价值。

前言

ScheduledThreadPoolExecutor在ThreadPoolExecutor的基础上增加了schedule方法,支持了延迟,固定间隔触发执行任务。与ThreadPoolExecutor的任务不同的是,这些任务可以重复执行。

源码解析

ScheduledThreadPoolExecutor在ThreadPoolExecutor的基础上做了一些修改

  1. 新增任务类型ScheduledFutureTask,继承于FutureTask

  2. workqueue的实现为延迟队列DelayedWorkQueue

  3. 增加了shutdown的一些配置参数,continueExistingPeriodicTasksAfterShutdown和executeExistingDelayedTasksAfterShutdown

  4. 提供包装方法decorateTask让子类扩展自定义任务类型

ScheduledExecutorService实现

相对ThreadPoolExecutor,ScheduledThreadPoolExecutor额外实现了ScheduledExecutorService接口。 ScheduledExecutorService定义了这些定时任务的方法。

 
   
   
 
  1. //延迟任务

  2. //delay时间后执行任务

  3. public ScheduledFuture<?> schedule(Runnable command,

  4.                                       long delay, TimeUnit unit);

  5. //延迟任务

  6. //delay时间后执行任务

  7. public <V> ScheduledFuture<V> schedule(Callable<V> callable,

  8.                                           long delay, TimeUnit unit);

  9. //固定频率任务

  10. //第一次initialDelay后执行任务,之后按照固定period执行任务

  11. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,

  12.                                                  long initialDelay,

  13.                                                  long period,

  14.                                                  TimeUnit unit);

  15. //固定延迟任务

  16. //第一次initialDelay后执行任务,之后延迟period时间执行任务

  17. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,

  18.                                                     long initialDelay,

  19.                                                     long delay,

  20.                                                     TimeUnit unit);

调度任务分为两种,周期任务和延迟任务。周期任务又分为固定频率和固定延迟。

固定频率和固定延迟的区别在于,initialDelay为0的情况下,如果一个任务耗时2秒,固定5秒频率任务的时间为,0,5,10,15...,而固定5秒延迟任务为0,7,14,21...

shedule方法的逻辑基本一致,我们看下scheduleAtFixedRate实现

 
   
   
 
  1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,

  2.                                                  long initialDelay,

  3.                                                  long period,

  4.                                                  TimeUnit unit) {

  5.        if (command == null || unit == null)

  6.            throw new NullPointerException();

  7.        if (period <= 0)

  8.            throw new IllegalArgumentException();

  9.        //创建ScheduledFutureTask

  10.        ScheduledFutureTask<Void> sft =

  11.            new ScheduledFutureTask<Void>(command,

  12.                                          null,

  13.                                          triggerTime(initialDelay, unit),

  14.                                          unit.toNanos(period));

  15.        //进行再次包装,用于子类扩展

  16.        RunnableScheduledFuture<Void> t = decorateTask(command, sft);

  17.        sft.outerTask = t;

  18.        //放入延迟队列

  19.        delayedExecute(t);

  20.        return t;

  21.    }

delayedExecute封装了向workqueue放任务的逻辑

 
   
   
 
  1. private void delayedExecute(RunnableScheduledFuture<?> task) {

  2.        //如果已经shutdown,拒绝任务

  3.        if (isShutdown())

  4.            reject(task);

  5.        else {

  6.            super.getQueue().add(task);

  7.            //再次check线程池状态

  8.            //如果已经shutdown那么取消该任务

  9.            if (isShutdown() &&

  10.                !canRunInCurrentRunState(task.isPeriodic()) &&

  11.                remove(task))

  12.                task.cancel(false);

  13.            else

  14.                //判断工作线程是否足够,不够增加

  15.                ensurePrestart();

  16.        }

  17.    }

相对ThreadPoolExecutor的submit方法,schdule新增了一种任务类型ScheduledFutureTask,用于执行调度任务,接下来我们看下ScheduledFutureTask的实现

ScheduledFutureTask

ScheduledFutureTask相对FutureTask增加了Delayed接口,用于对任务进行排序。以及RunnableScheduledFuture中的isPeriodic方法,判断当前任务是否是周期性的。

新增属性

在ScheduledFutureTask新增了几个属性。

属性 作用
sequenceNumber 用于任务排序
time 任务执行的绝对时间
period 大于0代表固定频率周期,小于0代表固定延迟周期,0代表不是周期任务
heapIndex 用于快速定位在DelayedWorkQueue的位置,方便删除

run方法重载

ScheduledFutureTask的调度逻辑主要在run方法,增加了reset的逻辑,让周期性任务能够重复执行。

 
   
   
 
  1. public void run() {

  2.            boolean periodic = isPeriodic();

  3.            //当前线程池状态能否执行任务

  4.            if (!canRunInCurrentRunState(periodic))

  5.                cancel(false);

  6.            //如果是普通任务,执行用父类run方法执行

  7.            else if (!periodic)

  8.                ScheduledFutureTask.super.run();

  9.            //如果是调度任务,执行完,重新设置初始状态

  10.            else if (ScheduledFutureTask.super.runAndReset()) {

  11.                //设置下次生效的时间

  12.                setNextRunTime();

  13.                //把任务重新放到workQueue中去

  14.                reExecutePeriodic(outerTask);

  15.            }

  16.        }

runAndReset直接使用FutureTask的实现,和run方法的区别是,在执行完任务后,不会修改任务的状态(不调用set方法设置结果),还是为NEW。

 
   
   
 
  1. protected boolean runAndReset() {

  2.        if (state != NEW ||

  3.            !UNSAFE.compareAndSwapObject(this, runnerOffset,

  4.                                         null, Thread.currentThread()))

  5.            return false;

  6.        boolean ran = false;

  7.        int s = state;

  8.        try {

  9.            Callable<V> c = callable;

  10.            if (c != null && s == NEW) {

  11.                try {

  12.                    c.call(); // don't set result

  13.                    ran = true;

  14.                } catch (Throwable ex) {

  15.                    //在出异常的情况下,会被设置为EXCEPTION状态,那么这个任务就执行不了了

  16.                    setException(ex);

  17.                }

  18.            }

  19.        } finally {

  20.            // runner must be non-null until state is settled to

  21.            // prevent concurrent calls to run()

  22.            runner = null;

  23.            // state must be re-read after nulling runner to prevent

  24.            // leaked interrupts

  25.            s = state;

  26.            if (s >= INTERRUPTING)

  27.                handlePossibleCancellationInterrupt(s);

  28.        }

  29.        return ran && s == NEW;

  30.    }

setNextRunTime用于设置下次生效时间

 
   
   
 
  1. private void setNextRunTime() {

  2.            long p = period;

  3.            if (p > 0)

  4.                //固定频率,直接加period得到下次运行时间

  5.                time += p;

  6.            else

  7.                //固定延迟,在triggerTime用的是当前时间+p

  8.                time = triggerTime(-p);

  9.        }

  10. long triggerTime(long delay) {

  11.        //注意是当前时间,其他的位移逻辑就当是把负数变成正数把

  12.        return now() +

  13.            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));

  14.    }

reExecutePeriodic用于重新把任务放入队列

 
   
   
 
  1. void reExecutePeriodic(RunnableScheduledFuture<?> task) {

  2.        if (canRunInCurrentRunState(true)) {

  3.            super.getQueue().add(task);

  4.            //如果线程池当前状态不能执行任务,那么取消该任务

  5.            if (!canRunInCurrentRunState(true) && remove(task))

  6.                task.cancel(false);

  7.            else

  8.                ensurePrestart();

  9.        }

  10.    }

cancel方法重载

下面来看下cancel方法

 
   
   
 
  1. public boolean cancel(boolean mayInterruptIfRunning) {

  2.            boolean cancelled = super.cancel(mayInterruptIfRunning);

  3.            //取消成功后,主动从workqueue删除该任务

  4.            if (cancelled && removeOnCancel && heapIndex >= 0)

  5.                remove(this);

  6.            return cancelled;

  7.        }

这边一个优化点是,会主动删除任务,一般任务被取消了,工作线程还是会去获取的,但是不实际执行该任务。

Comparable接口实现

ScheduledFutureTask同时实现了Comparable 接口,用于在workQueue排序的时候通过延迟时间来排序

 
   
   
 
  1. public int compareTo(Delayed other) {

  2.            if (other == this) // compare zero if same object

  3.                return 0;

  4.            if (other instanceof ScheduledFutureTask) {

  5.                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;

  6.                //根据绝对时间比较

  7.                long diff = time - x.time;

  8.                if (diff < 0)

  9.                    return -1;

  10.                else if (diff > 0)

  11.                    return 1;

  12.                //时间相同,使用任务号码比较,先加入的任务号码小

  13.                else if (sequenceNumber < x.sequenceNumber)

  14.                    return -1;

  15.                else

  16.                    return 1;

  17.            }

  18.            //使用getDelay接口实现比较

  19.            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);

  20.            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;

  21.        }

下面来介绍延迟队列的实现,为什么用延迟队列,延迟队列按照执行时间顺序排,我们只需要关注最前面的任务,获取有效任务更快。

延迟队列DelayedWorkQueue

DelayedWorkQueue内部使用了小根堆算法,保证时间最小也就是最近执行的任务放在队头。并且只有任务的生效时间到了,才能被工作线程获取到,不然一直阻塞在那边。

DelayedWorkQueue内部通过一个数组存储ScheduledFutureTask

 
   
   
 
  1. private static final int INITIAL_CAPACITY = 16;

  2.        private RunnableScheduledFuture<?>[] queue =

  3.            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

这个数组是一个小根堆,根节点总是延迟最小的任务。

如何构造这个小根堆暂时不讲,但是它的算法还是挺有意思的。 我们关注它的take方法,因为DelayedWorkQueue是无界的,因此ScheduledThreadPoolExecutor内只有核心线程,所以在getTask的时候使用take获取任务

 
   
   
 
  1. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

  2. ...

  3. Runnable r = timed ?

  4.                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

  5.                    workQueue.take();

看下take方法

 
   
   
 
  1. public RunnableScheduledFuture<?> take() throws InterruptedException {

  2.            final ReentrantLock lock = this.lock;

  3.            lock.lockInterruptibly();

  4.            try {

  5.                for (;;) {

  6.                    RunnableScheduledFuture<?> first = queue[0];

  7.                    if (first == null)

  8.                        //如果workQueue第一个任务为空,直接await

  9.                        available.await();

  10.                    else {

  11.                        long delay = first.getDelay(NANOSECONDS);

  12.                        if (delay <= 0)

  13.                            //如果第一个任务可以获取,那么直接获取

  14.                            return finishPoll(first);

  15.                        first = null; // don't retain ref while waiting

  16.                        //在第一个元素不满足获取条件的情况下

  17.                        //只能有一个线程能获取该元素,这个线程保存在leader中

  18.                        if (leader != null)

  19.                            //如果leader已经被别的线程占有,那么await

  20.                            available.await();

  21.                        else {

  22.                            //leader为空,那么占有它

  23.                            Thread thisThread = Thread.currentThread();

  24.                            leader = thisThread;

  25.                            try {

  26.                                //等待直到这个任务有效

  27.                                available.awaitNanos(delay);

  28.                            } finally {

  29.                                //放弃leader锁

  30.                                if (leader == thisThread)

  31.                                    leader = null;

  32.                            }

  33.                        }

  34.                    }

  35.                }

  36.            } finally {

  37.                //通知其他等待的线程

  38.                if (leader == null && queue[0] != null)

  39.                    available.signal();

  40.                lock.unlock();

  41.            }

  42.        }

take方法的大致逻辑是 如果workQueue为空,那么直接等待 如果不为空,并且第一个任务有效,那么直接获取 如果第一个任务延迟时间还没到,第一个到达的线程通过leader锁先锁定这个任务,消费这个任务后,使用signal唤醒其他等待的工作线程来获取其他任务。 做这个优化,我觉得是为了防止过多线程竞争任务,浪费CPU资源。因为一个任务最终只能被一个线程消费。

onShutdown钩子

ScheduledThreadPoolExecutor中重载了onShutdown的钩子方法,会在shutdown方法内被调用。

 
   
   
 
  1. Override void onShutdown() {

  2.        BlockingQueue<Runnable> q = super.getQueue();

  3.        boolean keepDelayed =

  4.            getExecuteExistingDelayedTasksAfterShutdownPolicy();

  5.        boolean keepPeriodic =

  6.            getContinueExistingPeriodicTasksAfterShutdownPolicy();

  7.        //如果keepDelayed和keepPeriodic都为false,cancel所有ScheduledFutureTask

  8.        if (!keepDelayed && !keepPeriodic) {

  9.            for (Object e : q.toArray())

  10.                if (e instanceof RunnableScheduledFuture<?>)

  11.                    ((RunnableScheduledFuture<?>) e).cancel(false);

  12.            q.clear();

  13.        }

  14.        else {

  15.            // Traverse snapshot to avoid iterator exceptions

  16.            for (Object e : q.toArray()) {

  17.                if (e instanceof RunnableScheduledFuture) {

  18.                    RunnableScheduledFuture<?> t =

  19.                        (RunnableScheduledFuture<?>)e;

  20.                    //根据配置以及任务类型对任务进行取消

  21.                    if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||

  22.                        t.isCancelled()) { // also remove if already cancelled

  23.                        if (q.remove(t))

  24.                            t.cancel(false);

  25.                    }

  26.                }

  27.            }

  28.        }

  29.        tryTerminate();

  30.    }

重载这个方法用于cancel那些可以被cancel的ScheduledFutureTask任务。 continueExistingPeriodicTasksAfterShutdown和executeExistingDelayedTasksAfterShutdown用于控制shutdown后是否执行周期任务(固定延迟和固定频率)和延迟任务。哪一个配置为false,那种类型的任务会被cancel。

测试

延迟任务

 
   
   
 
  1. ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

  2.        System.out.println(new Date());

  3.        scheduledExecutorService.schedule(new Runnable() {

  4.            @Override

  5.            public void run() {

  6.                System.out.println(new Date());

  7.            }

  8.        },2, TimeUnit.SECONDS);

延迟任务只会执行一次

周期定时延迟任务

 
   
   
 
  1. ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

  2.        System.out.println(new Date());

  3.        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

  4.            @Override

  5.            public void run() {

  6.                System.out.println(new Date());

  7.                try {

  8.                    Thread.sleep(2000L);

  9.                } catch (InterruptedException e) {

  10.                    e.printStackTrace();

  11.                }

  12.            }

  13.        },2,5, TimeUnit.SECONDS);

周期定时频率

 
   
   
 
  1. ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

  2.        System.out.println(new Date());

  3. scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

  4.            @Override

  5.            public void run() {

  6.                System.out.println(new Date());

  7.                try {

  8.                    //这边把2000修改为6000后,其实频率会变为6s,任务会继续放到延迟队列,时间生成是对的,但是放的时间比原来时间拖了1s

  9.                    Thread.sleep(2000L);

  10.                } catch (InterruptedException e) {

  11.                    e.printStackTrace();

  12.                }

  13.            }

  14.        },2,5, TimeUnit.SECONDS);

总结

对调度线程池更加熟悉了,这种对子类的设计开放扩展点的方式很任性化。 


以上是关于线程池那些事之ScheduledThreadPoolExecutor的主要内容,如果未能解决你的问题,请参考以下文章

线程池那些事之ScheduledThreadPoolExecutor

线程池那些事之Future

ScheduledExecutorService 定时任务运行原理

学习Python那些事之数据库

webSocket那些事之socket.io

C++那些事之高效率开发C++/C