并发编程之定时任务

Posted yatou-blog

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程之定时任务相关的知识,希望对你有一定的参考价值。

ScheduledThreadPoolExecutor

  ScheduledThreadPoolExecutor继承了ThreadPoolExecutor 实现了ScheduledExecutorService。主要用来处理延时任务和定时任务。

 public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

定时线程池的执行原理与一般的线程池执行过程有点差别,具体的定时线程的执行原理如下图所示:

技术图片

 定时线程池主要是接收ScheduledFutureTask任务,是线程池调度任务的最小单位,有3种提交方式:

1. schedule:schedule方法是指任务在指定延迟时间后触发,只执行一次。
2. scheduledAtFixedRate:
3. scheduledWithFixedDelay:
具体的案例如下:
package com.test.executor;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SchelduledThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledService=Executors.newScheduledThreadPool(5);
scheduledService.schedule(
new Runnable() { @Override public void run() { System.out.println("延时任务执行"); } }, 1, TimeUnit.SECONDS); scheduledService.scheduleAtFixedRate(new Runnable() { @Override public void run() { // TODO Auto-generated method stub System.out.println("不管任务是否执行完成,每过3秒产生一个新线程"); } }, 1, 3, TimeUnit.SECONDS); scheduledService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { System.out.println("前一个任务执行完成以后,隔3秒执行下一个任务"); } }, 1, 3, TimeUnit.SECONDS); } }

定时线程池采用的是DelayQueue,是个无界队列,内部封装了priorityQueue,根据time时间先后进行排序,若time相同则用sequenceNumber排序。

ScheduledFutureTask 有以下三种属性:

1、private final long sequenceNumber;任务序号
2、private final long period;任务执行时间间隔

3、private long time;任务开始时间

工作线程的执行过程:

1、工作线程从DelayQueue从获取到期的任务去执行;

2、执行结束后重新设置任务的到期时间,再次放回到DelayQueue中去;

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }


 private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())//如果线程池已经关闭
            reject(task);//拒绝任务
        else {
            super.getQueue().add(task);//否则直接加入队列
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);//若当前线程无法执行,则取消
            else
                ensurePrestart();//增加一个worker,避免提交的任务没有线程去执行,原因就是该类没有像ThreadPoolExecutor一样,woker满了才放入队列
        }
    }
void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

ScheduledThreadPoolExecutor会把执行的任务放到DelayQueue中去,DelayQueue中封装了一个PriorityQueue队列,该队列会对任务ScheduledFutureTask按照时间顺序进行排序,排序算法如下:

public int compareTo(Delayed other) {
            if (other == this) // compare zero ONLY if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0)? 0 : ((d < 0)? -1 : 1);
        }

由以上代码可以看出,

1、先按照time进行排序,时间小的排在前面,时间大的排在后面;

2、time相同的话,就按照sequenceNumber来进行排序,sequenceNumber小的排在前面,大的排在后面。即时间相同,先提交的优先执行。

定时线程池任务运行的核心就是ScheduledFutureTask的run方法,下面来看一下:

 public void run() {
            boolean periodic = isPeriodic();
        //如果当前线程已经不支持执行任务,则取消
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
    //不需要周期性执行任务,则直接执行run然后结束任务
            else if (!periodic)
                ScheduledFutureTask.super.run();
    //如果需要周期性执行任务,则执行完任务以后,设置下一次执行时间,然后重复执行
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();//设置下一次执行时间
                reExecutePeriodic(outerTask);//重复执行任务
            }
        }
总结一下,具体的执行步骤如下:
1. 如果当前线程池运行状态不可以执行任务,取消该任务,然后直接返回,否则执行
步骤2;
2. 如果不是周期性任务,调用FutureTask中的run方法执行,会设置执行结果,然后
直接返回,否则执行步骤3;
3. 如果是周期性任务,调用FutureTask中的runAndReset方法执行,不会设置执行
结果,然后直接返回,否则执行步骤4和步骤5;
4. 计算下次执行该任务的具体时间;
5. 重复执行任务。
 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

reExecutePeriodic方法与delayedExecute方法类似,但是不同的是:

1、由于调用reExecutePeriodic方法的时候,任务已经执行过一次了,所以不会拒绝当前任务;

2、传入的任务一定是周期性执行的任务。

DelayedWorkQueue

DelayedWorkQueue是一个基于堆数据结构的无界队列。在执行任务的时候每个任务的执行时间都不一样,所以它的工作就是按照时间升序排列,执行时间距离当前时间越近则越排在队列前。但是这里的顺序并不是绝对的,因为堆中排序只是保证了子节点的任务执行时间要比父节点的下次执行时间要大,各个子节点之间并不是顺序排列的。

堆的数据结构如下:

技术图片

 

 堆结构可以转换成数组,如下:

{1,3,4,8,10,15,20}

假设,索引值从0开始,子节点的索引值为k,父节点的索引值为p,则:
1. 一个节点的左子节点的索引为:k = p * 2 + 1;
2. 一个节点的右子节点的索引为:k = (p + 1) * 2;
3. 一个节点的父节点的索引为:p = (k - 1) / 2。

为什么使用DelayedWorkQueue?

 因为定时任务需要取出最近需要执行的任务,所以任务队列当中每次出队的一定是队列当中最靠前的任务,所以要用优先级队列。而DelayedWorkQueue就是一个优先级队列,可以保证每次出队的任务都是当前队列当中最靠前的任务,而且该队列还是堆结构的,在执行插入和删除的时候复杂度为O(logN).
 

DelayedWorkQueue属性

private static final int INITIAL_CAPACITY = 16;队列的初始容量
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];根据初始容量创建的RunnableScheduledFuture数组
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;

private Thread leader = null;leader线程

 private final Condition available = lock.newCondition();当较新的任务在队列的头部可用时,或者说有新的线程可能需要成为leader的时候,通过这个条件发出信号。

对于多线程网络模型来说,线程有三种身份,leader,follower,proccesser。基准就是永远最多只有一个leader,所有的follower都在等待成为leader。线程池启动会自动产生一个leader,负责等待网络IO时间,当有一个事件产生的时候leader首先通知一个follower成为leader,然后自己就去处理这个网络事件,完毕以后自己加入follower线程等待队列当中去,等待下一次成为leader。这样的话可以增强CPU的高速缓存性,消除动态内存分配以及线程间的数据交换。

 Offer方法

public boolean offer(Runnable x) {
//参数校验
    if (x == null)
        throw new NullPointerException()?
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x?
    final ReentrantLock lock = this.lock?
    lock.lock()?
    try {
//查看当前元素数量,如果大于队列长度则进行扩容
        int i = size?
        if (i >= queue.length)
            grow()?
//元素数量加1
        size = i + 1?
//如果当前队列还没有元素,则直接加入头部
        if (i == 0) {
            queue[0] = e?
//记录索引
            setIndex(e, 0)?
        } else {
  //把任务加入堆中,并调整堆结构,这里就会根据任务的触发时间排列
             //把需要最早执行的任务放在前面
            siftUp(i, e)?
        }
//如果新加入的元素就是队列头,这里有两种情况
        //1.这是用户提交的第一个任务
        //2.新任务进行堆调整以后,排在队列头
        if (queue[0] == e) {
// leader设置为null为了使在take方法中的线程在通过available.signal()?后会执行
available.awaitNanos(delay)?
            leader null?
//加入元素以后,唤醒worker线程
            available.signal()?
        }
    } finally {
        lock.unlock()?
    }
    return true?
}

来看一下siftUp()方法:

private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
   // 获取父节点
        int parent = (k ­ 1) >>> 1?
        RunnableScheduledFuture<?> e = queue[parent]?
   // 如果key节点的执行时间大于父节点的执行时间,不需要再排序了
        if (key.compareTo(e) >= 0)
            break?
  // 如果key.compareTo(e) < 0,说明key节点的执行时间小于父节点的执行时间,需要把父节点移到后面
        queue[k] = e?
        setIndex(e, k)?
// 设置索引为k
        k = parent?
    }
// key设置为排序后的位置中
    queue[k] = key?
    setIndex(key, k)?
}

简言之就是循环的根据key节点与他的父节点进行比较,key节点的时间小于父节点,则交换位置,将时间点靠前的排在前面。

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don‘t retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

在getTask的时候使用take方法,该方法是保证当前对列取出的就是最新需要执行的任务。保证了任务只有在指定的执行时间的时候才可以被取走。

以上是关于并发编程之定时任务的主要内容,如果未能解决你的问题,请参考以下文章

十一:并发编程之定时任务&定时线程池

JUC并发编程 共享模式之工具 ThreadPoolExecutor -- 线程池应用之定时任务(在每周周四执行定时任务)

JUC并发编程 共享模式之工具 ThreadPoolExecutor -- 任务调度线程池 定时任务 / 延时执行(Timer的缺点)

JavaEE开发之Spring中的多线程编程以及任务定时器详解

并发编程(十四)—— ScheduledThreadPoolExecutor 实现原理与源码深度解析 之 DelayedWorkQueue

并发编程之多线程篇之四