Java并发编程(十九):ScheduledThreadPoolExecutor总结与源码分析

Posted 黄智霖-blog

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程(十九):ScheduledThreadPoolExecutor总结与源码分析相关的知识,希望对你有一定的参考价值。

前言

  ScheduledThreadPoolExecutor主要用于处理延时任务或者定时任务,在平时的工作场景中也有被广泛应用,而我们有了前文关于ThreadPoolExecutor源码的深入理解,这里理解ScheduledThreadPoolExecutor便会顺畅不少。

使用

  ScheduledThreadPoolExecutor的使用还是比较简单,总体来说主要有三种方式提交任务:

  • schedule:延时执行一个任务
schedule(runnable, 3, TimeUnit.SECONDS);

  表示延时3秒执行runnable任务, 同时schedule还支持提交一个Callable任务,通过返回的ScheduledFuture可以在后续的流程中获取任务的执行结果。

  • scheduleAtFixedRate:相对于schedule方法,多了一个period参数,表示每隔多久执行一次任务,可以使用它实现周期调度任务的目的。
scheduleAtFixedRate(runnable, 1, 2, TimeUnit.SECONDS);

  表示延时1秒开始执行runnable,并且并且之后每隔2秒执行一次runnable。要注意,如果任务的执行时间超过了间隔时间(period),那么在任务执行完成后会立即执行下一次runnable,也就是说严格按照period间隔时间计算,只是到了period指定的间隔,由于任务还在执行,所以等待任务执行完成后会立即调度下一次执行。在一个任务执行完成后才会插入下一次需要执行的任务,所以不会存在任务堆积的情况。

  • scheduleWithFixedDelay:用法和scheduleAtFixedRate类似,相对于schedule方法多了一个delay参数,也用于提交周期任务。和scheduleAtFixedRate的区别是,如果任务执行时间超过了delay参数,那么任务执行完后还是会再等待delay指定的时间才运行下一次任务,也就是说delay从任务结束的时间开始算。
scheduleWithFixedDelay(runnable, 3, 2, TimeUnit.SECONDS);

注:同时也支持execute和submit方法提交任务,实现就是通过schedule提交一个延时为0的任务。

源码分析

在这里插入图片描述
  从类继承关系图中可以看出,ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以它具备ThreadPoolExecutor的能力,同时实现了ScheduledExecutorService接口,该接口定义了4个方法:
在这里插入图片描述
  这几个接口使得ScheduledThreadPoolExecutor具备了延时执行和定时调度的能力。
  首先来看构造方法,从源码看来,ScheduledThreadPoolExecutor提供了四个构造方法:

	 public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

  构造器主要就是三个参数:

  • corePoolSize:线程池大小
  • threadFactory:线程工厂
  • handler:拒绝策略

  ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,但是可以从构造函数中看到,指定的maximumPoolSize都是Integer.MAX_VALUE,并且阻塞队列使用的是DelayedWorkQueue,keepAliveTime设置为0。从这些参数设置中我们就能发现,对于ScheduledThreadPoolExecutor来说,没有了核心线程池的概念,因为队列DelayedWorkQueue是一个无界队列,所以线程数不可能超过corePoolSize的大小。

schedule实现

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        //计算任务的触发时间,然后将任务command包装为一个ScheduledFutureTask,然后调用decorateTask方法进行进一步包装
        //decorateTask方法在这里没有做任何包装操作,如果需要可以继承然后自己实现
        RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
		//提交任务
        delayedExecute(t);
        return t;
    }

  调用方法很简单,首先会根据delay和unit计算任务的触发时间,计算逻辑就是当前时间加上延时时间,这个代码这里就不贴了。然后将触发时间和任务包装成一个ScheduledFutureTask,再然后会调用decorateTask进行进一步包装,包装成一个RunnableScheduledFuture,这个decorateTask方法在ScheduledThreadPoolExecutor中没有做什么其它操作,直接返回的是上一步创建的ScheduledFutureTask,子类可以重写该方法对任务进行进一步的包装。

注:ScheduledFutureTask是ScheduledThreadPoolExecutor的子类

在这里插入图片描述
  从ScheduledFutureTask的继承关系中可以看出,它实现了Comparable接口,具备比较大小的功能;实现了Future接口,具备获取执行结果的功能;实现了Runnable,可以作为任务被运行。
  但是Runnable是不具备返回值的,这里怎么返回的ScheduledFutureTask呢?可以来看看ScheduledFutureTask关于Runnable的构造函数:

	ScheduledFutureTask(Runnable r, V result, long ns) {
			//result表示任务的返回值
            super(r, result);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

  该构造函数需要一个任务返回值,在前面schedule(runnable)的方法中传入的是null,表示一个空的返回,然后到父类FutureTask中看看:

	public FutureTask(Runnable runnable, V result) {
		//将runnable包装横一个Callable
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

  在父类FutureTask中,Runnable被转换成了Callable,这是通过Executors.callable实现的:

	public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    //适配器
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

  这里运用了适配器模式,将一个Runnable和指定的返回值适配成了一个Callable。所以说通过schedule方法提交一个Runnable任务,也会通过适配器模式将其包装成一个Callable,最终生成一个ScheduledFutureTask。相对来说,如果schedule方法提交的是一个Callable任务,就不用适配了,这个代码就不贴了。
  现在对schedule方法做一个简单总结:提交的任务如果是一个Runnable,那么会通过适配器模式将其转换成一个返回值为null的Callable,如果任务本身就是一个Callable则不用包装,最终被包装成一个ScheduledFutureTask,这里还提供了一个decorateTask方法提供给用户实现,以支持对这个任务进行进一步包装,包装完成后通过delayedExecute方法提交任务,然后返回ScheduledFuture
  那么接下来我们进入delayedExecute方法看看任务是如何提交的:

    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
        	//如果线程池不是RUNNING状态,那么执行拒绝策略
            reject(task);
        else {
        	//将任务添加到队列中
            super.getQueue().add(task);
            //再次检查线程池状态和参数配置,可能需要取消该任务
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                //退出任务(状态修改为CANCELLED),不中断线程
                task.cancel(false);
            else
            	//确定有线程去执行任务,可能需要新建一个工作线程
                ensurePrestart();
        }
    }

  如果线程池不是RUNNING状态,那么会执行拒绝策略,否则会将任务直接放到阻塞队列中,这里使用的队列写死在ScheduledThreadPoolExecutor的构造函数中,是DelayedWorkQueue,这是一个延时队列 。任务放到队列中之后会再次检查线程池状态,如果线程池被SHUTDOWN了,那么就表示现在的情况是先提交了任务,再关闭线程池,针对这种情况,对于周期任务和普通延时任务有两个参数来控制行为,分别是:

  • continueExistingPeriodicTasksAfterShutdown:针对周期任务,true表示不会取消任务,false表示会取消任务
  • executeExistingDelayedTasksAfterShutdown:针对非周期任务,true表示不会取消任务,false表示会取消任务

  这两个参数都有对应的set方法可以更改,而对于这个逻辑控制主要是**canRunInCurrentRunState(task.isPeriodic())**方法调用实现的:

	boolean canRunInCurrentRunState(boolean periodic) {
		//periodic表示是否为周期任务
        return isRunningOrShutdown(periodic ?
                                   continueExistingPeriodicTasksAfterShutdown :
                                   executeExistingDelayedTasksAfterShutdown);
    }
    
    final boolean isRunningOrShutdown(boolean shutdownOK) {
        int rs = runStateOf(ctl.get());
        //如果状态RUNNING或者是SHUTDOWN并且传入的shutdownOK为true
        //那么这里返回true,那么canRunInCurrentRunState返回true,取反就是false,就不会移除并且停止任务
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }

  当然,正常情况下的逻辑不会来到这个分支,而是直接进入ensurePrestart方法:

	 void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
        	//如果workerCount小于corePoolSize,那么判断创建一个工作线程
            addWorker(null, true);
        else if (wc == 0)
        	//到这里说明wc>=corePoolSize,而wc==0,但是corePoolSize不能为-1
        	//说明corePoolSize==0
            addWorker(null, false);
        //如果workerCount>=corePoolSize,并且corePoolSize不为0,那么什么也不做
    }

  ensurePrestart方法主要做的事儿就是创建工作线程,ScheduledThreadPoolExecutor和ThreadPoolExecutor不一样,不会在提交任务的时候顺带就创建线程,而是这直接把任务添加到队列中。同时,如果corePoolSize配置的是0,就意味着ScheduledThreadPoolExecutor中的工作线程数量不会超过1个,也就是说corePoolSize设置为1和0效果是一样的,区别就是addWorker的方式不一样。
  到这里,ScheduledThreadPoolExecutor中schedule方法的逻辑已经分析完了~ 剩余的就是ThreadPoolExecutor的逻辑:工作线程不断的从任务队列中获取任务来执行,包括runWorker、getTask等方法的调用逻辑都是一样的,具体可以参考ThreadPoolExecutor总结与源码深度分析。但是ScheduledThreadPoolExecutor最核心的内容我们还没有提到,那就是在构造函数中写死的阻塞队列:DelayedWorkQueue,先来看看它的继承关系:
在这里插入图片描述

DelayedWorkQueue

  DelayedWorkQueue实现的是一个优先队列,这里底层则是通过二叉堆实现的,二叉堆本质上是一个完全二叉树或近似完全二叉树,分为最大堆和最小堆两种方式,通常由数组作为基础数据结构进行实现。

  • 最大堆:父节点的总是大于或等于子节点的值,每一个子树都是一个最大堆
  • 最小堆:父节点的值总是小于或等于子节点的值,每一个子树都是一个最小堆

  DelayedWorkQueue中存储的是RunnableScheduledFuture,实际上是ScheduledFutureTask类型,在创建ScheduledFutureTask的时候会设置任务触发的时间保存到time属性中,如果是周期任务,那么周期时间会存储在period属性中。
  通过最小堆来实现DelayedWorkQueue,排序主要依赖time字段(time相同使用sequenceNumber),就能保证堆顶一直是最近需要执行的任务。线程池使用阻塞队列主要就是调用定义在阻塞队列BlockingQueue中的三个方法:

  • add:将任务添加到队列中,实际调用offer方法
  • take:从队列中获取并移除队头元素,如果队列为空,会阻塞直到队列加入新元素
  • poll:和take一样,只是在阻塞的时候会有一个超时时间,如果达到超时时间还没有获取到元素,那么会返回,可能抛出InterruptedException
  • peek:从队列获取队头元素,但是不移除

注:最小堆的插入只需要在数组末尾插入元素,然后比较其和父节点大小,如果小于父节点则交换,然后继续向上调整,直到调整到根节点(自底向上调整);而堆顶元素移除操作(最小堆就是移除最小元素,最大堆就是移除最大元素)则是直接用数组最后一个元素代替被移除的元素,然后自顶向下调整,对于最小堆而言就是和两个子节点的最小值比较,如果比这个值大则交换,直到调整到尾节点。二叉堆相对来说比较简单,更多信息这里就不赘述了~

  现在我们知道了DelayedWorkQueue是由最小堆实现的,那么又是如何实现任务延时执行的呢?在ScheduledThreadPoolExecutor中,一个工作线程被创建后会调用父类ThreadPoolExecutor的getTask方法获取任务,而getTask方法会根据是否需要超时获取任务来调用阻塞队列的take方法或poll方法,这里就来看看DelayedWorkQueue的take方法实现:

        public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
            	//自旋
                for (;;) {
                	//获取队头元素
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                    	//如果为空表示队列为空,那么线程进入条件队列阻塞
                        available.await();
                    else {
                    	//队头元素不为空,获取其距可执行还剩余的时间
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                        	//如果delay<=0,说明任务可以执行了
                        	//完成任务获取,返回任务
                            return finishPoll(first);
                        //否则表示队头任务还没有到该执行的时候,而队头元素一定是最近需要执行的任务
                        //也就表明队列中没有一个任务需要现在执行
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                        	//leader是一个线程,如果不为空,那么进入条件队列阻塞,一直等待
                            available.await();
                        else {
                        	//将当前线程设置为leader
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                            	//然后进入条件队列阻塞,但是超时时间是队头任务还需剩余的触发时间
                            	//超时之后再次循环到上面去获取队头元素,判断是否需要触发,理论上这里超时了就代表队头元素
                            	//到了该执行的时间
                                available.awaitNanos(delay);
                            } finally {
                            	//这里leader线程可能发生了变更
                                if (leader == thisThread)
                                	//又将leader置空
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

  take()方法的逻辑很简单,就是从最小堆中获取堆顶元素,这个元素对应的任务一定是任务列表中需要最先执行的(因为最小堆根据任务触发时间排序),获取到之后判断当前是否到了该任务需要触发的时间,如果达到了则返回该任务,返回的时候也会移除该节点,然后最小堆自动调整,堆顶元素又成了剩余任务列表中需要最先执行的任务;而如果没有到达执行时间,则需要阻塞等待,在阻塞被唤醒后又会进入循环再次获取堆顶元素,判断触发时间。如果任务达到了触发时间,那么通过finishPoll方法返回任务。
  在finishPoll方法中,就是将任务数量(size属性)减1,然后重新调整堆,使剩余任务中需要最先触发的任务排在堆顶位置:

	private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
            int s = --size;
            RunnableScheduledFuture<?> x = queue[s];
            queue[s] = null;
            if (s != 0)
            	//调整堆
                siftDown(0, x);
            setIndex(f, -1);
            return f;
        }

  任务返回后会调用其run方法运行任务,这个是ThreadPoolExecutor的逻辑,任务是ScheduledFutureTask类型:
在这里插入图片描述
  其间接实现了Runnable接口,继承自FutureTask,在ScheduledFutureTask中找到run方法实现:

	public void run() {
			//判断是否为周期任务
            boolean periodic = isPeriodic();
			//判断在当前线程池状态下是否能够执行该任务
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
            	//不是周期任务,调用父类FutureTask的run方法
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
            	//如果是周期任务,调用父类的runAndReset方法
            	//设置任务下次运行时间
                setNextRunTime();
                //outerTask任务重新入队,outerTask在提交周期任务的时候会设置
                reExecutePeriodic(outerTask);
            }
        }

  从这个run方法中我们能够看出来,任务执行的时候会再次判断线程池状态,根据是否为周期任务和具体的参数来决定是否能够执行任务。如果任务不是周期任务,那么调用父类的run方法执行,否则调用父类的runAndReset方法。runAndReset和run方法的主要区别是runAndReset不会保存任务执行结果,并且会将任务状态重置为NEW(关于FutureTask在后面的文章中再细说~)。而如果是周期任务,在任务执行完成并且重置任务状态后,会计算任务的下次运行时间,然后将任务重新入队,等待下次执行(下文细说)。
  但是take方法中出现的leader线程是什么意思呢?这里就涉及到一个Leader-Follower模式。

Leader-Follower线程模型

  在这个模型中,线程分为LeaderFollwer角色,还有一个processing状态。在线程池中只有一个Leader线程负责等待接收请求(事件),其它的都是Follwer线程,Leader接收请求后会从Follower中选拔一个新的Leader线程等待下一个请求,而自己转入processing状态,处理完成后又成为Follower,这种模式在基于事件分发的线程池场景下减少了线程上下文切换。

注:ReentrantLock创建的Condition在await的时候会调用fullRelease方法释放互斥锁(state),所以即使take方法通过lockInterruptibly施加了互斥锁,在线程await进入条件队列阻塞后,其它线程也能获取互斥锁,从而进入for循环

  在DelayedWorkQueue的场景中,从队列中获取一个任务后,如果任务还没有到触发时间,那么该任务就不能被执行,这可以通过阻塞线程实现:只需要将获取到任务的线程阻塞距离任务运行的剩余时间,线程醒来后然后再在循环中重新从队列获取任务,再次判断触发时间即可。前面也提到了,DelayedWorkQueue底层是通过数组实现的一个最小二叉堆,那么队头的任务(queue[0])一定是需要最先执行的任务,假设线程1获取到互斥锁进入for循环,从队头获取任务,发现任务运行还需要t秒,那么进入条件队列进行超时(t秒)阻塞,同时释放state;接着线程2获取到互斥锁进入for循环,从队头获取任务,发现任务运行还需要tt秒,那么也需要进入条件队列进行阻塞,但是线程2的这个阻塞就不用使用超时(tt秒)阻塞了;如果还有线程3,也同理。
  leader线程的引入就是为了在这种情况下减少不必要的超时阻塞,借鉴的就是Leader-Follower线程模型,对于队头任务,只有一个leader线程负责对其进行超时阻塞等待,其它线程都进行"永久"等待,leader线程从队列中取走任务之后(queue[0])则唤醒一个follower线程成为新的leader。
  到现在我们已经了解到了队列DelayedWorkQueue中的任务是如何被消费的,那么回过头来看看delayedExecute方法中任务入队的逻辑,也就是DelayedWorkQueue的add方法:

		public boolean add(Runnable e) {
            return offer(e);
        }

  间接调用的offer方法:

        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                	//元素数量达到了队列长度(默认为16),则需要扩容
                    grow();
                //任务数量加1,任务出队的时候减一
                size = i + 1;
                if (i == 0) {
                	//直接设置队头元素
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                	//任务入队(最小二叉堆),并且自动调增堆,使触发时间最小的排在堆顶位置
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                	//如果天添加的任务位于堆顶,那么说明队列之前为空,或者任务入队,堆调整
                	//后排在堆顶位置
                	//那么置空leader,唤醒一个线程,被唤醒的线程在下一次for循环中可以重新设置leader线程
                	//有可能发生leader线程的变更
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }
        
        /*队列扩容*/
        private void grow() {
            int oldCapacity = queue.length;
            //增加50%的容量
            int newCapacity = oldCapacity + (oldCapacity >> 1);
            if (newCapacity < 0)
            	//溢出,使用int最大值
                newCapacity = Integer.MAX_VALUE;
            //直接拷贝数组
            queue = Arrays.copyOf(queue, newCapacity);
        }

  offer入队的逻辑比较清晰,先判断元素是否达到了队列的长度,如果达到了则需要扩容50%的容量,代码中是通过oldCapacity + (oldCapacity >> 1)实现的,表示的含义就是oldCapacity+oldCapacity/2==oldCapacity*1.5,也就是扩容50%。接着如果任务入队后排在堆顶位置,那么就可能有两种情况:

  • 之前堆为空,新入队任务作为第一个任务排在堆顶位置
  • 之前堆不为空,但是任务入队后,经过调整被移动到了堆顶位置,也就是新加入的任务需要最先触发

  对于第二种情况,将leader置null,然后唤醒一个阻塞的线程,阻塞线程被唤醒后再重新设置一个leader线程。到这里我们对schedule方法做一个总结:
  schedule方法返回的是一个ScheduledFuture,如果方法传入的是一个Callable,那么将其和触发时间封装成一个ScheduledFutureTask添加到队列中;如果传入的是一个Runnable,那么会先通过适配器将其封装成一个Callbale。在ScheduledFutureTask入队之前还提供了一个空方法decorateTask供子类去重写,以实现对ScheduledFutureTask进行进一步的装饰。ScheduledThreadPoolExecutor存储任务使用的队列是DelayedWorkQueue,其本质上是用数组实现的一个最小

以上是关于Java并发编程(十九):ScheduledThreadPoolExecutor总结与源码分析的主要内容,如果未能解决你的问题,请参考以下文章

转:Java并发编程之十九:并发新特性—Executor框架与线程池(含代码)

Java并发编程(十九):ScheduledThreadPoolExecutor总结与源码分析

Java并发编程(十九):ScheduledThreadPoolExecutor总结与源码分析

Java并发编程(十九):ScheduledThreadPoolExecutor总结与源码分析

Java并发编程原理与实战十九:AQS 剖析

Java并发编程原理与实战二十九:Exchanger