线程池那些事之Future

Posted Java后端笔记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池那些事之Future相关的知识,希望对你有一定的参考价值。

前言

ThreadPoolExecutor除了可以执行Runnable的任务外,还可以执行那些带有返回结果的Callable任务。在提交Callable任务后,我们会得到一个Future对象。使用这个Future对象,我们可以监控任务的执行状态,也可以取消任务的执行。

源码分析

任务的提交

ThreadPoolExecutor的submit方法除了可以提交Callable的任务,也可以提交Runnable的任务,但是在底层都会转换为Callable,只不过提交Runnable任务在Future会返回空或者我们预设的值。

 
   
   
 
  1. public Future<?> submit(Runnable task) {

  2.        if (task == null) throw new NullPointerException();

  3.        RunnableFuture<Void> ftask = newTaskFor(task, null);

  4.        execute(ftask);

  5.        return ftask;

  6.    }

  7.    public <T> Future<T> submit(Runnable task, T result) {

  8.        if (task == null) throw new NullPointerException();

  9.        RunnableFuture<T> ftask = newTaskFor(task, result);

  10.        execute(ftask);

  11.        return ftask;

  12.    }

  13.    public <T> Future<T> submit(Callable<T> task) {

  14.        if (task == null) throw new NullPointerException();

  15.        RunnableFuture<T> ftask = newTaskFor(task);

  16.        execute(ftask);

  17.        return ftask;

  18.    }

在submit方法中,执行的逻辑为:

  1. 通过入参生成一个RunnableFuture对象

  2. 通过RunnableFuture调用execute方法

  3. 把这个RunnableFuture对象返回。

这边我们需要注意的是,对于execute方法来讲,它只关注RunnableFuture的run方法逻辑体,那么监控是如何做的呢,看下面的FutureTask解析。

FutureTask

从类图中可以看到,FutureTask实现了RunnbaleFuture接口,而RunnbaleFuture继承了Runnable和Future接口,FutureTask针对这两个的接口实现,有各自的作用。

我们先看下Future接口定义

 
   
   
 
  1. public interface Future<V> {

  2.    //如果任务还未执行,取消任务执行

  3.    //mayInterruptIfRunning为true,会中断正在执行任务的线程

  4.    boolean cancel(boolean mayInterruptIfRunning);

  5.    //任务是否已经被取消

  6.    boolean isCancelled();

  7.    //任务是否完成

  8.    boolean isDone();

  9.    //获取任务执行结果

  10.    V get() throws InterruptedException, ExecutionException;

  11.    //超时获取任务结果,超过时间,抛出TimeoutException

  12.    V get(long timeout, TimeUnit unit)

  13.        throws InterruptedException, ExecutionException, TimeoutException;

  14. }

Future接口定义了一些监控的方法,讲解FutureTask如何实现Future的接口之前,先了解下FutureTask的生命周期

FutureTask生命周期

FutureTask有7个状态

 
   
   
 
  1.    //初始化

  2.    private static final int NEW          = 0;

  3.    //任务执行中

  4.    private static final int COMPLETING   = 1;

  5.    //正常结束

  6.    private static final int NORMAL       = 2;

  7.    //异常结束

  8.    private static final int EXCEPTIONAL  = 3;

  9.    //取消

  10.    private static final int CANCELLED    = 4;

  11.    //中断取消进行中

  12.    private static final int INTERRUPTING = 5;

  13.    //中断取消完成

  14.    private static final int INTERRUPTED  = 6;

状态转换有以下4种情况

  1. 正常执行 NEW->COMPLETING->NORMAL

  2. 执行失败 NEW->COMPLETING->EXCEPTIONAL

  3. 取消 NEW->CANCELLED

  4. 中断取消 NEW->INTERRUPTING->INTERRUPTED

生命周期状态转换的逻辑都在FutureTask的run方法中

Runnable接口实现

 
   
   
 
  1. public void run() {

  2.        //如果已经被执行过或者被取消了,直接return

  3.        if (state != NEW ||

  4.            !UNSAFE.compareAndSwapObject(this, runnerOffset,

  5.                                         null, Thread.currentThread()))

  6.            return;

  7.        try {

  8.            Callable<V> c = callable;

  9.            if (c != null && state == NEW) {

  10.                V result;

  11.                boolean ran;

  12.                try {

  13.                    //执行call方法

  14.                    result = c.call();

  15.                    ran = true;

  16.                } catch (Throwable ex) {

  17.                    result = null;

  18.                    ran = false;

  19.                    //执行失败,抛出异常,设置异常

  20.                    setException(ex);

  21.                }

  22.                if (ran)

  23.                    //执行成功设置结果

  24.                    set(result);

  25.            }

  26.        } finally {

  27.            runner = null;

  28.            int s = state;

  29.            //对中断的一些处理

  30.            if (s >= INTERRUPTING)

  31.                handlePossibleCancellationInterrupt(s);

  32.        }

  33.    }

逻辑很简单,成功调用set(result)设置结果,发生异常调用 setException(ex);

下面来看下set和setException方法

 
   
   
 
  1. protected void set(V v) {

  2.        //修改状态为COMPLETING

  3.        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

  4.            //设置结果到outcome

  5.            outcome = v;

  6.            //修改状态为NORMAL

  7.            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);

  8.            //唤醒等待的get方法

  9.            finishCompletion();

  10.        }

  11.    }

  12. protected void setException(Throwable t) {

  13.        //修改状态为COMPLETING

  14.        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

  15.             //设置异常结果到outcome

  16.            outcome = t;

  17.            //修改状态为EXCEPTIONAL

  18.            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);

  19.             //唤醒等待的get方法

  20.            finishCompletion();

  21.        }

  22.    }

两个方法的逻辑几乎一样,除了最后的结果和state不同。

在finishCompletion会唤醒等待结果的线程

 
   
   
 
  1. private void finishCompletion() {

  2.        // 遍历waiters进行唤醒

  3.        for (WaitNode q; (q = waiters) != null;) {

  4.            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {

  5.                for (;;) {

  6.                    Thread t = q.thread;

  7.                    if (t != null) {

  8.                        q.thread = null;

  9.                        //通过LockSupport唤醒线程

  10.                        LockSupport.unpark(t);

  11.                    }

  12.                    WaitNode next = q.next;

  13.                    if (next == null)

  14.                        break;

  15.                    q.next = null; // unlink to help gc

  16.                    q = next;

  17.                }

  18.                break;

  19.            }

  20.        }

  21.        done();

  22.        callable = null;        // to reduce footprint

  23.    }

run方法中主要封装了对提交任务的执行以及FutureTask生命周期的转变逻辑,接下来看下Future接口的实现。

也就是在线程池中,工作线程会执行以上逻辑。

Futute接口实现

Future接口的方法调用者,不是工作线程,而是我们自己的业务线程了。

get
 
   
   
 
  1. public V get() throws InterruptedException, ExecutionException {

  2.        int s = state;

  3.        //状态小于COMPLETING的时候代表任务没有完成,需要阻塞等待

  4.        if (s <= COMPLETING)

  5.            //阻塞当前线程

  6.            s = awaitDone(false, 0L);

  7.        return report(s);

  8.    }

  9. private V report(int s) throws ExecutionException {

  10.        Object x = outcome;

  11.        //如果状态为NORMAL,表示调用成功,返回结果

  12.        if (s == NORMAL)

  13.            return (V)x;

  14.        //表示被取消,抛出CancellationException异常

  15.        if (s >= CANCELLED)

  16.            throw new CancellationException();

  17.        //下面的话,代表执行过程中抛出其他异常

  18.        throw new ExecutionException((Throwable)x);

  19.    }

我们需要注意如果任务没有执行完毕,在get方法中会阻塞当前线程进行等待执行结果。当然get也支持超时模式。

 
   
   
 
  1. public V get(long timeout, TimeUnit unit)

  2.        throws InterruptedException, ExecutionException, TimeoutException {

  3.        if (unit == null)

  4.            throw new NullPointerException();

  5.        int s = state;

  6.        //下面通过awaitDone会传入超时时间

  7.        if (s <= COMPLETING &&

  8.            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)

  9.            //如果超时后,状态还是小于COMPLETING,抛出异常

  10.            throw new TimeoutException();

  11.        return report(s);

  12.    }

如果在指定时间内没有返回,抛出TimeoutException异常。

在awaitDone中,会把当前线程阻塞,以链表节点的形式放入waiters中。

cancel

cancel用于取消任务,有普通取消和中断取消两种,通过mayInterruptIfRunning区分。

 
   
   
 
  1. public boolean cancel(boolean mayInterruptIfRunning) {

  2.        //只能在任务开始运行前取消

  3.        //并且只能对NEW状态的线程进行中断

  4.        //如果不进行中断设置为CANCELLED状态,否则,INTERRUPTING状态

  5.        if (!(state == NEW &&

  6.              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,

  7.                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))

  8.            return false;

  9.        try {    // in case call to interrupt throws exception

  10.            //对线程进行中断的逻辑

  11.            if (mayInterruptIfRunning) {

  12.                try {

  13.                    Thread t = runner;

  14.                    if (t != null)

  15.                        t.interrupt();

  16.                } finally { // final state

  17.                    //中断过后,设置为INTERRUPTED状态

  18.                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);

  19.                }

  20.            }

  21.        } finally {

  22.            //唤醒结果的线程

  23.            finishCompletion();

  24.        }

  25.        return true;

  26.    }

对于中断取消,会调用线程的interrupt对线程进行中断。

这里有个知识点,线程中断只对阻塞的线程有效,如果阻塞的线程被中断了,会抛出中断异常。线程对于不阻塞的线程无效。 理论上取消需要在任务未执行前,那么我们也没必要对线程进行中断。但是从以下代码来看,在多线程情况下,即使状态为NEW,你取消了,任务还是有可能被执行。如果这个被执行任务内部有阻塞的逻辑,那么对应工作线程会释放不了,所以需要进行中断。

 
   
   
 
  1. //run方法第一个判断

  2. if (state != NEW ||

  3.            !UNSAFE.compareAndSwapObject(this, runnerOffset,

  4.                                         null, Thread.currentThread()))

  5. //cancel方法的判断

  6. if (!(state == NEW &&

  7.              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,

  8.                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))

  9.            return false;

isCancelled和isDone

这两个方法比较简单,直接使用状态来判断。

 
   
   
 
  1. public boolean isCancelled() {

  2.        return state >= CANCELLED;

  3.    }

  4.    public boolean isDone() {

  5.        return state != NEW;

  6.    }

一些测试

超时测试

 
   
   
 
  1.        Future future = executorService.submit(new Callable<Object>() {

  2.            @Override

  3.            public Object call() throws Exception {

  4.                try {

  5.                    Thread.sleep(2000);

  6.                }catch (Exception ex){

  7.                    ex.printStackTrace();

  8.                }

  9.                return "hello world";

  10.            }

  11.        });

  12.        System.out.println(future.isDone());

  13.        System.out.println(future.get(1000,TimeUnit.MILLISECONDS));

  14.        System.out.println(future.isDone());

会抛出超时异常

 
   
   
 
  1. Exception in thread "main" java.util.concurrent.TimeoutException

中断测试

 
   
   
 
  1. Future future = executorService.submit(new Callable<Object>() {

  2.            @Override

  3.            public Object call() throws Exception {

  4.                try {

  5.                    Thread.sleep(2000);

  6.                }catch (Exception ex){

  7.                    ex.printStackTrace();

  8.                }

  9.                return "hello world";

  10.            }

  11.        });

  12.        Thread.sleep(1000L);

  13.        System.out.println(future.isDone());

  14.        future.cancel(true);

  15.        System.out.println(future.get());

  16.        System.out.println(future.isDone());

返回结果为

 
   
   
 
  1. false

  2. Exception in thread "main" java.lang.InterruptedException: sleep interrupted

  3. java.util.concurrent.CancellationException

抛出了2个异常,一个是中断异常,因为线程在sleep下面阻塞,如果没有阻塞不会有这个异常,第二个是get的时候抛出取消异常。

下面模拟下对于已经执行的任务,取消无效的例子

 
   
   
 
  1. Future future = executorService.submit(new Callable<Object>() {

  2.            @Override

  3.            public Object call() throws Exception {

  4.                int i =0;

  5.                while(i>=0){

  6.                    i++;

  7.                    if(i%1000000==0){

  8.                        System.out.println(i);

  9.                    }

  10.                }

  11.                return "hello world";

  12.            }

  13.        });

  14.        //Thread.sleep(1000L);

  15.        System.out.println(future.isDone());

  16.        System.out.println(future.cancel(true));

  17.        System.out.println(future.get());

  18.        System.out.println(future.isDone());

结果的开始如下

 
   
   
 
  1. false

  2. Exception in thread "main" java.util.concurrent.CancellationException

  3. true

  4. 1000000

  5.    at java.util.concurrent.FutureTask.report(FutureTask.java:121)

  6.    at java.util.concurrent.FutureTask.get(FutureTask.java:192)

  7.    at com.scj.threadpool.BasicUse.main(BasicUse.java:57)

  8. 2000000

  9. 3000000

  10. 4000000

从第二个true看出来,我们cancel是成功了,但是这个线程一直在跑,所以对于这种无线循环的任务我们也是需要注意的,即使取消了,也没有用。只能把整个线程池销毁了吧。

总结

对于ThreadPoolExecutor来讲,FutureTask和Runnable毫无差别。FutureTask对Runnable任务进行了包装,在run方法体整合了Callable的调用,FutureTask状态转换以及调用结果设置的逻辑,同时可以通过Future接口的方法提供监控操作。



以上是关于线程池那些事之Future的主要内容,如果未能解决你的问题,请参考以下文章

线程池那些事之ThreadPoolExecutor

线程池那些事之ScheduledThreadPoolExecutor

C++那些事之高效率开发C++/C

Java线程池(Callable+Future模式)

JUC线程池扩展可回调的Future

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段