JUC系列Executor框架之FutureTask

Posted 顧棟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC系列Executor框架之FutureTask相关的知识,希望对你有一定的参考价值。

JUC系列Executor框架之FutureTask

JDK 版本1.8

文章目录


可取消的异步计算。 此类提供 Future 的基本实现,包括启动和取消计算、查询计算是否完成以及检索计算结果的方法。 只有在计算完成后才能检索结果; 如果计算尚未完成,get 方法将阻塞。 一旦计算完成,就不能重新开始或取消计算(除非使用 runAndReset 调用计算)。
FutureTask 可用于包装 Callable 或 Runnable 对象。 因为 FutureTask 实现了 Runnable,所以一个 FutureTask 可以提交给一个 Executor 执行。FutureTask 的线程安全由CAS来保证。

使用示例

示例一

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;

public class FutureTaskDemo<T> 

    public static void main(String[] args) throws ExecutionException, InterruptedException 
        ExecutorService executor = Executors.newCachedThreadPool();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task());
        executor.submit(futureTask);
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] result = " + futureTask.get() + ".");
        executor.shutdown();
    

    static class Task implements Callable<Integer> 
        @Override
        public Integer call() throws Exception 
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] is running. ");

            return 0;
        
    

执行结果

[16:13:02--pool-1-thread-1] is running. 
[16:13:02--main] result = 0.
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class FutureTaskDemo 

    public static void main(String[] args) throws ExecutionException, InterruptedException 

        ExecutorService executor = new ThreadPoolExecutor(10
                , 256
                , 0L
                , TimeUnit.MILLISECONDS
                , new ArrayBlockingQueue<>(500)
                , new UserThreadFactory("X")
        );

        FutureTask<Integer> futureTask = new FutureTask<>(new Task(String.valueOf(1)));
        executor.submit(futureTask);
        boolean flag = true;
        while (flag) 
            if (futureTask.isDone()) 
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(System.currentTimeMillis()) + "--" + Thread.currentThread().getName() + "] result = " + futureTask.get() + ".");
                flag = false;
            
        
        executor.shutdown();
    

    static class Task implements Callable<Integer> 
        private final String name;

        Task(String name) 
            this.name = name;
        

        @Override
        public Integer call() throws Exception 
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] task:" + name + " is running.");
            try 
                Thread.sleep(2000);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] task:" + name + " is completed.");
            return 2;
        
    

    public static class UserThreadFactory implements ThreadFactory 
        private final String namePrefix;
        private final AtomicInteger nextId = new AtomicInteger(1);

        UserThreadFactory(String whatFeatureOfGroup) 
            this.namePrefix = "From UserThreadFactory's " + whatFeatureOfGroup + "-Worker-";
        

        @Override
        public Thread newThread(Runnable task) 
            String name = namePrefix + nextId.getAndIncrement();
            return new Thread(null, task, name, 0);
        
    

执行结果

[19:18:05:005--From UserThreadFactory's X-Worker-1] task:1 is running.
[19:18:07:007--From UserThreadFactory's X-Worker-1] task:1 is completed.
[19:18:07:007--main] result = 2.

示例二

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;

public class FutureTaskDemo<T> 

    public static void main(String[] args) throws ExecutionException, InterruptedException 
        FutureTask<Integer> futureTask2 = new FutureTask<>(new Task());
        Thread thread = new Thread(futureTask2);
        thread.setName("Task thread");
        thread.start();
        try 
            Thread.sleep(1000);
         catch (InterruptedException e) 
            e.printStackTrace();
        

        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] is running. ");

        if (!futureTask2.isDone()) 
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] Task is not done. ");
            try 
                Thread.sleep(1000);
             catch (InterruptedException e) 
                e.printStackTrace();
            
         else 
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] Task is done. ");
        
        int result = 0;
        try 
            result = futureTask2.get();
         catch (Exception e) 
            e.printStackTrace();
        

        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] result = " + result + ".");
    

    static class Task implements Callable<Integer> 
        @Override
        public Integer call() throws Exception 
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] is running. ");
            try 
                Thread.sleep(3000);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            return 2;
        
    

执行结果

[16:19:30--Task thread] is running. 
[16:19:30--main] is running. 
[16:19:30--main] Task is done. 
[16:19:30--main] result = 2.

源码分析

类图

Callable接口

Callable是个泛型接口,泛型V就是要call()方法返回的类型。对比Runnable接口,Runnable不会返回数据也不能抛出异常。

@FunctionalInterface
public interface Runnable 
    public abstract void run();

Future接口

Future接口代表异步计算的结果,通过Future接口提供的方法可以查看异步计算是否执行完成,或者等待执行结果并获取执行结果,同时还可以取消执行。Future接口的定义如下:

public interface Future<V> 
	/** 
     * cancel()方法用来取消异步任务的执行。
     * 如果异步任务已经完成或者已经被取消,或者由于某些原因不能取消,则会返回false。
     * 如果任务还没有被执行,则会返回true并且异步任务不会被执行。
     * 如果任务已经开始执行了但是还没有执行完成,若mayInterruptIfRunning为true,则会立即中断执行任务的线程并返回true,若mayInterruptIfRunning为false,则会返回true且不会中断任务执行线程。
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * 判断任务是否被取消,如果任务在结束(正常执行结束或者执行异常结束)前被取消则返回true,否则返回false。
     */
    boolean isCancelled();

    /**
     * 判断任务是否已经完成,如果完成则返回true,否则返回false。
     * 需要注意的是:任务执行过程中发生异常、任务被取消也属于任务已完成,也会返回true。
     */
    boolean isDone();

    /**
     * 获取任务执行结果,如果任务还没完成则会阻塞等待直到任务执行完成。
     * 如果任务被取消则会抛出CancellationException异常,
     * 如果任务执行过程发生异常则会抛出ExecutionException异常,
     * 如果阻塞等待过程中被中断则会抛出InterruptedException异常。
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 带超时时间的get()版本,如果阻塞等待过程中超时则会抛出TimeoutException异常。
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

RunnableFuture接口

将此 Future 设置为其计算结果,除非它已被取消。

public interface RunnableFuture<V> extends Runnable, Future<V> 
    void run();

内部类WaitNode

一个单向的链表结构,用来表示排队的线程。

static final class WaitNode 
    // 当前线程
    volatile Thread thread;
    // 下一个等待结点
    volatile WaitNode next;
    WaitNode()  thread = Thread.currentThread(); 

成员变量

    // 内部持有的callable任务,运行完毕后置空
	private Callable<V> callable;
	// 从get()中返回的结果或抛出的异常
    private Object outcome;
	// 执行callable的线程
    private volatile Thread runner;
	// 使用Treiber栈保存等待线程
    private volatile WaitNode waiters;

任务状态

使用一个volatile修饰的int型变量state作为任务状态,只要有任何一个线程修改了这个变量,那么其他所有的线程都会知道最新的值。

	// 任务状态
	private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

NEW:表示是个新的任务或者还没被执行完的任务。这是初始状态。

COMPLETING:任务已经执行完成或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有保存到outcome字段(outcome字段用来保存任务执行结果,如果发生异常,则用来保存异常原因)的时候,状态会从NEW变更到COMPLETING。但是这个状态会时间会比较短,属于中间状态。

NORMAL:任务已经执行完成并且任务执行结果已经保存到outcome字段,状态会从COMPLETING转换到NORMAL。这是一个最终态。

EXCEPTIONAL:任务执行发生异常并且异常原因已经保存到outcome字段中后,状态会从COMPLETING转换到EXCEPTIONAL。这是一个最终态。

CANCELLED:任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了cancel(false)方法取消任务且不中断任务执行线程,这个时候状态会从NEW转化为CANCELLED状态。这是一个最终态。

INTERRUPTING: 任务还没开始执行或者已经执行但是还没有执行完成的时候,用户调用了cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从NEW转化为INTERRUPTING。这是一个中间状态。

INTERRUPTED:调用interrupt()中断任务执行线程之后状态会从INTERRUPTING转换到INTERRUPTED。这是一个最终态。 有一点需要注意的是,所有值大于COMPLETING的状态都表示任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消)。

状态关系图

构造函数

这个构造函数会把传入的Callable变量保存在this.callable字段中,该字段定义为private Callable<V> callable;用来保存底层的调用,在被执行完成以后会指向null,接着会初始化state字段为NEW。

public FutureTask(Callable<V> callable) 
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable

这个构造函数会把传入的Runnable封装成一个Callable对象保存在callable字段中,同时如果任务执行成功的话就会返回传入的result。这种情况下如果不需要返回值的话可以传入一个null。 Executors.callable(runnable, result)方法是通过适配器模式将Runnable转换成了Callable。

public FutureTask(Runnable runnable, V result) 
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable

Executors类下的方法和内部类

public static <T> Callable<T> callable(Runnable task, T result) 
    if (task == JUC系列Executor框架之概览

JUC系列Executor框架之CompletionFuture

JUC系列Executor框架之线程池执行器

Java 并发之 Executor 框架

Java 并发之 Executor 框架

Java并发专题之十juc-locks之线程池框架概述