FutureTask源码分析

Posted 无虑的小猪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FutureTask源码分析相关的知识,希望对你有一定的参考价值。

1、Callable 与 FutureTask介绍

1.1、Callable

  创建线程有两种方式,一种是继承Thread类,一种是实现Runnable接口重写run方法。其实Thread也实现了Runable接口。

  在Runable接口中,仅有一个无参无返回结果的run方法。Runable接口详情:

 @FunctionalInterface
 public interface Runnable 
     public abstract void run();
 

  Callable接口的功能与Runable接口类似,唯一不同的是Callable接口可以返回线程执行的结果并抛出异常,在Runable接口中,仅有一个无参有结果的call方法,可抛出异常。Callable接口详情:

 @FunctionalInterface
 public interface Callable<V> 
     V call() throws Exception;
 

1.2、FutureTask

  Thread构造函数详情: 

  

  在Thread中,没有Callable类型的入参构造函数,只有Runable类型。需要一个实现了Runable接口的对象封装Callable,这个对象是FutureTask。有关FutureTask的详细介绍,在后文会做详细介绍。

2、Callable与FutureTask 的使用

  示例代码如下:

 import java.util.concurrent.Callable;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 public class TestCallable 
 
     public static void main(String[] args) throws Exception 
         // 创建FutureTask
         FutureTask task = new FutureTask<>(new Callable<String>() 
             @Override
             public String call() throws Exception 
                 // sleep  5 秒
                 TimeUnit.SECONDS.sleep(5);
                 // 返回线程执行结果
                 return Thread.currentThread().getName() + " == CumCallable == ";
             
         );
         // 启动线程
         new Thread(task).start();
         // 阻塞等待
         Object o = task.get();
         // 主线程执行
         System.out.println(Thread.currentThread().getName() +  " == main ==");
         System.out.println(o);
     
 

3、Callable与FutureTask源码分析

  FutureTask类图关系如下;

  

  FutureTask实现了Runable、Future接口。

1、Future

  Runable接口这里不做介绍了,主要看Future接口详情如下:

 // 操作线程任务
 public interface Future<V> 
 
     /**
      * 试图取消当前执行的任务
      *     如果任务已经执行完成、或已经被取消、或不能被取消,返回false
      *     如果任务还未启动,就已经被取消,该任务永远不会被运行         
      *     如果任务已经启动,mayInterruptIfRunning参数决定正在执行的线程是否被中断停止该任务
      */
     boolean cancel(boolean mayInterruptIfRunning);
 
     /**
      * 在任务正常执行完成前,判断其是否被取消
      */
     boolean isCancelled();
 
     /**
      * 判断任务是否执行完成
      */
     boolean isDone();
 
     /**
      * 一直阻塞等待,直到获取执行结果 
      */
     V get() throws InterruptedException, ExecutionException;
 
     /**
      * 阻塞等待timeout时间,获取执行结果,阻塞时间timeout已到达,抛出异常
      */
     V get(long timeout, TimeUnit unit)
         throws InterruptedException, ExecutionException, TimeoutException;
 

  Future接口提供了操作线程的方法,如取消当前执行的任务、获取执行的结果、判断是否执行完成等。Future对线程的任务做了增强处理。

2、RunnableFuture

  RunnableFuture继承Runable、Future接口,并重写了run方法。FutureTask实现此方法,并在run方法中调用了call方法,获取执行结果。
 public interface RunnableFuture<V> extends Runnable, Future<V> 
     // 线程执行未取消,通过此方法设置线程执行结果
     void run();
 

3、FutureTask

  FutureTask是基于 CAS + state + WaitNode节点链表 实现的。CAS保证多线程场景下的原子性;state线程状态,控制代码的返回及执行流程;WaitNode节点链表记录挂起的线程。

1、属性

1.1、执行任务线程状态

  

1.2、线程执行结果、等待队列头 

  callable:构造函数中传入的Callable对象,用于执行call()方法;

  outcome:正常执行完,outcome记录执行结果;执行出现异常,outcome记录异常;

  runner:当前正在执行的线程;

  waiters:等待队列的头节点,单向链表的头。

  

1.3、WaitNode

  waitNode是一个节点,存储在单向链表中,用于记录等待执行的线程,便于当前线程执行完后,唤醒等待的线程。

2、构造函数

  FutureTask的构造函数入参支持Callable,也支持Runable。构造函数完成对callable、state属性的初始化操作。详情如下:  

  

  入参为Runable的构造函数是如何转换成Callable的呢?Executors#callable() 详情如下:

 // Runable 转为 Callable,并返回入参 result
 public static <T> Callable<T> callable(Runnable task, T result) 
     // 非空判断
     if (task == null)
         throw new NullPointerException();
     // 使用适配器,将Runable转换为Callable
     return new RunnableAdapter<T>(task, result);
 

  RunnableAdapter采用了适配器模式,将Runable转换为Callable。我们来看看是怎么转换的。

  RunableAdapter 与 Runable 组合。RunableAdapter实现了Callable接口,重写了call方法,在该方法中实际执行的是Runable的run方法,并返回传入的result结果。

  

3、run() - 入口方法

1、流程图

 

2、run()源码分析

  FutureTask实现了Runable接口,从写了run()方法,run()作为线程执行的入口方法,先来看看具体做了哪些操作。

  FutureTask#run() 详情如下:

 public void run() 
     // 当前线程不为新建状态  或者 当前线程cas获取锁资源失败,返回
     if (state != NEW ||
         !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                      null, Thread.currentThread()))
         return;
     try 
         // callable成员变量赋给局部变量
         Callable<V> c = callable;
         // callable不为空,当前线程状态为 新建
         if (c != null && state == NEW) 
             // 执行结果
             V result;
             // 执行结束标识
             boolean ran;
             try 
                 // 执行实现的call方法,并获取返回结果
                 result = c.call();
                 // 执行结束标识设置为true -> 正常执行
                 ran = true;
              catch (Throwable ex) 
                 // 执行异常,返回null
                 result = null;
                 // 执行结束标识设置为true -> 执行出现异常
                 ran = false;
                 // 设置异常信息
                 setException(ex);
             
             // 正常执行结束,存储执行结果
             if (ran)
                 set(result);
         
      finally 
         // 线程释放锁资源
         runner = null;
         // 中断线程的处理
         int s = state;
         if (s >= INTERRUPTING)
             handlePossibleCancellationInterrupt(s);
     
 

  在run()方法中,实际执行的是callable的call()方法,并用outcome成员变量存储线程执行的结果,若出现异常,outcome则存储异常信息。

1、正常执行结束

  线程正常执行完,将线程的执行结果设置到FutureTask的outCome属性中,FutureTask#set()方法。

 // 线程执行结果设置到outcome属性中
 protected void set(V v) 
     // 修改当前线程状态   NEW(新建) -> COMPLETING(运行中)
     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) 
         // 线程执行结果赋值到成员变量outcome
         outcome = v;
         // 修改线程状态  COMPLETING(运行中) -> NORMAL(正常执行结束)
         UNSAFE.putOrderedInt(this, stateOffset, NORMAL); 
         // 唤醒因get()操作挂起的线程
         finishCompletion();
     
 

  线程执行执行结束,线程状态经过了 NEW -> COMPLETING -> NORMAL 的变化,将结果赋值给outcome,唤醒因get()操作挂起的线程。

2、异常的处理

  线程执行过程中异常的处理,FutureTask#setException() 详情如下:

 // 设置异常信息
 protected void setException(Throwable t) 
     // cas操作,修改当前执行的线程状态  NEW(新建) -> COMPLETING(运行中)
     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) 
         // 线程执行结果设置为异常
         outcome = t;
         // 修改当前执行的线程状态  COMPLETING(运行中) -> EXCEPTIONAL(异常)
         UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); 
         // 唤醒因get()操作挂起的线程
         finishCompletion();
     
 

  线程执行执行结束,线程状态经过了 NEW -> COMPLETING -> EXCEPTIONAL 的变化,将异常赋值给outcome,唤醒因get()操作挂起线程。

3、唤醒等待队列中的挂起线程

  FutureTask#finishCompletion() 详情如下:

 // 移除并唤醒所有等待执行的线程
 private void finishCompletion() 
     // q执行waiters链表的头节点
     for (WaitNode q; (q = waiters) != null;) 
         // cas操作,将waiters设置为null,担心外部线程使用 cancel 取消当前任务触发  finishCompletion()
         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) 
              // 自旋
              for (;;) 
                 // 获取当前Node封装的 thread
                 Thread t = q.thread;
                 // 当前线程不为  null
                 if (t != null) 
                     // 将当前等待节点的线程设置为null
                     q.thread = null;
                     // 唤醒当前节点因 get 操作阻塞的线程
                     LockSupport.unpark(t);
                 
                 // 获取下一个WaitNode
                 WaitNode next = q.next;
                 // 等待节点链表中没有待唤醒的线程,结束循环
                 if (next == null)
                     break;
                     
                 // 将下一个待唤醒的等待节点从  WaitNode链表中移除,便于GC
                 q.next = null;
                 q = next;
             
             // 等待节点链表中没有待唤醒的线程,结束循环
             break;
         
     
     
     // JDK提供修改当前线程执行状态的拓展方法,默认不实现
     done();
     
     // 执行完毕,callable
     callable = null;
 
 
 // JDK提供修改当前线程执行状态的拓展方法,默认不实现
 protected void done()  

4、处于中断过程中的线程处理

  FutureTask实现了Future接口,Future提供了线程取消的方法cancel。若当前线程被取消,FutureTask执行cancel方法过程中,线程状态有一段时间是 INTERRUPTING (中断处理中) 的状态,在线程执行结束之前,FutureTask在 handlePossibleCancellationInterrupt 方法中 INTERRUPTING (中断处理中)状态 的线程做了特殊处理。

  FutureTask#handlePossibleCancellationInterrupt() 详情如下:

 private void handlePossibleCancellationInterrupt(int s) 
     // 线程状态 INTERRUPTING (中断处理中) 
     if (s == INTERRUPTING)
         while (state == INTERRUPTING)
             // 让出CPU,等待线程状态变为  INTERRUPTED  (已中断)
             Thread.yield();
 

4、get() - 获取线程执行结果

1、流程图

2、get()源码分析

  获取线程执行结果,FutureTask#get() 详情如下:

 public V get() throws InterruptedException, ExecutionException 
     // 当前线程执行状态
     int s = state;
     // 当前线程处于  NEW(新建)、COMPLETING (运行中) ,阻塞挂起外部的get()获取结果的线程
     if (s <= COMPLETING)
         s = awaitDone(false, 0L);
     // 线程被唤醒后,返回执行结果
     return report(s);
 

  当前线程状态为 NEW或COMPLETING ,挂起线程;等待线程正常执行结束、执行异常、线程取消调用finishCompletion()方法唤醒这些挂起的线程,再通过report方法,返回执行结果。

1、挂起新建、运行中的线程

  挂起线程 FutureTask#awaitDone() 详情如下:

 private int awaitDone(boolean timed, long nanos) throws InterruptedException 
     // 是否带超时的阻塞,0不带超时
     final long deadline = timed ? System.nanoTime() + nanos : 0L;
     // 引用当前线程 封装成waitNode对象
     WaitNode q = null;
     // 当前线程 waitNode对象是否  已经添加进等待链表中,默认未添加
     boolean queued = false;
     // 自旋
     for (;;) 
         // 4.1、线程唤醒,说明线程是使用中断的方式唤醒,若interrupted() 返回为true 之后会将中断标记重置为false
         if (Thread.interrupted()) 
             // 当前线程node出队
             removeWaiter(q);
             // get方法抛出  中断异常
             throw new InterruptedException();
         
         
         // 4.2、当前线程被其他线程 使用unpark()方式
         
         // 获取当前线程执行状态
         int s = state;
         // 当前线程执行完成,将当前任务线程设置为null,并返回任务状态
         // 状态是终态: NORMAL 、 EXCEPTIONAL 、 CANCELLED
         if (s > COMPLETING) 
             // 当前线程已创建WaitNode对象,将其中的thread设置为null,helpGC
             if (q != null)
                 q.thread = null;
             return s;
         
         
         // 当前线程处于  运行中,让出CPU,进行下一次的抢占
         else if (s == COMPLETING) 
             Thread.yield();
             
         // 1、第一次自旋,是新线程,当前等待节点为null,为当前线程创建WaitNode对象
         else if (q == null)
             // 创建等待节点
             q = new WaitNode();
             
         // 2、第二次自旋,线程已创建WaitNode 对象, WaitNode未入队
         else if (!queued)
             // 当前线程node节点指向 原队列的头节点  waiters:一直指向队列的头
             // cas设置waiters引用指向当前线程Node节点,若成功,queued为true;若失败,可能其他线程先一步入队了
             queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                  q.next = waiters, q);
         
         // 3、第三次自旋,挂起线程操作        
         // 如果设置了超时时间,get操作的线程会被parkNanos,并过了超时时间的话,从 waiters 链表中删除当前 wait
         
         else if (timed) 
             // 获取剩余阻塞时间
             nanos = deadline - System.nanoTime();
             // 剩余阻塞时间不足,将WaitNode从等待链表中移除,返回响应状态
             if (nanos <= 0L) 
                 removeWaiter(q);
                 return state;
             
             // 没有过超时时间,线程进入 TIMED_WAITING 状态
             LockSupport.parkNanos(this, nanos);
         
         // 未设置超时时间,get操作的线程会被park,进入 WAITING 状态
         // 除非有其他线程唤醒 或 将当前线程中断
         else
             LockSupport.park(this);
     
 

    awaitDone使用了自旋,根据每次自旋变量值的不同,走不同的分支,执行流程图如下:

  

2、移除链表中的WaitNode

  将当前Node节点从链表中移除,FutureTask#removeWaiter() 详情如下:
 // 试图将超时 或 已中断的等待节点 从链表中移除,同时将被删除节点的上一节点指向被删除节点的下一节点,避免gc无法回收
 private void removeWaiter(WaitNode node) 
     // 要移除的WaitNode不为null
     if (node != null) 
         // 要移除的WaitNode中线程设置为null
         node.thread = null;
         retry:
         // 自旋
         for (;;) 
             // pred :前驱节点
             // q:初始为头节点,代表当前正在遍历的节点
             // s:后继节点
             for (WaitNode pred = null, q = waiters, s; q != null; q = s) 
                 // 当前遍历的节点的后继节点赋值给 s
                 s = q.next;
                 // 1、当前遍历的节点线程不为null,将当前遍历节点赋值给 它的前驱节点pred,开始下一次自旋
                 if (q.thread != null)
                     pred = q;
                     
                 // 当前遍历的节点不为头节点,并且当前遍历节点的线程为null,将当前遍历节点的前驱节点指向它的后继节点
                 else if (pred != null) 
                     pred.next = s;
                     // 当前遍历线程节点的前驱节点线程也为null,说明它的前驱节点也要被删除,重新开始自旋
                     if (pred.thread == null) 
                         continue retry;
                 
 
                 // q.thread = null,  pred == null  ==> 头节点
                 // 当前遍历节点为头节点,cas操作,将队列头节点waiters的引用指向当前节点的后继节点,
                 // 即当前节点的下一节点作为头节点
                 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                       q, s))
                     continue retry;
             
             break;
         
     
 

  移除等待队列中执行完成、被取消的WaitNode节点,若WaitNode是头节点,将队列头节点的引用指向当前头节点的后继节点;若不为队列头节点,将当前WaitNode的前驱节点指向它的后继节点,流程图如下 :

  

   为了便于理解removeWaiters的流程,下面给出等待队列的大体变化过程:

  

3、返回执行结果

  根据线程状态返回执行结果,FutureTask#report() 详情如下:
 private V report(int s) throws ExecutionException 
     // 获取线程状态
     Object x = outcome;
     // 正常执行结束
     if (s == NORMAL)
         // 返回执行结果
         return (V)x;
     // 已取消 | (已中断) ,抛CancellationException异常
     if (s >= CANCELLED)
         throw new CancellationException();
     // 异常,抛ExecutionException异常
     throw new ExecutionException((Throwable)x);
 

  通过当前线程的状态,判断是抛出异常,还是返回线程执行结果。

5、cancel() - 获取线程执行结果

5.1、流程图

  

5.2、源码分析

 public boolean cancel(boolean mayInterruptIfRunning) 
     // 当前正执行的线程状态不为 NEW(新建) 状态 ,并且线程状态变更失败 ,返回false
     if (!(state == NEW &&
           UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
               mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
         return false;
     try    
         // 允许中断正在运行线程
         if (mayInterruptIfRunning) 
             try 
                 // 获取当前正在执行的线程
                 Thread t = runner;
                 // 执行interrupt()中断方法
                 if (t != null)
                     t.interrupt();
              finally 
                 // 修改线程状态为已中断
                 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
             
         
      finally 
         // 唤醒因get()而挂起线程
         finishCompletion();
     
     // 返回取消中断线程结果
     return true;
 

  

Java并发编程(二十):FutureTask源码分析

使用

	FutureTask<String> futureTask = new FutureTask(() -> "success");
	new Thread(futureTask).start();
	futureTask.get();

源码分析

  FutureTask提供了两个构造方法,分别是传入一个Callable,和传入一个Runnable加返回值result。如果传入的是Runnable加返回值,那么会通过适配器RunnableAdapter将其包装成一个Callable。

	public FutureTask(Runnable runnable, V result) 
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    

  通过调用Executors.callable方法包装Callable:

	public static <T> Callable<T> callable(Runnable task, T result) 
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    

  RunnableAdapter实现了Callable接口,其call方法的返回值就是result:

	static final class RunnableAdapter<T> implements Callable<T> 
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) 
            this.task = task;
            this.result = result;
        
        public T call() 
            task.run();
            return result;
        
    

  在一个FutureTask创建成功后,其默认状态state为NEW。在FutureTask中为state定义了7个状态,并且state被volatile修饰以保证可见性,这7个状态分别是:

  • NEW:新建状态(初始默认状态)
  • COMPLETING:正在完成中,这个状态表示任务已经执行完成,但是还没有设置结果(正常结果或者异常)
  • NORMAL:普通状态,状态变为COMPLETING并成功设置结果之后会变成这个状态
  • EXCEPTIONAL:抛出了异常,任务执行抛出了异常则会变为这个状态,和NORMAL一样,在变为此状态之前也有个COMPLETING中间状态
  • CANCELLED:被取消
  • INTERRUPTING:正在被中断中,表示准备中断线程,但是还未中断
  • INTERRUPTED:被中断

  state可能的状态转换情况如下:

  • NEW -> COMPLETING -> NORMAL :新建->完成中(任务已经完成,但是结果还未保存)->普通状态(结果保存完成)
  • NEW -> COMPLETING -> EXCEPTIONAL :新建->完成中(任务抛出异常,但是异常还未保存)->异常状态(异常保存完成)
  • NEW -> CANCELLED :新建->取消
  • NEW -> INTERRUPTING -> INTERRUPTED:新建->中断中(准备中断,但还未中断)->完成中断(调用了interrupt方法)

  FutureTask实现了Runnable接口,所以可以直接通过其run方法在当前线程执行任务,其run方法就是调用callable的call方法,call方法执行完毕之后,任务状态变为COMPLETING,并且call方法有一个返回值,正常执行得到返回值之后将其保存到outcome属性中,然后任务状态从COMPLETING变为NORMAL;如果任务执行抛出了异常, 那么任务状态也先变为COMPLETING,然后将异常对象设置到outcome属性,接着任务状态从COMPLETING变为EXCEPTIONAL。FutureTask的run方法如下:

    public void run() 
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            //检查任务状态和执行线程,如果状态不是NEW,或者通过CAS设置当前线程为任务执行线程失败,那么直接返回
            return;
        try 
            Callable<V> c = callable;
            if (c != null && state == NEW) 
            	//在此判断任务状态
                V result;
                boolean ran;
                try 
                	//执行任务
                    result = c.call();
                    //表示任务执行成功
                    ran = true;
                 catch (Throwable ex) 
                	//抛出了异常,任务状态:NEW->COMPLETING->EXCEPTIONAL
                    result = null;
                    ran = false;
                    setException(ex);
                
                if (ran)
					//任务执行成功,任务状态:NEW->COMPLETING->NORMAL
                    set(result);
            
         finally 
            //将runner设置为null,保证其它线程能够再次执行此任务
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
            	//线程被中断
                handlePossibleCancellationInterrupt(s);
        
    

注:将FutureTask交给Thread去执行和将Runnable交给Thread去执行是一样的,Thread.start方法就是负责调用本地方法启动一个线程去异步执行提交的任务,差别就是FutureTask可以调用get方法获取任务执行的结果,而结果保存在outcome属性中。

  run()方法的大体逻辑相对来说比较简单,就是调用Callable的call方法获取返回值,然后将返回值设置到outcome中,此时任务的状态变化为:NEW->COMPLETING->NORMAL;如果call方法抛出了异常,那么将异常设置到outcome中,此时任务的状态变化为:NEW->COMPLETING->EXCEPTIONAL。
  任务执行失败的处理逻辑在setException方法中:

	protected void setException(Throwable t) 
		//先将任务修改为COMPLETING状态
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) 
           	//保存异常
            outcome = t;
            //将任务修改为EXCEPTIONAL状态
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        
    

  任务执行成功设置返回值是通过调用set方法实现的:

	protected void set(V v) 
		//先将任务修改为COMPLETING状态
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) 
        	//保存返回值
            outcome = v;
            //将任务修改为NORMAL状态
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        
    

  在setException和set方法中都体现了FutureTask任务的状态转化,不过我们注意到两个方法中都调用了一个finishCompletion方法,这个finishCompletion的逻辑如下:

    private void finishCompletion() 
        for (WaitNode q; (q = waiters) != null;) 
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) 
                for (;;) 
                	//循环唤醒waiters队列中的所有阻塞线程
                    Thread t = q.thread;
                    if (t != null) 
                        q.thread = null;
                        //唤醒阻塞的线程
                        LockSupport.unpark(t);
                    
                    //唤醒了一个节点之后将其从链表中移除
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                
                break;
            
        
		//空方法
        done();
        callable = null;        // to reduce footprint
    

  从状态的转换流程中可以看出,不论任务是正常完成还是抛出了异常,在FutureTask中都认为是任务完成。在finishCompletion方法中的主要逻辑是唤醒所有阻塞的线程,这些阻塞的线程是怎么来的呢?其实就是调用FutureTask的get方法生成的。FutureTask支持异步获取任务执行的结果,主要提供了两个方法get()和get(long timeout,TimeUnit unit),分别表示永久阻塞等待和超时阻塞等待,来看看get方法的实现:

	public V get() throws InterruptedException, ExecutionException 
        int s = state;
        if (s <= COMPLETING)
        	//阻塞线程,传入false表示不用超时阻塞
            s = awaitDone(false, 0L);
        //任务已经完成了,处于COMPLETING之后的状态
        return report(s);
    

  get(long timeout,TimeUnit unit)方法的实现:

	public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException 
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
        	//传入true表示需要超时阻塞
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    

  如果任务处于COMPLETING之后的状态,表示任务已经完成了(不论是正常完成还是抛出异常),否则进入超时阻塞等待,如果超时唤醒后发现任务还未完成,那么抛出TimeoutException异常。
  我们注意到,永久阻塞和超时阻塞都是通过awaitDone方法实现的,该方法有两个入参,分别是:

  • timed:布尔型参数,表示是否需要超时阻塞
  • nanos:超时阻塞的时间
      那么我们进入awaitDone方法看看是如何实现的:
	private int awaitDone(boolean timed, long nanos)
        throws InterruptedException 
        //计算线程阻塞的终止时间:当前时间+最大阻塞时间
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) 
            if (Thread.interrupted()) 
            	//如果线程被中断了,从阻塞队列中移除,抛出中断异常
                removeWaiter(q);
                throw new InterruptedException();
            

            int s = state;
            if (s > COMPLETING) 
            	//如果任务已经结束,那么返回当前任务状态,后续会根据任务状态解析结果
                if (q != null)
                    q.thread = null;
                return s;
            
            else if (s == COMPLETING) // cannot time out yet
            	//如果任务正在完成中,那么只需要等待任务完成即可
            	//这里使用yield释放cpu时间片,等待下一次cpu执行
                Thread.yield();
            else if (q == null)
            	//如果q为null,那么新建一个WaitNode,q是awaitDone方法的局部变量,初始为null
                q = new WaitNode();
            else if (!queued)
            	//如果q还没有入队,那么通过CAS将其入队(waiters相当于是单向链表的头结点)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) 
            	//如果需要超时等待,根据阻塞截止时间和当前时间计算需要阻塞的时间
            	//因为此方法的逻辑中,排队节点的创建、入队、和阻塞等都是是通过for循环一次一次的进行推进
            	//所以在阻塞之前要重新计算一下剩余需要阻塞的时间
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) 
                	//如果剩余阻塞时间小于等于0 ,说明阻塞时间已经过了,直接移除节点,然后返回任务状态
                    removeWaiter(q);
                    return state;
                
                //超时阻塞
                LockSupport.parkNanos(this, nanos);
            
            else
            	//不带超时的阻塞
                LockSupport.park(this);
        
    

  一个线程要获取FutureTask任务的执行结果,如果任务还未完成,那么线程就需要被阻塞。首先线程会被封装成一个WaitNode对象,WaitNode是FutureTask的一个静态内部类,其结构很简单,就是包含一个线程对象(thread)和一个WaitNode对象(next):

    static final class WaitNode 
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode()  thread = Thread.currentThread(); 
    

  FutureTask通过这个WaitNode构建了一个单向链表(waiters属性),所有通过获取FutureTask任务结果而阻塞的线程都排在这个单向链表中,每个WaitNode是通过CAS入队的,这个和AQS中的同步队列和条件队列基本是一样的逻辑。
  awaitDone方法有个特点,WaitNode的创建、入队、阻塞,包括线程被唤醒之后各种处理的都是通过for循环自旋推进的。比如:

  • 第一次循环判断局部变量q为null,那么创建一个WaitNode
  • 第二次循环判断!queue为true,说明上一次创建的WaitNode还没有入队,那么WaitNode通过CAS入队,如果CAS失败,说明有多个线程在并发入队,留待下一次for循环再进行入队
  • 第三次循环判断是否需要超时阻塞,如果有超时,会重新计算一次剩余阻塞时间, 通过parkNanos方法进行阻塞;如果超时时间已经过了,则直接返回任务状态。如果不使用超时,那么直接通过park方法阻塞线程

  由于通过for循环推进也需要消耗时间,所以在awaitDone方法开始的时候会先保存线程阻塞的终止时间,等待线程真正开始parkNanos阻塞的时候,会再计算一下剩余需要阻塞的时间,如果阻塞时间已经过了,那么直接返回任务状态。

  可以看到,awaitDone方法的返回值是任务的状态state,最终还会调用report方法根据任务的状态返回任务的最终结果。而awaitDone方法返回有几种情况:

  • 线程被中断:线程醒来后会在下次循环中判断发现线程被中断,进而抛出InterruptedException
  • 线程被unpark唤醒,然后发现任务已经完成:返回任务当时的状态(>COMPLETING)
  • 线程超时唤醒:线程等待超时时间到了也没有等到任务完成,方法返回后会在外层get方法里判断如果任务状态小于等于COMPLETING,表示是超时唤醒,那么抛出TimeoutException

  在分析了awaitDone方法之后,我们就能明白在run方法中任务完成之后,在set方法保存结果之后调用finishCompletion方法的作用了,就是循环唤醒waiters链表中的所有线程,线程被唤醒后发现任务状态大于COMPLETING,那么awaitDone方法得以返回。  awaitDone方法返回的是任务的状态,还需要在report方法中决定get方法最终的返回值:

	private V report(int s) throws ExecutionException 
        Object x = outcome;
        if (s == NORMAL)
        	//任务正常完成,返回结果,其它都是异常结束
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    

  最后我们还需要看一下cancel方法的逻辑:

    public boolean cancel(boolean mayInterruptIfRunning) 
    	//mayInterruptIfRunning表示是否中断任务线程
    	//如果需要中断线程,那么需要把任务状态先调整为一个中间状态:INTERRUPTING
    	//否则直接将状态调整为CANCELLED
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            //cas失败了,说明同时有其它线程在操作
            return false;
        try     // in case call to interrupt throws exception
            if (mayInterruptIfRunning) 
                try 
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                 finally  // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                
            
         finally 
        	//触发任务完成的逻辑,唤醒waiters链表中的所有阻塞线程
            finishCompletion();
        
        return true;
    

  之后NEW状态下的任务才能被cancel,该方法同时提供了一个布尔参数mayInterruptIfRunning,表示是否需要中断执行任务的线程,如果需要中断线程,那么任务状态会先调整为一个中间状态:INTERRUPTING,之后调用线程的interrupt方法后再将状态调整为INTERRUPTED,这个中间状态和COMPLETING类似;如果不需要中断线程,那么直接将任务修改为CANCELLED状态,如果CAS修改失败,那么直接返回,不再执行后面的逻辑,因为这代表有多个线程在并发cancel,只需要一个线程处理这个逻辑。
如果CAS修改状态成功,那么可以执行后面cancel
  从cancel的逻辑中可以发现,通过FutureTask提供的cancel方法,如果传入的mayInterruptIfRunning为false,那么对于任务线程本身并不会做什么操作,而只是将状态修改为CANCELLED状态,然后调用finishCompletion方法唤醒阻塞线程;即使是入参为true,那么也是中断线程,如果任务没有响应中断,那么任务也不会退出,即使任务已经是CANCELED状态。

总结

  FutureTask提供了两种构建方式,分别是传入Callable和传入Runnable,如果传入的是Runnable,也会通过适配器将其包装为一个Callable,FutureTask间接实现了Runnable接口,将其提交给Thread之后执行的就是其run方法,在FutureTask的run方法中调用的是Callable的call方法并且获取结果,对于同一个FutureTask,同时只能有一个线程执行该任务,这点体现在Thread类型的runner属性上,在run方法执行之前,会通过CAS将runner以null为期望值修改为当前线程,修改成功才会进入后续的逻辑。
  FutureTask中对于任务规定了7种状态,任务初始默认为NEW状态,大体来说FutureTask中的任务可能会:正常执行结束、任务抛出异常结束、被取消、被中断,这些行为也反映在了定义的7种状态中。
  任务在FutureTask中,正常完成或者抛出异常都算是任务完成,需要保存任务结果,如果任务正常完成,那么结果就是Callable的call方法的返回值;如果抛出异常,那么结果就是异常对象。当任务完成之后需要唤醒所有由于调用get方法获取结果而阻塞的线程,这些线程保存在一个通过WaitNode构建的单向链表中,唤醒的时候从链表头节点(waiters)开始依次unpark即可。
  通过get方法获取任务执行结果而阻塞的线程,会在awaitDone方法中完成WaitNode的创建、入队和阻塞等操作,阻塞分为超时阻塞和永久阻塞,如果是超时阻塞,那么线程被唤醒的方式有中断线程、达到超时时间、unpark;如果是永久阻塞,那么线程被唤醒的方式有中断线程和unpark。当线程被唤醒后,会返回当前任务的状态,或者抛出中断异常,返回任务状态state之后,会通过report方法决定get方法最终返回结果,如果任务是正常结束的,那么返回结果值,否则会抛出相应的异常。
  最后还需要注意,FutureTask的cancel方法并不能保证任务线程立即退出,不过无论如何都会唤醒阻塞线程。当然这点在线程池中也一样,即使调用了线程池的shutdownNow方法,也不能保证工作线程能够立即退出,这个要取决于任务如何响应中断请求,如果要强制结束一个线程,那么可以调用Thread类的stop方法,虽然该方法不建议调用,但是这个方法在处理一些顽固"僵死"线程时很有用。

以上是关于FutureTask源码分析的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程(二十):FutureTask源码分析

Java并发编程(二十):FutureTask源码分析

Java并发编程(二十):FutureTask源码分析

Java并发编程(二十):FutureTask源码分析

JDK源码分析-FutureTask

源码分析-FutureTask