FutureTask

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了FutureTask相关的知识,希望对你有一定的参考价值。

参考技术A 一、学习FutureTask的时候,要知道,为什么引入这个概念。

        我们在创建线程的时候,一般都是继承thread类或者实现runnable接口这两种。但是他们都有一个共同的问题就是没法返回线程执行之后的结果(当然你也可以通过共享变量或者使用线程通信的方式来达到效果)。java5之后提供了callable和Future接口,通过他们就可以在任务执行结束后返回执行结果。

二、本文主要就是浅析一下这个三个类Callable、Future、FutureTask。

   (一)Callable:

   Callable 的使用要配合ExecuteServer。

    ExecuteServer的submit方法是一个多态的方法。

    (1)Futuresubmit(Callabletask);

    (2)Future submit(Runnable task, T result);

    (3)Future submit(Runnable task);

    Callable接口:

    public interface Callable<V>

    V call() throws Exception;//这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

    

    (二)Future:

    就是对具体的callable与runnable任务的执行结果进行取消,查询是否完成,通过使用get方法获取线程最终的执行结果,get方法会阻塞直到任务返回结果。

    重点:(敲黑板)

    Future重心是:获取线程执行结果、能够中断任务、判断任务是否执行结束

    Future位于concurrent包下,他是一个接口

    具体定义了如下五个方法:

    public interface Future

    ① boolean cancel(boolean mayInterruptIfRunning);

   ②  boolean isCancelled();

    ③ boolean isDone();

    ④ V get() throws InterruptedException, ExecutionException;

   ⑤  V get(long timeout, TimeUnit unit)

        throws InterruptedException, ExecutionException, TimeoutException;



    cancel: 用来取消任务。

    如果取消任务成功----------------ture、如果取消任务失败-------------false。

    参数mayInterruptIfRunning:标示是否取消正在执行但没有执行完毕的任务。如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。

    isCancelled :

    方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

     isDone :

    方法表示任务是否已经完成,若任务完成,则返回true;

     get() :

    方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;

     get(long timeout, TimeUnit unit) :

    用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

    (三)FutureTask:

    futuretask是实现了runnablefuture接口,而runnablefuture接口是继承了runnable和future两个接口。而futureTask又实现了runnable接口,所以futuretask即可以被线程执行,又可以作为future得到callable的返回值。

    FutureTask提供了2个构造器:public     FutureTask(Callable callable)

    public FutureTask(Runnable runnable, V result)

FutureTask源码解析

简介

FutureTask可用于异步获取执行结果或者取消任务。通过实现Runnable或者Callable接口定义任务给FutureTask,然后直接调用其run()方法或者把任务放入线程池执行。在外部可以通过FutureTask的get()方法异步获取执行结果。因此FutureTask非常适合用于耗时的计算,主线程可以完成自己的任务之后再去获取结果。另外,FutureTask可以确定即使多次调用了run()方法,它只执行一次Runnable或者Callable任务,获取通过cancel取消FutureTask任务。

使用场景

利用FutureTask和ExecutorService,可以用多线程的方式提交计算任务,主线程继续执行其他任务,当主线程需要子线程的计算结果是,再异步获取现成的执行结果
FutureTask在高并发环境下确保任务只执行一次

使用案例

public class FutureTaskTest {
    public static void main(String[] args) {
        FutureTaskTest futureTaskTest = new FutureTaskTest();
        ArrayList<FutureTask<Integer>> taskList = new ArrayList<>();
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            // 传入Callable对象创建FutureTask对象
            FutureTask<Integer> ft = new FutureTask<Integer>(futureTaskTest.new ComputeTask(i, "" + i));
            taskList.add(ft);
            // 提交给线程池执行任务,也可以通过exec.invokeAll(taskList)一次性提交所有任务;
            service.submit(ft);
        }
        System.out.println("所有计算任务提交完毕, 主线程接着干其他事情!");

        // 开始统计各计算线程计算结果
        Integer totalResult = 0;
        for (FutureTask<Integer> ft : taskList) {
            try {
                //FutureTask的get方法会自动阻塞,直到获取计算结果为止
                totalResult = totalResult + ft.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        // 关闭线程池
        service.shutdown();
        System.out.println("多任务计算后的总结果是:" + totalResult);
    }
    private class ComputeTask implements Callable<Integer>{
        private Integer result = 0;
        private String taskName = "";
        public ComputeTask(Integer iniResult, String taskName) {
            result = iniResult;
            this.taskName = taskName;
            System.out.println("生成子线程计算任务: " + taskName);
        }
        @Override
        public Integer call() throws Exception {
            for (int i = 0; i < 100; i++) {
                result = +i;
            }
            // 休眠5秒钟,观察主线程行为,预期的结果是主线程会继续执行,到要取得FutureTask的结果是等待直至完成。
            Thread.sleep(5000);
            System.out.println("子线程计算任务: " + taskName + " 执行完成!");
            return result;
        }
    }
}

在上面的案例中,我们自定义ComputeTask实现了Callable接口,自定义任务内容。并创建了一个定长为5的线程池和一个ArrayList类型的FutureTask集合,然后通过线程池的subbmit方法把自定义任务提交给线程池。
而submit方法如下

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
我们可以看到submit有三个重载方法,案例里用的是传参为Callable的submit方法,最终通过newTaskFor方法创建了一个FutureTask方法提交给了线程池

开启了一个for循环,在for循环里面向线程池提交了10次task任务。
最后遍历list集合里面的FutureTask,通过get()获取执行结果,对所有结果进行累加。
因为我们通过newFixedThreadPool(5)创建了一个定长为5的线程池,最大并发数为5,多余的任务会在队列里面进行等待。所以会先执行五个task任务,然后再去执行队列里面的五个task任务。最后在输出结果的时候中间会有一个短暂的停顿,这个停顿是因为开始提交的五个task任务执行完成,get()方法获取到了执行结果,但是后面的五个还在队列里面没有执行,所以get方法阻塞导致出现停顿。

在很多高并发的环境下,往往我们只需要某些任务只执行一次。这种使用情景FutureTask的特性恰能胜任。

源码解析

数据结构和常量分析

WaitNode 节点用来存储在后面get()方法获取任务执行结果出现阻塞时存放线程使用。

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
state表示当前任务执行状态
private volatile int state;
    // 当前任务尚未执行
    private static final int NEW          = 0;
    // 当前任务正在结束,但是没有完全结束
    private static final int COMPLETING   = 1;
    // 当前任务正常结束
    private static final int NORMAL       = 2;
    // 当前任务执行过程中发生了异常 内部封装的Callable.run()向上抛出异常
    private static final int EXCEPTIONAL  = 3;
    // 当前任务被取消
    private static final int CANCELLED    = 4;
    // 当前任务中断中
    private static final int INTERRUPTING = 5;
    // 当前任务已经中断 中断不是线程停止了。中断只是线程的一个标记,如果submit提交的任务完全没有响应中断
    private static final int INTERRUPTED  = 6;
    
    // submit(runnable/callable) runnable使用装饰者模式伪装成callable
    private Callable<V> callable;
    /**
     * 正常情况下任务正常执行结束,outcome保存执行结果
     * 非正常情况,outcome保存异常*/
    private Object outcome; // non-volatile, protected by state reads/writes
    /**
     * 当前任务被线程执行期间,保存当前执行任务的线程对象引用*/
    private volatile Thread runner;
    /**
     * 因为会有很多线程去get当前任务的结果,这里是用栈保存来多个线程 */
    private volatile WaitNode waiters;

主要方法

run()方法

run()方法是任务执行的入口

 public void run() {
        /**
         * state不为new,说明当前task已经被其他线程处理过,当前线程不处理
         * 通过CAS把task的runner设置为当前线程
         */
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        // 执行到这里,当前task一定是NEW状态,而且当前线程抢占task成功
        try {
            // callable就是程序员自己封装的逻辑,就是上面案例中自定义的ComputeTask类
            Callable<V> c = callable;
            // 判断防止提交task为null,外部线程把当前task给cancel掉
            if (c != null && state == NEW) {
                V result;
                // 任务执行状态
                boolean ran;
                try {
                // 执行自定义任务内容,就是上面ComputeTask中的for循环对result累加操作
                    result = c.call();
                // 设置task任务执行状态为true,表示当前任务已经执行完成,但是FutureTask不是执行完成的状态
                    ran = true;
                } catch (Throwable ex) {
                // 如果执行自定义任务过程中出现异常,就把result设置为null,同时把FutureTask返回结果设置为执行过程中抛出的异常
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // 设置当前执行线程为nul
            runner = null;
            // 获取当前FutureTask状态
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
get()方法

get方法就是外部线程获取task任务执行结果的方法。如果当前任务状态为COMPLETING和NEW,则把当前线程封装成一个waitNode节点进入队列等待,否则就通过report方法获取执行结果或者执行结果中的异常,变量outcome就是保存任务执行结果或者执行过程中对外抛出的异常。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 当前task未执行或者正在结束,调用get方法的外部线程会被阻塞
    if (s <= COMPLETING)

        s = awaitDone(false, 0L);
    return report(s);
}
awaitDone() 方法

首先把waitNode置为null,然后进入for循环的自旋
CASE 1:
第一次自旋,因为当前waitNode,且当前外部线程第一次获取执行结果,没有属于自己的node节点,所以命中else if (q == null)这个判断条件,创建属于当前外部线程的node节点。
CASE 2:
因为当前线程有了自己的node节点,如果当前任务还是没有完成,就会命中else if (!queued)这个判断条件,通过CAS以头插法的形式把当前线程的node节点进入等待队列。入队成功,queued,开始下轮自旋,查看任务状态,如果任务状态不为NEW和COMPLETING,则返回任务执行结果或者异常。如果入队失败,仍然获取最新的任务状态,不是NEW和COMPLETING返回结果或者异常,如果是NEW和COMPLETING则开始新一次的CAS把当前线程的node节点入队。如果入队成功并且当前任务还是NEW和COMPLETING,则当前线程会被park,知道被唤醒或者被中断。

 private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        /**
         *因为get方法中,传入的timed为false,如果这个地方deadline为0
         * 
         */
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        //因为当前获取结果的线程第一次进入,所以线程对应的waitNode节点为null
        WaitNode q = null;
        // 是否入栈标志
        boolean queued = false;
        // 自旋
        for (;;) {
            //如果当前线程是被中断唤醒,interrupted()会返回true,同时清除中断标记,后面interrupted()返回的都是false
            if (Thread.interrupted()) {
                // 栈顶元素出栈
                removeWaiter(q);
                // 向外抛出中断异常
                throw new InterruptedException();
            }
            //获取当前任务的最新状态
            int s = state;
            // 当前任务已经处理并有结果,可能是正常结果,也可能是异常结果
            if (s > COMPLETING) {
                // 说明当前线程已经创建了node对象,此时将q.thread设置为null,帮助GC
                if (q != null)
                    q.thread = null;
                // 返回执行结果
                return s;
            }
            // 说明当前任务已经进入完成中,让出CPU执行权,开始下一轮的抢占
            else if (s == COMPLETING) // cannot time out yet
            /**
             * java通过start启动线程,让线程变成就绪状态等待CPU调度执行,而yield()方法就是让线程由执行状态变成就绪状态,让出CPU时间片
             * 在下一个线程执行的时候,当前线程可能会被执行,也可能不会被执行
             */
                Thread.yield();
            // 第一次自旋,因为前面q已经为null,所以这里为当前线程创建一个waitNode
            else if (q == null)
                q = new WaitNode();
            // 第二次自旋,当前线程已经有了WaitNode对象,但是node对象还没有入队
            else if (!queued)
            /**
             * q.next = waiters把当前节点的next节点设置为原来的头结点waiter,然后把waiter指向q
             * waiter一直指向队列头结点,把当前线程node节点的next指向队列头结点,
             * 听过CAS设置waiter指向当前线程node,设置失败可能其他线程抢先设置成功
             */
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;9
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
            /**
             * 当前线程就会被park,变为WAITING
             * 除非有其他线程将你唤醒或者当前线程被中断
             */
                LockSupport.park(this);
        }
    }

这里,FutureTask的主要方法的源码基本解析完成,我们可以看到,在FutrueTask里面,在并发多线程的场景下,更改值都是通过unsafe类的CAS方式实现,并且每次操作之前都会获取最新的state值来获取当前任务的状态,因为在并发场景下,每次更改之前不获取最新值就直接进行操作的话就可能覆盖掉最新的执行结果。这个思想我们在以后的高并发场景下可以采用。

以上是关于FutureTask的主要内容,如果未能解决你的问题,请参考以下文章

并发容器J.U.C --组件FutureTaskForkJoinBlockingQueue

带你认识3个J.U.C组件扩展

带你认识3个J.U.C组件扩展

一文带你了解J.U.C的FutureTaskFork/Join框架和BlockingQueue

一文带你了解J.U.C的FutureTaskFork/Join框架和BlockingQueue