JDK 源码解析:深入浅出异步任务 FutureTask
Posted SegmentFault
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDK 源码解析:深入浅出异步任务 FutureTask相关的知识,希望对你有一定的参考价值。
作者:Sumkor
来源:SegmentFault 思否社区
前言
在 Java 中,Runnable 接口表示一个没有返回结果的任务,而 Callable 接口表示具有返回结果的任务。
在并发编程中,异步执行任务,再获取任务结果,可以提高系统的吞吐量。Future 接口应运而生,它表示异步任务的执行结果,并提供了检查任务是否执行完、取消任务、获取任务执行结果等功能。FutureTask 是 Future 接口的基本实现,常与线程池实现类 ThreadPoolExecutor 配合使用。
| 本文基于 jdk1.8.0_91
1. 继承体系
RunnableFuture 接口同时实现了 Runnable 接口和 Future 接口,是一种冗余设计。
java.util.concurrent.RunnableFuture
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
*
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
FutureTask 是一个可取消的异步任务,是对 Future 接口的基本实现,具有以下功能:
启动或中断的任务的执行;
判断任务是否执行完成;
获取任务执行完成后的结果。
同时,FutureTask 可以用于包装 Callable 或 Runnable 对象。
由于它实现了 Runnable 接口,可以提交给 Executor 执行。
/**
* A cancellable asynchronous computation.
*
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this FutureTask's {@code get} methods
*/
public class FutureTask<V> implements RunnableFuture<V>
java.util.concurrent.Executor
/**
* An object that executes submitted {@link Runnable} tasks.
*
* @since 1.5
* @author Doug Lea
*/
public interface Executor {
void execute(Runnable command);
}
2. 属性
java.util.concurrent.FutureTask
// The run state of this task, initially NEW.
// 任务的执行状态,初始为 NEW。
private volatile int state;
/** The underlying callable; nulled out after running */
// 需要执行的任务,任务执行完后为空
private Callable<V> callable;
/** The result to return or exception to throw from get() */
// 任务的执行结果,或者任务抛出的异常
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
// 执行任务的线程
private volatile Thread runner;
/** Treiber stack of waiting threads */
// 指向栈顶的指针,栈结构用于存储等待任务执行结果的线程
private volatile WaitNode waiters;
其中 state、runner、waiters 三个属性在并发时存在争用,采用 CAS 维护其准确性。
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
2.1 状态定义
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
FutureTask 中使用 state 代表任务在运行过程中的状态。随着任务的执行,状态将不断地进行转变。
状态的说明:
NEW:新建状态,任务都从该状态开始。
COMPLETING:任务正在执行中。
NORMAL:任务正常执行完成。
EXCEPTIONAL:任务执行过程中抛出了异常。
CANCELLED:任务被取消(不响应中断)。
INTERRUPTING:任务正在被中断。
INTERRUPTED:任务已经中断。
状态转移过程:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
状态的分类:
任务的初始状态:NEW
任务的中间状态:COMPLETING、INTERRUPTING
任务的终止状态:NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED
2.1 状态使用
FutureTask 中判断任务是否已取消、是否已完成,是根据 state 来判断的。
public boolean isCancelled() {
return state >= CANCELLED; // CANCELLED、INTERRUPTING、INTERRUPTED
}
public boolean isDone() {
return state != NEW;
}
可以看到:
被取消或被中断的任务(CANCELLED、INTERRUPTING、INTERRUPTED),都视为已取消。
当任务离开了初始状态 NEW,就视为任务已结束。任务的中间态很短暂,并不代表任务正在执行,而是任务已经执行完了,正在设置最终的返回结果。
根据状态值,FutureTask 可以保证已经完成的任务不会被再次运行或者被取消。
中间状态虽然是一个瞬时状态,在 FutureTask 中用于线程间的通讯。
例如:
在 FutureTask#run 中检测到状态 >= INTERRUPTING,说明其他线程发起了取消操作,当前线程需等待对方完成中断。
在 FutureTask#get 中检测到状态 <= COMPLETING,说明执行任务的线程尚未处理完,当前线程需等待对方完成任务。
2.2 栈(Treiber stack)
/** Treiber stack of waiting threads */
private volatile WaitNode waiters; // 栈顶指针
/**
* Simple linked list nodes to record waiting threads in a Treiber
* stack. See other classes such as Phaser and SynchronousQueue
* for more detailed explanation.
*/
static final class WaitNode {
volatile Thread thread; // 等待任务执行结果的线程
volatile WaitNode next; // 栈的下一个节点
WaitNode() { thread = Thread.currentThread(); }
}
FutureTask 使用链表来构造栈(Treiber stack,使用 CAS 保证栈操作的线程安全,参考 java.util.concurrent.SynchronousQueue.TransferStack)。
其中 waiters 是链表的头节点,代表栈顶的指针。
栈的作用:
FutureTask 实现了 Future 接口,如果获取结果时,任务还没有执行完毕,那么获取结果的线程就在栈中挂起,直到任务执行完毕被唤醒。
3. 构造函数
赋值任务,设置任务的初始状态。
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
*
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if the runnable is null
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
值得注意的两个地方:
FutureTask 创建的时候,状态为 NEW。
由于 FutureTask 使用 Callable 表示任务,需用 Executors#callable 方法将 Runnable 转换为 Callable。
测试:
public void executors() throws Exception {
Callable<String> callable = Executors.callable(new Runnable() {
public void run() {
System.out.println("run!");
}
}, "haha");
String call = callable.call();
System.out.println("call = " + call);
}
执行结果:
run!
call = haha
更多原文内容提前看:
Runnable 实现
FutureTask#run
FutureTask#set
FutureTask#setException
FutureTask#finishCompletion
FutureTask#handlePossibleCancellationInterrupt
FutureTask#runAndReset
Future 实现
Future#get
FutureTask#awaitDone
FutureTask#report
Future#get(timeout, unit)
Future#cancel
实例
总结
以上是关于JDK 源码解析:深入浅出异步任务 FutureTask的主要内容,如果未能解决你的问题,请参考以下文章
Android 常用开源框架源码解析 系列 Rxjava 异步框架