FutureTask源码分析
Posted 无虑的小猪
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FutureTask源码分析相关的知识,希望对你有一定的参考价值。
1、Callable 与 FutureTask介绍
1.1、Callable
创建线程有两种方式,一种是继承Thread类,一种是实现Runnable接口重写run方法。其实Thread也实现了Runable接口。
在Runable接口中,仅有一个无参无返回结果的run方法。Runable接口详情:
@FunctionalInterface public interface Runnable public abstract void run();
Callable接口的功能与Runable接口类似,唯一不同的是Callable接口可以返回线程执行的结果并抛出异常,在Runable接口中,仅有一个无参有结果的call方法,可抛出异常。Callable接口详情:
@FunctionalInterface public interface Callable<V> V call() throws Exception;
1.2、FutureTask
Thread构造函数详情:
在Thread中,没有Callable类型的入参构造函数,只有Runable类型。需要一个实现了Runable接口的对象封装Callable,这个对象是FutureTask。有关FutureTask的详细介绍,在后文会做详细介绍。
2、Callable与FutureTask 的使用
示例代码如下:
import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; public class TestCallable public static void main(String[] args) throws Exception // 创建FutureTask FutureTask task = new FutureTask<>(new Callable<String>() @Override public String call() throws Exception // sleep 5 秒 TimeUnit.SECONDS.sleep(5); // 返回线程执行结果 return Thread.currentThread().getName() + " == CumCallable == "; ); // 启动线程 new Thread(task).start(); // 阻塞等待 Object o = task.get(); // 主线程执行 System.out.println(Thread.currentThread().getName() + " == main =="); System.out.println(o);
3、Callable与FutureTask源码分析
FutureTask类图关系如下;
FutureTask实现了Runable、Future接口。
1、Future
Runable接口这里不做介绍了,主要看Future接口详情如下:
// 操作线程任务 public interface Future<V> /** * 试图取消当前执行的任务 * 如果任务已经执行完成、或已经被取消、或不能被取消,返回false * 如果任务还未启动,就已经被取消,该任务永远不会被运行 * 如果任务已经启动,mayInterruptIfRunning参数决定正在执行的线程是否被中断停止该任务 */ boolean cancel(boolean mayInterruptIfRunning); /** * 在任务正常执行完成前,判断其是否被取消 */ boolean isCancelled(); /** * 判断任务是否执行完成 */ boolean isDone(); /** * 一直阻塞等待,直到获取执行结果 */ V get() throws InterruptedException, ExecutionException; /** * 阻塞等待timeout时间,获取执行结果,阻塞时间timeout已到达,抛出异常 */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
Future接口提供了操作线程的方法,如取消当前执行的任务、获取执行的结果、判断是否执行完成等。Future对线程的任务做了增强处理。
2、RunnableFuture
public interface RunnableFuture<V> extends Runnable, Future<V> // 线程执行未取消,通过此方法设置线程执行结果 void run();
3、FutureTask
FutureTask是基于 CAS + state + WaitNode节点链表 实现的。CAS保证多线程场景下的原子性;state线程状态,控制代码的返回及执行流程;WaitNode节点链表记录挂起的线程。
1、属性
1.1、执行任务线程状态
1.2、线程执行结果、等待队列头
callable:构造函数中传入的Callable对象,用于执行call()方法;
outcome:正常执行完,outcome记录执行结果;执行出现异常,outcome记录异常;
runner:当前正在执行的线程;
waiters:等待队列的头节点,单向链表的头。
1.3、WaitNode
waitNode是一个节点,存储在单向链表中,用于记录等待执行的线程,便于当前线程执行完后,唤醒等待的线程。
2、构造函数
FutureTask的构造函数入参支持Callable,也支持Runable。构造函数完成对callable、state属性的初始化操作。详情如下:
入参为Runable的构造函数是如何转换成Callable的呢?Executors#callable() 详情如下:
// Runable 转为 Callable,并返回入参 result public static <T> Callable<T> callable(Runnable task, T result) // 非空判断 if (task == null) throw new NullPointerException(); // 使用适配器,将Runable转换为Callable return new RunnableAdapter<T>(task, result);
RunnableAdapter采用了适配器模式,将Runable转换为Callable。我们来看看是怎么转换的。
3、run() - 入口方法
1、流程图
2、run()源码分析
FutureTask实现了Runable接口,从写了run()方法,run()作为线程执行的入口方法,先来看看具体做了哪些操作。
FutureTask#run() 详情如下:
public void run() // 当前线程不为新建状态 或者 当前线程cas获取锁资源失败,返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try // callable成员变量赋给局部变量 Callable<V> c = callable; // callable不为空,当前线程状态为 新建 if (c != null && state == NEW) // 执行结果 V result; // 执行结束标识 boolean ran; try // 执行实现的call方法,并获取返回结果 result = c.call(); // 执行结束标识设置为true -> 正常执行 ran = true; catch (Throwable ex) // 执行异常,返回null result = null; // 执行结束标识设置为true -> 执行出现异常 ran = false; // 设置异常信息 setException(ex); // 正常执行结束,存储执行结果 if (ran) set(result); finally // 线程释放锁资源 runner = null; // 中断线程的处理 int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s);
在run()方法中,实际执行的是callable的call()方法,并用outcome成员变量存储线程执行的结果,若出现异常,outcome则存储异常信息。
1、正常执行结束
线程正常执行完,将线程的执行结果设置到FutureTask的outCome属性中,FutureTask#set()方法。
// 线程执行结果设置到outcome属性中 protected void set(V v) // 修改当前线程状态 NEW(新建) -> COMPLETING(运行中) if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) // 线程执行结果赋值到成员变量outcome outcome = v; // 修改线程状态 COMPLETING(运行中) -> NORMAL(正常执行结束) UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 唤醒因get()操作挂起的线程 finishCompletion();
线程执行执行结束,线程状态经过了 NEW -> COMPLETING -> NORMAL 的变化,将结果赋值给outcome,唤醒因get()操作挂起的线程。
2、异常的处理
线程执行过程中异常的处理,FutureTask#setException() 详情如下:
// 设置异常信息 protected void setException(Throwable t) // cas操作,修改当前执行的线程状态 NEW(新建) -> COMPLETING(运行中) if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) // 线程执行结果设置为异常 outcome = t; // 修改当前执行的线程状态 COMPLETING(运行中) -> EXCEPTIONAL(异常) UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 唤醒因get()操作挂起的线程 finishCompletion();
线程执行执行结束,线程状态经过了 NEW -> COMPLETING -> EXCEPTIONAL 的变化,将异常赋值给outcome,唤醒因get()操作挂起线程。
3、唤醒等待队列中的挂起线程
FutureTask#finishCompletion() 详情如下:
// 移除并唤醒所有等待执行的线程 private void finishCompletion() // q执行waiters链表的头节点 for (WaitNode q; (q = waiters) != null;) // cas操作,将waiters设置为null,担心外部线程使用 cancel 取消当前任务触发 finishCompletion() if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) // 自旋 for (;;) // 获取当前Node封装的 thread Thread t = q.thread; // 当前线程不为 null if (t != null) // 将当前等待节点的线程设置为null q.thread = null; // 唤醒当前节点因 get 操作阻塞的线程 LockSupport.unpark(t); // 获取下一个WaitNode WaitNode next = q.next; // 等待节点链表中没有待唤醒的线程,结束循环 if (next == null) break; // 将下一个待唤醒的等待节点从 WaitNode链表中移除,便于GC q.next = null; q = next; // 等待节点链表中没有待唤醒的线程,结束循环 break; // JDK提供修改当前线程执行状态的拓展方法,默认不实现 done(); // 执行完毕,callable callable = null; // JDK提供修改当前线程执行状态的拓展方法,默认不实现 protected void done()
4、处于中断过程中的线程处理
FutureTask实现了Future接口,Future提供了线程取消的方法cancel。若当前线程被取消,FutureTask执行cancel方法过程中,线程状态有一段时间是 INTERRUPTING (中断处理中) 的状态,在线程执行结束之前,FutureTask在 handlePossibleCancellationInterrupt 方法中 INTERRUPTING (中断处理中)状态 的线程做了特殊处理。
FutureTask#handlePossibleCancellationInterrupt() 详情如下:
private void handlePossibleCancellationInterrupt(int s) // 线程状态 INTERRUPTING (中断处理中) if (s == INTERRUPTING) while (state == INTERRUPTING) // 让出CPU,等待线程状态变为 INTERRUPTED (已中断) Thread.yield();
4、get() - 获取线程执行结果
1、流程图
2、get()源码分析
获取线程执行结果,FutureTask#get() 详情如下:
public V get() throws InterruptedException, ExecutionException // 当前线程执行状态 int s = state; // 当前线程处于 NEW(新建)、COMPLETING (运行中) ,阻塞挂起外部的get()获取结果的线程 if (s <= COMPLETING) s = awaitDone(false, 0L); // 线程被唤醒后,返回执行结果 return report(s);
当前线程状态为 NEW或COMPLETING ,挂起线程;等待线程正常执行结束、执行异常、线程取消调用finishCompletion()方法唤醒这些挂起的线程,再通过report方法,返回执行结果。
1、挂起新建、运行中的线程
挂起线程 FutureTask#awaitDone() 详情如下:
private int awaitDone(boolean timed, long nanos) throws InterruptedException // 是否带超时的阻塞,0不带超时 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 引用当前线程 封装成waitNode对象 WaitNode q = null; // 当前线程 waitNode对象是否 已经添加进等待链表中,默认未添加 boolean queued = false; // 自旋 for (;;) // 4.1、线程唤醒,说明线程是使用中断的方式唤醒,若interrupted() 返回为true 之后会将中断标记重置为false if (Thread.interrupted()) // 当前线程node出队 removeWaiter(q); // get方法抛出 中断异常 throw new InterruptedException(); // 4.2、当前线程被其他线程 使用unpark()方式 // 获取当前线程执行状态 int s = state; // 当前线程执行完成,将当前任务线程设置为null,并返回任务状态 // 状态是终态: NORMAL 、 EXCEPTIONAL 、 CANCELLED if (s > COMPLETING) // 当前线程已创建WaitNode对象,将其中的thread设置为null,helpGC if (q != null) q.thread = null; return s; // 当前线程处于 运行中,让出CPU,进行下一次的抢占 else if (s == COMPLETING) Thread.yield(); // 1、第一次自旋,是新线程,当前等待节点为null,为当前线程创建WaitNode对象 else if (q == null) // 创建等待节点 q = new WaitNode(); // 2、第二次自旋,线程已创建WaitNode 对象, WaitNode未入队 else if (!queued) // 当前线程node节点指向 原队列的头节点 waiters:一直指向队列的头 // cas设置waiters引用指向当前线程Node节点,若成功,queued为true;若失败,可能其他线程先一步入队了 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 3、第三次自旋,挂起线程操作 // 如果设置了超时时间,get操作的线程会被parkNanos,并过了超时时间的话,从 waiters 链表中删除当前 wait else if (timed) // 获取剩余阻塞时间 nanos = deadline - System.nanoTime(); // 剩余阻塞时间不足,将WaitNode从等待链表中移除,返回响应状态 if (nanos <= 0L) removeWaiter(q); return state; // 没有过超时时间,线程进入 TIMED_WAITING 状态 LockSupport.parkNanos(this, nanos); // 未设置超时时间,get操作的线程会被park,进入 WAITING 状态 // 除非有其他线程唤醒 或 将当前线程中断 else LockSupport.park(this);
awaitDone使用了自旋,根据每次自旋变量值的不同,走不同的分支,执行流程图如下:
2、移除链表中的WaitNode
// 试图将超时 或 已中断的等待节点 从链表中移除,同时将被删除节点的上一节点指向被删除节点的下一节点,避免gc无法回收 private void removeWaiter(WaitNode node) // 要移除的WaitNode不为null if (node != null) // 要移除的WaitNode中线程设置为null node.thread = null; retry: // 自旋 for (;;) // pred :前驱节点 // q:初始为头节点,代表当前正在遍历的节点 // s:后继节点 for (WaitNode pred = null, q = waiters, s; q != null; q = s) // 当前遍历的节点的后继节点赋值给 s s = q.next; // 1、当前遍历的节点线程不为null,将当前遍历节点赋值给 它的前驱节点pred,开始下一次自旋 if (q.thread != null) pred = q; // 当前遍历的节点不为头节点,并且当前遍历节点的线程为null,将当前遍历节点的前驱节点指向它的后继节点 else if (pred != null) pred.next = s; // 当前遍历线程节点的前驱节点线程也为null,说明它的前驱节点也要被删除,重新开始自旋 if (pred.thread == null) continue retry; // q.thread = null, pred == null ==> 头节点 // 当前遍历节点为头节点,cas操作,将队列头节点waiters的引用指向当前节点的后继节点, // 即当前节点的下一节点作为头节点 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; break;
移除等待队列中执行完成、被取消的WaitNode节点,若WaitNode是头节点,将队列头节点的引用指向当前头节点的后继节点;若不为队列头节点,将当前WaitNode的前驱节点指向它的后继节点,流程图如下 :
为了便于理解removeWaiters的流程,下面给出等待队列的大体变化过程:
3、返回执行结果
private V report(int s) throws ExecutionException // 获取线程状态 Object x = outcome; // 正常执行结束 if (s == NORMAL) // 返回执行结果 return (V)x; // 已取消 | (已中断) ,抛CancellationException异常 if (s >= CANCELLED) throw new CancellationException(); // 异常,抛ExecutionException异常 throw new ExecutionException((Throwable)x);
通过当前线程的状态,判断是抛出异常,还是返回线程执行结果。
5、cancel() - 获取线程执行结果
5.1、流程图
5.2、源码分析
public boolean cancel(boolean mayInterruptIfRunning) // 当前正执行的线程状态不为 NEW(新建) 状态 ,并且线程状态变更失败 ,返回false if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try // 允许中断正在运行线程 if (mayInterruptIfRunning) try // 获取当前正在执行的线程 Thread t = runner; // 执行interrupt()中断方法 if (t != null) t.interrupt(); finally // 修改线程状态为已中断 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); finally // 唤醒因get()而挂起线程 finishCompletion(); // 返回取消中断线程结果 return true;
Java并发编程(二十):FutureTask源码分析
使用
FutureTask<String> futureTask = new FutureTask(() -> "success");
new Thread(futureTask).start();
futureTask.get();
源码分析
FutureTask提供了两个构造方法,分别是传入一个Callable,和传入一个Runnable加返回值result。如果传入的是Runnable加返回值,那么会通过适配器RunnableAdapter将其包装成一个Callable。
public FutureTask(Runnable runnable, V result)
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
通过调用Executors.callable方法包装Callable:
public static <T> Callable<T> callable(Runnable task, T result)
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
RunnableAdapter实现了Callable接口,其call方法的返回值就是result:
static final class RunnableAdapter<T> implements Callable<T>
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result)
this.task = task;
this.result = result;
public T call()
task.run();
return result;
在一个FutureTask创建成功后,其默认状态state为NEW。在FutureTask中为state定义了7个状态,并且state被volatile修饰以保证可见性,这7个状态分别是:
- NEW:新建状态(初始默认状态)
- COMPLETING:正在完成中,这个状态表示任务已经执行完成,但是还没有设置结果(正常结果或者异常)
- NORMAL:普通状态,状态变为COMPLETING并成功设置结果之后会变成这个状态
- EXCEPTIONAL:抛出了异常,任务执行抛出了异常则会变为这个状态,和NORMAL一样,在变为此状态之前也有个COMPLETING中间状态
- CANCELLED:被取消
- INTERRUPTING:正在被中断中,表示准备中断线程,但是还未中断
- INTERRUPTED:被中断
state可能的状态转换情况如下:
- NEW -> COMPLETING -> NORMAL :新建->完成中(任务已经完成,但是结果还未保存)->普通状态(结果保存完成)
- NEW -> COMPLETING -> EXCEPTIONAL :新建->完成中(任务抛出异常,但是异常还未保存)->异常状态(异常保存完成)
- NEW -> CANCELLED :新建->取消
- NEW -> INTERRUPTING -> INTERRUPTED:新建->中断中(准备中断,但还未中断)->完成中断(调用了interrupt方法)
FutureTask实现了Runnable接口,所以可以直接通过其run方法在当前线程执行任务,其run方法就是调用callable的call方法,call方法执行完毕之后,任务状态变为COMPLETING,并且call方法有一个返回值,正常执行得到返回值之后将其保存到outcome属性中,然后任务状态从COMPLETING变为NORMAL;如果任务执行抛出了异常, 那么任务状态也先变为COMPLETING,然后将异常对象设置到outcome属性,接着任务状态从COMPLETING变为EXCEPTIONAL。FutureTask的run方法如下:
public void run()
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
//检查任务状态和执行线程,如果状态不是NEW,或者通过CAS设置当前线程为任务执行线程失败,那么直接返回
return;
try
Callable<V> c = callable;
if (c != null && state == NEW)
//在此判断任务状态
V result;
boolean ran;
try
//执行任务
result = c.call();
//表示任务执行成功
ran = true;
catch (Throwable ex)
//抛出了异常,任务状态:NEW->COMPLETING->EXCEPTIONAL
result = null;
ran = false;
setException(ex);
if (ran)
//任务执行成功,任务状态:NEW->COMPLETING->NORMAL
set(result);
finally
//将runner设置为null,保证其它线程能够再次执行此任务
runner = null;
int s = state;
if (s >= INTERRUPTING)
//线程被中断
handlePossibleCancellationInterrupt(s);
注:将FutureTask交给Thread去执行和将Runnable交给Thread去执行是一样的,Thread.start方法就是负责调用本地方法启动一个线程去异步执行提交的任务,差别就是FutureTask可以调用get方法获取任务执行的结果,而结果保存在outcome属性中。
run()方法的大体逻辑相对来说比较简单,就是调用Callable的call方法获取返回值,然后将返回值设置到outcome中,此时任务的状态变化为:NEW->COMPLETING->NORMAL;如果call方法抛出了异常,那么将异常设置到outcome中,此时任务的状态变化为:NEW->COMPLETING->EXCEPTIONAL。
任务执行失败的处理逻辑在setException方法中:
protected void setException(Throwable t)
//先将任务修改为COMPLETING状态
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING))
//保存异常
outcome = t;
//将任务修改为EXCEPTIONAL状态
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
任务执行成功设置返回值是通过调用set方法实现的:
protected void set(V v)
//先将任务修改为COMPLETING状态
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING))
//保存返回值
outcome = v;
//将任务修改为NORMAL状态
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
在setException和set方法中都体现了FutureTask任务的状态转化,不过我们注意到两个方法中都调用了一个finishCompletion方法,这个finishCompletion的逻辑如下:
private void finishCompletion()
for (WaitNode q; (q = waiters) != null;)
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null))
for (;;)
//循环唤醒waiters队列中的所有阻塞线程
Thread t = q.thread;
if (t != null)
q.thread = null;
//唤醒阻塞的线程
LockSupport.unpark(t);
//唤醒了一个节点之后将其从链表中移除
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
break;
//空方法
done();
callable = null; // to reduce footprint
从状态的转换流程中可以看出,不论任务是正常完成还是抛出了异常,在FutureTask中都认为是任务完成。在finishCompletion方法中的主要逻辑是唤醒所有阻塞的线程,这些阻塞的线程是怎么来的呢?其实就是调用FutureTask的get方法生成的。FutureTask支持异步获取任务执行的结果,主要提供了两个方法get()和get(long timeout,TimeUnit unit),分别表示永久阻塞等待和超时阻塞等待,来看看get方法的实现:
public V get() throws InterruptedException, ExecutionException
int s = state;
if (s <= COMPLETING)
//阻塞线程,传入false表示不用超时阻塞
s = awaitDone(false, 0L);
//任务已经完成了,处于COMPLETING之后的状态
return report(s);
get(long timeout,TimeUnit unit)方法的实现:
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
//传入true表示需要超时阻塞
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
如果任务处于COMPLETING之后的状态,表示任务已经完成了(不论是正常完成还是抛出异常),否则进入超时阻塞等待,如果超时唤醒后发现任务还未完成,那么抛出TimeoutException异常。
我们注意到,永久阻塞和超时阻塞都是通过awaitDone方法实现的,该方法有两个入参,分别是:
- timed:布尔型参数,表示是否需要超时阻塞
- nanos:超时阻塞的时间
那么我们进入awaitDone方法看看是如何实现的:
private int awaitDone(boolean timed, long nanos)
throws InterruptedException
//计算线程阻塞的终止时间:当前时间+最大阻塞时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;)
if (Thread.interrupted())
//如果线程被中断了,从阻塞队列中移除,抛出中断异常
removeWaiter(q);
throw new InterruptedException();
int s = state;
if (s > COMPLETING)
//如果任务已经结束,那么返回当前任务状态,后续会根据任务状态解析结果
if (q != null)
q.thread = null;
return s;
else if (s == COMPLETING) // cannot time out yet
//如果任务正在完成中,那么只需要等待任务完成即可
//这里使用yield释放cpu时间片,等待下一次cpu执行
Thread.yield();
else if (q == null)
//如果q为null,那么新建一个WaitNode,q是awaitDone方法的局部变量,初始为null
q = new WaitNode();
else if (!queued)
//如果q还没有入队,那么通过CAS将其入队(waiters相当于是单向链表的头结点)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed)
//如果需要超时等待,根据阻塞截止时间和当前时间计算需要阻塞的时间
//因为此方法的逻辑中,排队节点的创建、入队、和阻塞等都是是通过for循环一次一次的进行推进
//所以在阻塞之前要重新计算一下剩余需要阻塞的时间
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
//如果剩余阻塞时间小于等于0 ,说明阻塞时间已经过了,直接移除节点,然后返回任务状态
removeWaiter(q);
return state;
//超时阻塞
LockSupport.parkNanos(this, nanos);
else
//不带超时的阻塞
LockSupport.park(this);
一个线程要获取FutureTask任务的执行结果,如果任务还未完成,那么线程就需要被阻塞。首先线程会被封装成一个WaitNode对象,WaitNode是FutureTask的一个静态内部类,其结构很简单,就是包含一个线程对象(thread)和一个WaitNode对象(next):
static final class WaitNode
volatile Thread thread;
volatile WaitNode next;
WaitNode() thread = Thread.currentThread();
FutureTask通过这个WaitNode构建了一个单向链表(waiters属性),所有通过获取FutureTask任务结果而阻塞的线程都排在这个单向链表中,每个WaitNode是通过CAS入队的,这个和AQS中的同步队列和条件队列基本是一样的逻辑。
awaitDone方法有个特点,WaitNode的创建、入队、阻塞,包括线程被唤醒之后各种处理的都是通过for循环自旋推进的。比如:
- 第一次循环判断局部变量q为null,那么创建一个WaitNode
- 第二次循环判断!queue为true,说明上一次创建的WaitNode还没有入队,那么WaitNode通过CAS入队,如果CAS失败,说明有多个线程在并发入队,留待下一次for循环再进行入队
- 第三次循环判断是否需要超时阻塞,如果有超时,会重新计算一次剩余阻塞时间, 通过parkNanos方法进行阻塞;如果超时时间已经过了,则直接返回任务状态。如果不使用超时,那么直接通过park方法阻塞线程
由于通过for循环推进也需要消耗时间,所以在awaitDone方法开始的时候会先保存线程阻塞的终止时间,等待线程真正开始parkNanos阻塞的时候,会再计算一下剩余需要阻塞的时间,如果阻塞时间已经过了,那么直接返回任务状态。
可以看到,awaitDone方法的返回值是任务的状态state,最终还会调用report方法根据任务的状态返回任务的最终结果。而awaitDone方法返回有几种情况:
- 线程被中断:线程醒来后会在下次循环中判断发现线程被中断,进而抛出InterruptedException
- 线程被unpark唤醒,然后发现任务已经完成:返回任务当时的状态(>COMPLETING)
- 线程超时唤醒:线程等待超时时间到了也没有等到任务完成,方法返回后会在外层get方法里判断如果任务状态小于等于COMPLETING,表示是超时唤醒,那么抛出TimeoutException
在分析了awaitDone方法之后,我们就能明白在run方法中任务完成之后,在set方法保存结果之后调用finishCompletion方法的作用了,就是循环唤醒waiters链表中的所有线程,线程被唤醒后发现任务状态大于COMPLETING,那么awaitDone方法得以返回。 awaitDone方法返回的是任务的状态,还需要在report方法中决定get方法最终的返回值:
private V report(int s) throws ExecutionException
Object x = outcome;
if (s == NORMAL)
//任务正常完成,返回结果,其它都是异常结束
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
最后我们还需要看一下cancel方法的逻辑:
public boolean cancel(boolean mayInterruptIfRunning)
//mayInterruptIfRunning表示是否中断任务线程
//如果需要中断线程,那么需要把任务状态先调整为一个中间状态:INTERRUPTING
//否则直接将状态调整为CANCELLED
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
//cas失败了,说明同时有其它线程在操作
return false;
try // in case call to interrupt throws exception
if (mayInterruptIfRunning)
try
Thread t = runner;
if (t != null)
t.interrupt();
finally // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
finally
//触发任务完成的逻辑,唤醒waiters链表中的所有阻塞线程
finishCompletion();
return true;
之后NEW状态下的任务才能被cancel,该方法同时提供了一个布尔参数mayInterruptIfRunning,表示是否需要中断执行任务的线程,如果需要中断线程,那么任务状态会先调整为一个中间状态:INTERRUPTING,之后调用线程的interrupt方法后再将状态调整为INTERRUPTED,这个中间状态和COMPLETING类似;如果不需要中断线程,那么直接将任务修改为CANCELLED状态,如果CAS修改失败,那么直接返回,不再执行后面的逻辑,因为这代表有多个线程在并发cancel,只需要一个线程处理这个逻辑。
如果CAS修改状态成功,那么可以执行后面cancel
从cancel的逻辑中可以发现,通过FutureTask提供的cancel方法,如果传入的mayInterruptIfRunning为false,那么对于任务线程本身并不会做什么操作,而只是将状态修改为CANCELLED状态,然后调用finishCompletion方法唤醒阻塞线程;即使是入参为true,那么也是中断线程,如果任务没有响应中断,那么任务也不会退出,即使任务已经是CANCELED状态。
总结
FutureTask提供了两种构建方式,分别是传入Callable和传入Runnable,如果传入的是Runnable,也会通过适配器将其包装为一个Callable,FutureTask间接实现了Runnable接口,将其提交给Thread之后执行的就是其run方法,在FutureTask的run方法中调用的是Callable的call方法并且获取结果,对于同一个FutureTask,同时只能有一个线程执行该任务,这点体现在Thread类型的runner属性上,在run方法执行之前,会通过CAS将runner以null为期望值修改为当前线程,修改成功才会进入后续的逻辑。
FutureTask中对于任务规定了7种状态,任务初始默认为NEW状态,大体来说FutureTask中的任务可能会:正常执行结束、任务抛出异常结束、被取消、被中断,这些行为也反映在了定义的7种状态中。
任务在FutureTask中,正常完成或者抛出异常都算是任务完成,需要保存任务结果,如果任务正常完成,那么结果就是Callable的call方法的返回值;如果抛出异常,那么结果就是异常对象。当任务完成之后需要唤醒所有由于调用get方法获取结果而阻塞的线程,这些线程保存在一个通过WaitNode构建的单向链表中,唤醒的时候从链表头节点(waiters)开始依次unpark即可。
通过get方法获取任务执行结果而阻塞的线程,会在awaitDone方法中完成WaitNode的创建、入队和阻塞等操作,阻塞分为超时阻塞和永久阻塞,如果是超时阻塞,那么线程被唤醒的方式有中断线程、达到超时时间、unpark;如果是永久阻塞,那么线程被唤醒的方式有中断线程和unpark。当线程被唤醒后,会返回当前任务的状态,或者抛出中断异常,返回任务状态state之后,会通过report方法决定get方法最终返回结果,如果任务是正常结束的,那么返回结果值,否则会抛出相应的异常。
最后还需要注意,FutureTask的cancel方法并不能保证任务线程立即退出,不过无论如何都会唤醒阻塞线程。当然这点在线程池中也一样,即使调用了线程池的shutdownNow方法,也不能保证工作线程能够立即退出,这个要取决于任务如何响应中断请求,如果要强制结束一个线程,那么可以调用Thread类的stop方法,虽然该方法不建议调用,但是这个方法在处理一些顽固"僵死"线程时很有用。
以上是关于FutureTask源码分析的主要内容,如果未能解决你的问题,请参考以下文章