线程池源码分析_01 FutureTask源码分析
Posted 兴趣使然の草帽路飞
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池源码分析_01 FutureTask源码分析相关的知识,希望对你有一定的参考价值。
1、FutureTask简介
Future是我们在使用java实现异步时最常用到的一个类,我们可以向线程池提交一个Callable,并通过future对象获取执行结果。
- FutureTask的使用场景:FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其
run
方法或者放入线程池执行,之后可以在外部通过FutureTask的get
方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run
方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel
取消FutureTask的执行等。 - 关于FutureTask的基本使用可以参考文章:FutureTask的用法
2、源码分析
成员属性
// 表示当前task的状态
private volatile int state;
// 表示当前任务尚未执行
private static final int NEW = 0;
// 表示当前任务正在结束,尚未完全结束,一种临界状态
private static final int COMPLETING = 1;
// 表示当前任务正常结束
private static final int NORMAL = 2;
// 表示当前任务执行过程中发生了异常。 内部封装的 callable.run() 向上抛出异常了
private static final int EXCEPTIONAL = 3;
// 表示当前任务被取消
private static final int CANCELLED = 4;
// 表示当前任务中断中..
private static final int INTERRUPTING = 5;
// 表示当前任务已中断
private static final int INTERRUPTED = 6;
//submit(runnable/callable):runnable 使用装饰者设计模式伪装成 Callable了。
private Callable<V> callable;
// 正常情况下:任务正常执行结束,outcome保存执行结果,作为callable返回值。
// 非正常情况:callable向上抛出异常,outcome保存异常信息
private Object outcome;
// 表示当前任务被线程执行期间,保存当前执行任务的线程对象引用。
private volatile Thread runner;
// 因为会有很多线程去get当前任务的结果,所以这里使用了一种数据结构 stack **头插、头取**的一个队列。
private volatile WaitNode waiters;
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
构造方法
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
// callable就是程序员自己实现的业务类
this.callable = callable;
// 设置当前任务状态为NEW: 表示当前任务尚未执行
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
// 使用装饰者模式将runnable转换为了callable接口,外部线程通过get获取
// 当前任务执行结果时,结果可能为 null 也可能为传进来的值。
this.callable = Executors.callable(runnable, result);
// 设置当前任务状态为NEW: 表示当前任务尚未执行
this.state = NEW;
}
成员方法
1. run方法
// submit(runnable/callable) -> newTaskFor(runnable) -> execute(task) -> pool
// 线程执行入口的方法
public void run() {
// 条件一:state != NEW 条件成立,说明当前task已经被执行过了或者被cancel了,
// 总之非NEW状态的任务,线程就不处理了,直接return;结束!
// 条件二:!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread())
// 条件成立:cas失败,当前任务被其它线程抢占了...
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
// 如果执行到这里,说明当前task一定是 NEW 状态,而且当前线程也抢占TASK成功!
try {
// callable 就是程序员自己封装逻辑的callable 或者装饰后的runnable
Callable<V> c = callable;
// 条件一:c != null 防止空指针异常
// 条件二:state == NEW 防止外部线程 cancel掉当前任务。
if (c != null && state == NEW) {
// 结果的引用
V result;
// true 表示callable.run 代码块执行成功 未抛出异常
// false 表示callable.run 代码块执行失败 抛出异常
boolean ran;
try {
// 调用程序员自己实现的callable 或者 装饰后的runnable
result = c.call();
// c.call未抛出任何异常,ran会设置为true 表示任务执行成功
ran = true;
} catch (Throwable ex) {
// 说明程序员自己写的逻辑块有bug了。
result = null;
// ran设置为false
ran = false;
// 将当前异常信息ex赋值给outcome
setException(ex);
}
// run:
// true -> 代码块执行成功 未抛出异常
// false -> 代码块执行失败 抛出异常
if (ran)
// 说明当前c.call正常执行结束了。
// set方法: 以CAS的方式将result结果设置给outcome
set(result);
}
} finally {
// 将当前执行任务的线程置为null
runner = null;
// 当前任务的状态
int s = state;
// s >= INTERRUPTING:如果条件成立,说明当前任务处于中断中或者已中断状态...
if (s >= INTERRUPTING)
// 后面分析cancel()方法的时候再回头看这里就明白了!
handlePossibleCancellationInterrupt(s);
}
}
/**
* 以CAS的方式设置结果v给outcome
*/
protected void set(V v) {
// 使用CAS方式设置当前任务状态为完成中...
// 有没有可能失败呢? 外部线程等不及了,直接在set执行CAS之前将task取消了。(很小概率事件)
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 将v赋值给outcome
outcome = v;
// 将结果v赋值给outcome之后,马上会将当前任务状态修改为NORMAL(正常结束状态)。
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 当线程被挂起之后,如果任务线程执行完毕,就会唤醒等待线程。
// 这一步就是在finishCompletion里面做的, 后面分析get() 的时候再说~
// 移除并唤醒所有等待线程,执行done,置空callable
finishCompletion();
}
}
/**
* 将当前异常信息t赋值给outcome
*/
protected void setException(Throwable t) {
// 使用CAS方式设置当前任务状态为 完成中..
// 有没有可能失败呢? 外部线程等不及了,直接在set执行CAS之前 将 task取消了。 很小概率事件。
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 引用的是 callable 向上层抛出来的异常。
outcome = t;
// 将当前任务的状态 修改为 EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// 当线程被挂起之后,如果任务线程执行完毕,就会唤醒等待线程。
// 这一步就是在finishCompletion里面做的, 后面分析get() 的时候再说~
// 移除并唤醒所有等待线程,执行done,置空callable
finishCompletion();
}
}
2. get方法
// get获取当前任务执行结束后得到的结果:
// 注意:get不一定是只有一个线程去得到任务执行后的结果,多个线程等待获取当前任务执行完成后的结果这种场景也是有的!
public V get() throws InterruptedException, ExecutionException {
// 获取当前任务状态
int s = state;
// s <= COMPLETING:
// 条件成立:(当前任务可能正处于:未执行0、正在执行1、正在完成1 状态,总之是还没有结束)。
// 那么,此时调用get的外部线程会被阻塞在get方法上。
if (s <= COMPLETING)
// awaitDone执行完后会返回task当前状态,如果该方法执行期间,task被中断了,则会直接抛出中断异常:
// awaitDone是futureTask实现阻塞的关键方法: 等待任务执行完毕,如果任务取消或者超时则停止!
s = awaitDone(false, 0L);
// report(s)去获取最终task执行结束得到的结果
return report(s);
}
// awaitDone方法: awaitDone是futureTask实现阻塞的关键方法
/**
1. 等待任务执行完毕,如果任务取消或者超时则停止
2. @param timed 为true表示设置超时时间
3. @param nanos 超时时间
4. @return 任务完成时的状态
5. @throws InterruptedException 中断异常
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 0 不带超时
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 引用当前线程封装成 WaitNode对象(**头插、头取**的一个队列。)
WaitNode q = null;
// 表示当前线程 waitNode对象是否入队/压栈
boolean queued = false;
// 自旋
for (;;) {
// 条件成立:说明当前线程唤醒是被其它线程使用中断这种方式喊醒的。interrupted()
// 返回true 后会将 Thread的中断标记重置回false.
if (Thread.interrupted()) {
// 当前线程节点出队
removeWaiter(q);
// 上抛,使get方法抛出中断异常。
throw new InterruptedException();
}
// 假设当前线程是被其它线程使用unpark(thread) 唤醒的话,会正常自旋,走下面逻辑:
// 获取当前任务最新状态
int s = state;
// 条件成立:说明当前任务已经有结果了.. (可能是完成、异常、中断等等)
if (s > COMPLETING) {
// 条件成立:说明已经为当前线程创建过WaitNode了,此时需要将 node.thread = null helpGC
if (q != null)
q.thread = null;
// 直接返回当前状态.
return s;
}
// 条件成立:说明当前任务接近完成状态、或者接近失败状态...
// 这里让当前线程再释放cpu ,进行下一次抢占cpu:
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 条件成立:第一次自旋,当前线程还未创建 WaitNode 对象,此时为当前线程创建 WaitNode对象
else if (q == null)
q = new WaitNode();
// 条件成立:第二次自旋,当前线程已经创建 WaitNode对象了,但是node对象还未入队
else if (!queued){
// 当前线程node节点 next 指向原队列的头节点 waiters 一直指向队列的头!
q.next = waiters;
// cas方式设置waiters引用指向当前线程node, 成功的话 queued == true 否则,可能其它线程先你一步入队了。
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, waiters, q);
}
// 第三次自旋,会到这里:表示是否设置了超时时间
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
// 已经超时的话,移除等待节点
removeWaiter(q);
return state;
}
// 未超时,将当前线程挂起指定时间
LockSupport.parkNanos(this, nanos);
}
else
// 当前get操作的线程就会被park了。线程状态会变为 WAITING状态,相当于休眠了..
// 除非有其它线程将你唤醒 或者 将当前线程中断。
// 如果当前线程被其他线程唤醒,醒来时,还是从这里向下继续执行(继续进入自旋for进行条件判断)
// (在finishCompletion中会唤醒这个挂起的线程!)
LockSupport.park(this);
}
}
// report(s)去获取最终task执行结束得到的结果
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
// 正常情况下,outcome 保存的是callable运行结束的结果
// 非正常情况下,保存的是 callable 抛出的异常。
Object x = outcome;
// 条件成立(正常情况):当前任务状态正常结束
if (s == NORMAL)
// 直接返回callable运算结果
return (V)x;
// 条件成立(非正常情况):当前任务是被取消或中断状态
if (s >= CANCELLED)
// 抛异常!
throw new CancellationException();
// 执行到这,说明callable接口实现中,是有bug的...
throw new ExecutionException((Throwable)x);
}
// node出队方法:
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
// 当线程被挂起之后,如果任务线程执行完毕,就会唤醒等待线程。这一步就是在finishCompletion里面做的
/**
* 移除并唤醒所有等待线程,执行done,置空callable
*/
private void finishCompletion() {
// 遍历等待节点:
// q指向waiters链表的头结点
for (WaitNode q; (q = waiters) != null;) {
// 使用cas设置 waiters为null
// 为了防止外部线程使用cancel取消当前任务,也会触发finishCompletion方法。(小概率事件)
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
// 获取当前node节点封装的 thread
Thread t = q.thread;
// 条件成立:说明当前线程不为null
if (t != null) {
q.thread = null;// help GC
// 唤醒当前节点对应的线程(在awaitDone方法最后一个else判断中park,在此处唤醒)
LockSupport.unpark(t);
}
// next: 当前节点的下一个节点
WaitNode next = q.next;
// next == null 说明是最后一个节点,则直接break即可
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 模板方法,可以被覆盖
done();
// 将callable 设置为null helpGC
callable = null;
}
3. cancel方法
// 将当前线程的任务取消(中断)掉:
public boolean cancel(boolean mayInterruptIfRunning) {
// 条件一:state == NEW 成立,表示当前任务处于运行中或者处于线程池任务队列中..
// 条件二:UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))
// 条件成立:说明CAS修改状态成功,可以去执行下面逻辑了,否则返回false,表示cancel失败。
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) {
try {
// 执行当前FutureTask 的线程,有可能现在是null,是null 的情况是: 当前任务在 队列中,还没有线程获取到它呢。。
Thread t = runner;
// 条件成立:说明当前线程 runner ,正在执行task.
if (t != null)
// 给runner线程一个中断信号.. 如果你的程序是响应中断 会走中断逻辑..假设你程序不是响应中断的..啥也不会发生。
t.interrupt();
} finally { // final state
// 设置任务状态为 中断完成。
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 唤醒所有get()阻塞的线程。
finishCompletion();
}
return true;
}
- 如果文章有帮助,点赞支持一下哈~
以上是关于线程池源码分析_01 FutureTask源码分析的主要内容,如果未能解决你的问题,请参考以下文章
线程之CallableFuture 和FutureTask使用及源码分析