Java Executor源码解析—ThreadPoolExecutor线程池submit方法以及FutureTask源码一万字
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java Executor源码解析—ThreadPoolExecutor线程池submit方法以及FutureTask源码一万字相关的知识,希望对你有一定的参考价值。
详细介绍了ThreadPoolExecutor线程池的submit方法的源码,以及FutureTask的原理。
上一篇文章我们介绍了ThreadPoolExecutor线程池的execute方法源码:Java Executor源码解析(3)—ThreadPoolExecutor线程池execute核心方法源码【一万字】。除了execute方法之外,ThreadPoolExecutor 还有其他三个submit提交的方法,它们的内部都是调用的execute方法,它会返回一个FutureTask,可以从该对象中获取结果或者异常信息,下面来看看它的原理
文章目录
1 submit方法
1.1 Future submit(Runnable)
public Future< ? > submit(Runnable task)
task要求一个 Runnable 实现类,返回一个 Future 对象。这个 Future 对象可以用来检查 Runnable 是否已经执行完毕。该 Future 的 get 方法在任务成功完成时将会返回 null。
如果任务无法安排执行,那么可能抛出RejectedExecutionException。如果该任务为 null,那么抛出NullPointerException。
/**
* 位于父类AbstractExecutorService中的实现
*
* @throws RejectedExecutionException 如果任务无法安排执行
* @throws NullPointerException 如果该任务为 null
*/
public Future<?> submit(Runnable task)
//首先就是task任务的null校验,如果为null直接抛出NullPointerException
if (task == null) throw new NullPointerException();
//将Runnable类型的task任务包装为RunnableFuture类型的ftask任务,返回null
RunnableFuture<Void> ftask = newTaskFor(task, null);
//调用execute方法传递ftask,RunnableFuture是Runnable的子接口,因此可以传递
execute(ftask);
return ftask;
/**
* 位于父类AbstractExecutorService中的实现
* 为给定可运行任务返回一个 RunnableFuture。
* 该RunnableFuture具有默认的返回值,并为底层任务提供取消操作。
*
* @param runnable 被包装的任务
* @param value 将要返回的默认值
* @param <T> 返回值类型
* @return 为给定可运行任务返回一个 RunnableFuture。
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
//调用FutureTask的构造器
return new FutureTask<T>(runnable, value);
后面我们再介绍FutureTask是如何对Runnable以及Callable任务进行包装的,以及FutureTask相关方法的实现。
1.2 Future submit(Runnable, T)
public < T > Future< T > submit(Runnable task,T result)
task要求一个 Runnable 实现类,并且需要一个给定的返回值result,将返回一个 Future 对象,这个 Future 对象除了可以用来检查 Runnable 是否已经执行完毕,还可以调用get()方法获取执行结果,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
如果任务无法安排执行,那么可能抛出RejectedExecutionException。如果该任务为 null,那么抛出NullPointerException。
/**
* 位于父类AbstractExecutorService中的实现
* 为给定可运行任务返回一个 RunnableFuture。该RunnableFuture具有指定默认的返回值result,并为底层任务提供取消等操作。
* 同时调用execute执行这个RunnableFuture实例。
*
* @throws RejectedExecutionException 如果任务无法安排执行
* @throws NullPointerException 如果该任务为 null
*/
public <T> Future<T> submit(Runnable task, T result)
//首先就是task任务的null校验,如果为null直接抛出NullPointerException
if (task == null) throw new NullPointerException();
//将Runnable类型的task任务包装为RunnableFuture类型的ftask任务,get返回指定值
RunnableFuture<T> ftask = newTaskFor(task, result);
//调用execute方法传递ftask,RunnableFuture是Runnable的子接口,因此可以传递
execute(ftask);
return ftask;
1.3 Future submit(Callable< T >)
public < T > Future< T > submit(Callable< T > task)
task要求一个 Callable实现类。返回一个 Future 对象,这个 Future 对象除了可以用来检查 Runnable 是否已经执行完毕,还可以调用get()方法获取Callable中call方法的执行结果,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
如果任务无法安排执行,那么可能抛出RejectedExecutionException。如果该任务为 null,那么抛出NullPointerException。
/**
* 位于父类AbstractExecutorService中的实现
* 为给定可运行任务返回一个 RunnableFuture。该RunnableFuture的返回值result就是传递的Callable的call方法的返回值,并为底层任务提供取消等操作。
* 同时调用execute执行这个RunnableFuture实例。
*
* @throws RejectedExecutionException 如果任务无法安排执行
* @throws NullPointerException 如果该任务为 null
*/
public <T> Future<T> submit(Callable<T> task)
//首先就是task任务的null校验,如果为null直接抛出NullPointerException
if (task == null) throw new NullPointerException();
//将Runnable类型的task任务包装为RunnableFuture类型的ftask任务,get返回指定值
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
/**
* 为给定可运行任务返回一个 RunnableFuture。该RunnableFuture的返回值result就是传递的Callable的call方法的返回值,并为底层任务提供取消等操作。
*
* @param callable 被包装的任务
* @param <T> Callable返回值类型
* @return 为给定可运行任务返回一个 RunnableFuture。
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
//调用FutureTask的构造器
return new FutureTask<T>(callable);
2 FutureTask的原理
2.1 FutureTask的概述
public class FutureTask< V >
extends Object
implements RunnableFuture< V >
在最开始我们就简单介绍过,FutureTask作为可取消的异步计算。类提供了对 Future 的基本实现,具有开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法等方法。重要的get()方法仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。
FutureTask可以与线程池连用,也可以单独与Thread使用,因此完全可以与线程池独立开来讲解。
上面的submit方法内部调用了execute方法,传递的参数是一个FutureTask实例,类型为RunnableFuture,RunnableFuture是Runnable的子接口,因此可以传递。
工作线程在runWorker方法中调用task.run()方法执行任务。此时我们可以知道,对于submit系列方法,并不是直接运行传递的task的run或者call方法,而是运行该task的包装对象FutureTask的run方法。 可以猜测到FutureTask的run方法对于传递进来的原始的Runnable或者Callable任务的run或者call方法做了包装。现在我们来看看FutureTask到底是如何对Runnable或者Callable任务进行包装的。
先看一个简单的使用案例:
FutureTask objectFutureTask = new FutureTask(() ->
Thread.sleep(3000);
return 1;
);
//与线程连用
Thread thread = new Thread(objectFutureTask);
thread.start();
//约3秒钟之后会获取返回值
System.out.println(objectFutureTask.get());
2.2 FutureTask的重要属性
FutureTask作为一个任务,具有7种不同的运行状态,每种运行状态使用单独的int类型表示,并且它们有大小关系,这类似于ThreadPoolExecutor的状态的作用,可以通过比较大小来判断任务执行情况:NEW< COMPLETING< NORMAL< EXCEPTIONAL< CANCELLED< INTERRUPTING< INTERRUPTED。比如get方法就用到了大小关系。
内部保存的任务类型只有一个Callable类型,也就是说,实际上构造器传递进来的Runnable任务也会通过Executors.callable(runnable, result)工厂方法转换为Callable类型的任务。
具有一个Object类型的outcome保存要从get方法返回的数据或者要抛出的异常。
还有一个WaitNode类型的单链表,用于保存等待该FutureTask的返回值的线程。
/**
* 此任务的运行状态
* 运行状态仅在set、setException、cancel方法中转换
* 可能的状态转换如下:
* NEW -> COMPLETING -> NORMAL 表示正常执行完毕
* NEW -> COMPLETING -> EXCEPTIONAL 表示执行任务时发生异常
* NEW -> CANCELLED 表示任务被取消,cancel(false)。如果此任务尚未启动,则此任务将永不运行;如果任务已经启动,则不影响运行
* NEW -> INTERRUPTING -> INTERRUPTED 表示任务被取消,cancel(false)。如果此任务尚未启动,则此任务将永不运行;如果任务已经启动,则停止执行(中断任务线程)
*/
private volatile int state;
/**
* new新建任务,或者正在执行任务call()方法的状态
*/
private static final int NEW = 0;
/**
* 任务call()方法被调用完毕之后(无论成功还是抛出异常)的状态,
* 或者调用set、setException方法CAS成功之后的状态
* 此时outcome还没有记录执行结果
* 这是一个中间状态
*/
private static final int COMPLETING = 1;
/**
* 任务call()方法被调用成功之后的最终状态
* 或者调用set方法直接设置结果成功之后的最终状态
* 此时outcome已经记录执行结果
* 该状态不需要立即感知
*/
private static final int NORMAL = 2;
/**
* 任务call()方法被调用抛出异常之后的最终状态
* 或者调用setException方法直接设置异常信息成功之后的最终状态
* 此时outcome已经记录执行结果
* 该状态不需要立即感知
*/
private static final int EXCEPTIONAL = 3;
/**
* 任务还未执行或者正在执行(NEW状态)时,调用cancel(false)尝试取消任务操作成功的最终状态
*/
private static final int CANCELLED = 4;
/**
* 任务还未执行或者正在执行(NEW状态)时,调用cancel(true)尝试取消任务操作成功的状态
* 这是一个中间状态
*/
private static final int INTERRUPTING = 5;
/**
* 调用cancel(true)设置INTERRUPTING状态成功之后会继续设置的最终状态
* 如果此时任务正在执行,那么中断该任务运行线程之后设置的状态
* 该状态不需要立即感知
*/
private static final int INTERRUPTED = 6;
/**
* 保存的构造器传递进来的任务,可以看到就是Callable类型
* 实际上构造器传递进来的Runnable任务也会通过Executors.callable(runnable, result)工厂方法转换为Callable类型的任务
* 在运行完毕之后(无论是成功还是异常)将会置为null
*/
private Callable<V> callable;
/**
* 要返回的结果,或者要从get()方法中抛出的异常
* 默认为null,set或者setException方法调用成功之后被设置值
*/
private Object outcome;
/**
* 运行Callable任务的线程
* 默认为null,在run方法被成功调用之后会被赋值初始化
*/
private volatile Thread runner;
/**
* 等待该FutureTask的返回值的线程,是一个单链表队列,waiters存储头部元素
*/
private volatile WaitNode waiters;
/**
* 简单的单链表实现内部类,用于存放在get方法上等待的线程
*/
static final class WaitNode
/**
* 等待的线程
*/
volatile Thread thread;
/**
* 后继
*/
volatile WaitNode next;
/**
* 构造器
*/
WaitNode()
thread = Thread.currentThread();
2.3 FutureTask的构造器
public FutureTask(Callable< V > callable)
创建一个 FutureTask,一旦运行就执行给定的Callable任务,并且任务完成时可通过get方法返回call()的结果。
public FutureTask(Runnable runnable, V result)
创建一个 FutureTask,一旦运行就执行给定的Runnable任务,并且任务完成时可通过get方法返回指定的结果。
sumbit方法就是通过调用FutureTask的构造器返回FutureTask实例的。可以看到传递进来的Runnable任务也会通过Executors.callable(runnable, result)工厂方法转换为Callable类型的任务,这是一种设计模式—适配器模式。而FutureTask对callable任务的包装也是一种适配器模式——转换为Runnable类型。
构造器仅仅是初始化callable属性,以及FutureTask状态为NEW。
/**
* 创建一个 FutureTask,一旦运行就执行给定的Callable任务,并且任务完成时可通过get方法返回call()的结果。
*
* @param callable callable任务
* @throws NullPointerException 如果 callable 为 null。
*/
public FutureTask(Callable<V> callable)
//callable的null校验
if (callable == null)
throw new NullPointerException();
//初始化callable属性
this.callable = callable;
//初始化FutureTask状态为NEW
this.state = NEW; // ensure visibility of callable
/**
* 创建一个 FutureTask,一旦运行就执行给定的Runnable任务,并且任务完成时可通过get方法返回指定的结果。
*
* @param runnable runnable 任务
* @param result 任务完成时指定返回的结果
* @throws NullPointerException 如果 runnable 为 null。
*/
public FutureTask(Runnable runnable, V result)
//调用Executors.callable将runnable和指定返回值包装并返回一个callable
//这里的源码我们在 Executors章节部分讲解
this.callable = Executors.callable(runnable, result);
//初始化FutureTask状态为NEW
this.state = NEW; // ensure visibility of callable
2.4 run核心方法
无论是单独和一个Thread线程使用还是和线程池使用,内部的线程运行的不再是被包装的任务的run或者call方法,而是FutureTask的run。这个方法是FutureTask的核心方法,只能被执行任务的线程调用,其他线程是无法调用的。
run的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- 校验任务状态state: 如果如果state不是NEW状态,或者state是NEW状态,但是CAS的将runner从null设置为当前调用线程失败。以上两个条件满足一个就立即返回,任务不会被执行,即要求任务在此前没有被执行过才能开始执行。
- 到这一步,说明任务没有被执行过,此时执行该任务的线程被记录在runner中。在cancel(true)方法中,如果runner不为null,那么任务就被算作正在执行过程中了,开启一个try块,执行任务:
- 保存要执行的任务c,校验任务: 如果c不为null,并且state还是为NEW,那么可以执行真正的任务。这里还需要校验一下state,因为在上次校验到此之间可能存在其他线程取消了任务,如果任务在真正开始前就被取消了,那么直接就不执行真正的任务。
- result变量用来保存执行结果,ran变量用来保存任务是否执行成功。
- 在一个try块中,调用c.call()方法,这里才是执行真正的任务,这个call方法中的代码就是我们自己编写的代码。如果call执行成功,那么result接收其返回值,ran置为true,表示执行成功。
- 使用catch捕获执行过程中的任何异常,如果call方法抛出异常,那么result置为null,ran置为false,表示执行失败。随后调用setException方法,设置异常的返回值,并唤醒因为get阻塞的线程。
- 如果ran为true,即任务执行成功。调用set方法,用于设置正常的返回值,并唤醒因为get阻塞的线程。
- 保存要执行的任务c,校验任务: 如果c不为null,并且state还是为NEW,那么可以执行真正的任务。这里还需要校验一下state,因为在上次校验到此之间可能存在其他线程取消了任务,如果任务在真正开始前就被取消了,那么直接就不执行真正的任务。
- 任务执行完毕或者没有执行,都会进入finally代码块中:
- 将runner设置为null,到此表示任务执行完毕。在任务的校验以及执行过程中runner必须一直非null,用来保证run()方法不会被并发的调用。
- 重新获取此时的state状态s,因为在上面的任务的校验以及执行过程中可能被cancel方法取消,state会被改变。
- 如果状态s大于等于INTERRUPTING,表示任务执行过程中其他线程执行了cancel(true)成功,那么本次任务执行的setException以及set方法都执行失败,但是此执行线程不一定被中断了,任务也不一定被取消了。可能执行线程在执行任务call()方法完毕时,cancel的线程更改了state值为INTERRUPTING,这将导致执行线程的setException或者set失败,随后执行线程将runner置空,那么cancel线程在后续代码中由于runner为null将不会中断执行线程;但是也有可能cancel线程先获取到了runner,此时runner还不为null,但是执行线程实际上将任务执行完了,此时还是会中断线程。
- 此时调用handlePossibleCancellationInterrupt处理可能的中断,等待任务状态变成INTERRUPTED状态,即等待执行线程被中断或者不被中断。
/**
* FutureTask的核心方法,被执行线程调用,用来执行任务
*/
public void run()
/*
* 如果state不是NEW状态
* 或者 state是NEW状态,但是CAS的将runner从null设置为当前调用线程失败
*
* 以上两个条件满足一个就立即返回,任务不会被执行,即要求任务在此前没有被执行过才能开始执行
*/
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
/*
* 到这一步,说明任务没有被执行过,此时执行该任务的线程被记录在runner中
* 在cancel(true)方法中,如果runner不为null,那么任务就被算作正在执行过程中了
* 开启一个try块,执行任务
*/
try
//保存要执行的任务c
Callable<V> c = callable;
//如果c不为null,并且state还是为NEW,那么可以执行真正的任务
//这里还需要校验一下,因为在上次校验到此之间可能存在其他线程取消了任务
//如果被取消了,那么不会执行底层真正的任务
if (c != null && state == NEW)
//result用来保存执行结果
V result;
//ran用来保存任务是否执行成功
boolean ran;
try
//调用c.call()方法,这里才是执行真正的任务,这个call方法中的代码就是我们自己编写的代码
//如果call执行成功,那么result接收其返回值
result = c.call();
//ran置为true,表示执行成功
ran = true;
catch (Throwable ex)
//如果call方法抛出异常,那么result置为null
result = null;
//ran置为false,表示执行失败
ran = false;
//调用setException方法,用于设置异常的返回值,并唤醒因为get阻塞的线程
setException(ex);
//如果ran为true,即任务执行成功
if (ran)
//调用set方法,用于设置正常的返回值,并唤醒因为get阻塞的线程
set(result);
finally
//执行完毕后在finally中,将runner设置为null,到此表示任务执行完毕
//在任务执行过程中runner必须一直非null,用来保证run()方法不会被并发的调用
runner = null;
//重新获取此时的state,因为在任务执行过程中可能被中断,state会被改变
int s = state;
/*
* 如果状态大于等于INTERRUPTING,表示任务执行过程中其他线程执行了cancel(true)成功
* 那么本次任务执行的setException以及set方法都执行失败,但是此执行线程不一定被中断了
* 可能执行线程在执行任务call()方法完毕时,cancel的线程更改了state值为INTERRUPTING,这将导致执行线程的setException或者set失败
*
* 随后执行线程将runner置空,那么cancel线程在后续代码中由于runner为null将不会中断执行线程
* 但是也有可能cancel线程先获取到了runner,此时runner还不为null,但是执行线程实际上将任务执行完了,此时还是会中断线程
*/
if (s >= INTERRUPTING)
//处理中断,等待任务状态变成INTERRUPTED状态,即等待执行线程被中断或者不被中断
handlePossibleCancellationInterrupt(s);
/**
* 处理由于调用cancel(true)方法而在任务执行过程中可能的中断
* 等待任务状态变成INTERRUPTED状态,即等待执行线程被中断或者不被中断
*
* @param s 状态
*/
private void handlePossibleCancellationInterrupt(int s)
//如果状态为INTERRUPTING
if (s == INTERRUPTING)
//那么循环或去哦最新的状态,如果等于INTERRUPTING,那么让出cpu执行权
//让cancel(true)的线程尽快将状态设置为INTERRUPTED成功
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
2.4.1 set、setException设置结果
run方法中的c.call()调用成功返回或者抛出异常的时候,将分别调用set和setException方法用于尝试CAS的设置state状态为COMPLETING,设置成功之后会继续设置get方法的返回结果outcome,接着lazyset的设置最终状态NORMAL或者EXCEPTIONAL,最后会调用finishCompletion方法唤醒因为调用get方法而阻塞在内部等待队列中的外部线程,并且最后清理callable任务引用。
/**
* 除非已经设置了此 Future已不是NEW状态,否则将其get方法的结果设置为给定的值。
* 在计算成功完成时通过 run 方法内部调用此方法。
*
* @param v c.call方法的返回值
*/
protected void set(V v)
//尝试CAS的将state的值从NEW设置为COMPLETING,失败则退出方法
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING))
//CAS成功之后,就是所谓的COMPLETING状态
//随后将v的值赋给outcome,作为get方法的返回值
outcome = v;
//设置当前任务的状态为NORMAL ,也就是任务正常结束
/*
* 最终会设置成NORMAL状态,但是可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
* 关于该方法的更多信息可以参考并发编程网翻译的一篇文章《AtomicLong.lazySet是如何工作的?》,
* 文章地址是“http://ifeve.com/how-does-atomiclong-lazyset-work/”。
* AtomicLong.lazySet方法的内部就是调用的putOrdered系列方法
*
* 这里没有使用CAS 是因为对于同一个任务只可能有一个线程运行到这里。
* 在这里使用putOrderedlnt,比使用CAS或者putLongvolatile的效率要高,
* 并且这里的场景不要求其他线程马上对设置的NORMAL状态值可见。
*/
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//调用finishCompletion方法,唤醒因为调用get方法而阻塞在内部等待队列中的外部线程
//并且最后清理callable任务引用。
finishCompletion();
/**
* 除非已经设置了此 Future已不是NEW状态,否则将其get方法的结果设置为抛出一个ExecutionException,并将给定的 throwable 作为其原因。
* 在计算失败时通过 run 方法内部调用此方法。
*
* @param t c.call方法异常的原因
*/
protected void setException(Throwable t)
//尝试CAS的将state的值从NEW设置为COMPLETING,失败则退出方法
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING))
//CAS成功之后,就是所谓的COMPLETING状态
//随后将t的值赋给outcome,作为get方法要抛出的异常
outcome = t;
/*
* 最终会设置成NORMAL状态,但是可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
* 关于该方法的更多信息可以参考并发编程网翻译的一篇文章《AtomicLong.lazySet是如何工作的?》,
* 文章地址是“http://ifeve.com/how-does-atomiclong-lazyset-work/”。
* AtomicLong.lazySet方法的内部就是调用的putOrdered系列方法
*
* 这里没有使用CAS 是因为对于同一个任务只可能有一个线程运行到这里。
* 在这里使用putOrderedlnt,比使用CAS或者putLongvolatile的效率要高,
* 并且这里的场景不要求其他线程马上对设置的EXCEPTIONAL状态值可见。
*/
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//调用finishCompletion方法,唤醒因为调用get方法而阻塞在内部等待队列中的外部线程
//并且最后清理callable任务引用。
finishCompletion();
2.5 cancel取消任务
public boolean cancel(boolean mayInterruptIfRunning)
试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。
当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程,如果为true,表示应该中断执行此任务的线程,如果为false,表示允许正在运行的任务运行完成。
如果无法取消任务,则返回 false,表示它已经正常完成或者已被取消;否则返回 true,返回true也不代表任务方法没有执行完毕,有可能任务执行完了,只是不能获取返回值而已。因为中断线程不代表会中断任务,即使中断了线程,只是设置了中断标志位,任务有可能实际上还是执行完毕了的,只不过无法获得返回值(get抛出异常)。Java线程中断与停止线程详解以及案例演示。
cancel的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- 任务状态state校验: 如果state不等于NEW,说明该任务已完成或者取消;或者如果state等于NEW,但是尝试将CAS的将state的值从NEW改为INTERRUPTING(mayInterruptIfRunning为true)或者CANCELLED(mayInterruptIfRunning为false)失败,表示状态改变失败。这两种情况都表示取消任务失败,直接返回false。
- 到这一步,表示任务状态state已经被改为INTERRUPTING或者CANCELLED,此时如果在run方法中c.call方法运行完毕或者抛出异常,后续的set或者setException方法也不会成功。而如果run方法中的任务还没有开始执行,那么后面也就不会执行了。开启一个try代码块:
- 如果mayInterruptIfRunning为true,说明需要中断正在执行的线程。开启一个try代码块:
- 使用t记录此时的runner,注意这里只是记录了瞬时的runner的状态,有可能刚把runner记录下来,runner在run方法中就被置为null了。
- 如果t不为null,那么将t中断。
- 在finally代码块中*,会将状态lazySet的设置为INTERRUPTED状态。
- 如果mayInterruptIfRunning为true,说明需要中断正在执行的线程。开启一个try代码块:
- 在finally代码块中,调用finishCompletion方法,唤醒因为调用get方法而阻塞在内部等待队列中的外部线程,并且最后清理callable任务,即callable不能被再次调用。
- 最后返回true。
可以看到,cancel方法就有两条任务状态转换路线:
cancel(false): NEW——CANCELLED
cancel(true): NEW——INTERRUPTING——INTERRUPTED
/**
1. 尝试取消任务的执行
2. 3. @param mayInterruptIfRunning 如果应该中断正在执行此任务的线程,则为 true;否则允许正在运行的任务运行完成
4. @return 如果无法取消任务,则返回 false,这通常是由于它已经正常完成或者已取消;否则返回 true
5. 返回true也不代表任务方法没有执行完毕,有可能任务执行完了,只是不能获取返回值而已。
*/
public boolean cancel(boolean mayInterruptIfRunning)
/*
* 1 如果state不等于NEW,说明该任务已完成或者取消
* 2 如果state等于NEW,但是尝试将CAS的将state的值从NEW改为INTERRUPTING(mayInterruptIfRunning为true)
* 或者CANCELLED(mayInterruptIfRunning为false)失败,表示状态改变失败
*
* 这两种情况都表示取消任务失败,直接返回false
*/
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
/*
* 到这一步,表示任务状态state已经被改为INTERRUPTING或者CANCELLED,
* 此时如果在run方法中c.call方法运行完毕或者抛出异常,后续的set或者setException方法也不会成功
* 而如果run方法中的任务还没有开始执行,那么后面也就不会执行了。
*/
try // in case call to interrupt throws exception
//如果mayInterruptIfRunning为true,说明需要中断正在执行的线程
if (mayInterruptIfRunning)
try
//使用t记录此时的runner,注意这里只是记录了瞬时的runner的状态,有可能刚把runner记录下来,runner在run方法中就被置为null了
Thread t = runner;
/*
* 如果t不为null,那么将t中断。可以看到,所谓的尝试停止任务仅仅是尝试中断执行任务的线程
* 如果任务线程在调用 Object 类的 wait()、wait(long) 或 wait(long, int) 方法,或者该类的 join()、join(long)、join(long,int)、
* sleep(long) 或 sleep(long, int) 方法过程中受阻,则其中断状态将被清除,它还将收到一个 InterruptedException,这是才可能中断任务执行的可能
* 而这个interrupt()不会对正常运行的线程有任何影响,仅仅是设置了中断标志位为true,因此不一定会中断任务执行
*/
if (t != null)
t.interrupt();
finally // final state
/*
* 最终会设置成INTERRUPTED状态,但是可能导致其他线程在之后的一小段时间内还是可以读到旧的值。
* 关于该方法的更多信息可以参考并发编程网翻译的一篇文章《AtomicLong.lazySet是如何工作的?》,
* 文章地址是“http://ifeve.com/how-does-atomiclong-lazyset-work/”。
* AtomicLong.lazySet方法的内部就是调用的putOrdered系列方法
*
* 这里没有使用CAS 是因为对于同一个任务只可能有一个线程运行到这里。
* 在这里使用putOrderedlnt,比使用CAS或者putLongvolatile的效率要高,
* 并且这里的场景不要求其他线程马上对设置的INTERRUPTED状态值可见。
*/
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
Java Executor源码解析—Executors线程池工厂以及四大内置线程池
Java Executor源码解析—ThreadPoolExecutor线程池submit方法以及FutureTask源码一万字
Java Executor源码解析—ThreadPoolExecutor线程池execute核心方法源码一万字