Java多线程系列--掌握Future,轻松获取异步任务结果

Posted leobert_lan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java多线程系列--掌握Future,轻松获取异步任务结果相关的知识,希望对你有一定的参考价值。

前言

最近因为一些个人原因,未能抽出太多精力更新Java多线程系列,一搁置就是好几个月,先向读者诸君致歉。

在本系列的其他文章中,已经提到过线程之间的相互协作, 通过分工,将程序系统的不同任务进行线程分离,充分利用机器性能、提升特定线程的利用率和程序的体验感。

详见拙作:Java多线程基础–线程生命周期与线程协作详解.

并在线程池相关文章中提到:作为程序构建者,我们更关心线程(组)的特性和它们所执行的任务,并不愿意分心去做线程操作。

详见拙作:Java多线程基础–线程的创建与线程池管理

然而实际开发中,我们同样关心一个任务对程序系统产生的影响,习惯上称之为任务的的执行结果

Runnable的局限性

在前文中我们谈到,通过编码实现Runnable接口,将获得具有边界性的 “任务”,在指定的线程(或者线程池)中运行。

重新观察该接口,不难发现它并没有方法返回值:

public interface Runnable 
    void run();

在JDK1.5之前,想利用任务的执行结果,需要小心的操作线程访问临界区资源。使用 回调 进行解耦是非常不错的选择。

练手小Demo – 回顾既往文章知识

注意,为了减少篇幅使用了lambda,但jdk1.5之前并不支持lambda

将计算任务分离到其他线程执行,再回到主线程消费结果

我们将计算、IO等耗时任务丢到其他线程,让主线程专注于自身业务,假想它在接受用户输入以及处理反馈,但我们略去这一部分

我们可以设计出类似下面的代码:

虽然它还有很多不合理之处值得优化,但也足以用于演示

class Demo 
    static final Object queueLock = new Object();
    static List<Runnable> mainQueue = new ArrayList<>();
    static boolean running = true;

    static final Runnable FINISH = () -> running = false;

    public static void main(String[] args) 
        synchronized (queueLock) 
            mainQueue.add(Demo::onStart);
        
        while (running) 
            Runnable runnable = null;
            synchronized (queueLock) 
                if (!mainQueue.isEmpty())
                    runnable = mainQueue.remove(0);
            
            if (runnable != null) 
                runnable.run();
            
            Thread.yield();
        
    

    public static void onStart() 
        //...
    

    public static void finish() 
        synchronized (queueLock) 
            mainQueue.clear();
            mainQueue.add(FINISH);
        
    

再模拟一个计算的线程和任务回调:

interface Callback 
    void onResultCalculated(int result);


class CalcThread extends Thread 

    private final Callback callback;

    private final int a;

    private final int b;

    public CalcThread(Callback callback, int a, int b) 
        this.callback = callback;
        this.a = a;
        this.b = b;
    

    @Override
    public void run() 
        super.run();
        try 
            Thread.sleep(10);
         catch (InterruptedException e) 
            e.printStackTrace();
        
        final int result = a + b;
        System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());

        synchronized (queueLock) 
            mainQueue.add(() -> callback.onResultCalculated(result));
        
    

填充一下onStart业务:

class Demo 
    public static void onStart() 
        System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());

        new CalcThread(result -> 
            System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis());
            finish();
        , 200, 300).start();

    

复习:优化为使用Runnable

在前文我们提到,如果业务仅关注任务的执行,并不过于关心线程本身,则可以利用Runnable:

class Demo 
    static class CalcRunnable implements Runnable 

        private final Callback callback;

        private final int a;

        private final int b;

        public CalcRunnable(Callback callback, int a, int b) 
            this.callback = callback;
            this.a = a;
            this.b = b;
        

        @Override
        public void run() 
            try 
                Thread.sleep(10);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            final int result = a + b;
            System.out.println("threadId" + Thread.currentThread().getId() + ",calc result:" + result + ";" + System.currentTimeMillis());

            synchronized (queueLock) 
                mainQueue.add(() -> callback.onResultCalculated(result));
            
        
    

    public static void onStart() 
        System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());

        new Thread(new CalcRunnable(result -> 
            System.out.println("threadId" + Thread.currentThread().getId() + ",onResultCalculated:" + result + ";" + System.currentTimeMillis());
            finish();
        , 200, 300)).start();

    

不难想象出:我们非常需要

  • 让特定线程、特定类型的线程方便地接收任务,回顾本系列文章中的 线程池篇 ,线程池是应运而生
  • 拥有比Synchronize更轻量的机制
  • 拥有更方便的数据结构

至此,我们可以体会到:JDK1.5之前,因为JDK的功能不足,Java程序对于线程的使用 较为粗糙

为异步而生的Future

终于在JDK1.5中,迎来了新特性: Future 以及先前文章中提到的线程池, 时光荏苒,一晃将近20年了

/**
 * 略
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this Future's @code get method
 */
public interface Future<V> 

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;

尽管已经移除了API注释,但仍然能够理解每个API的含义,不多做赘述。

显而易见,为了增加返回值,没有必要用如此复杂的 接口来替代 Runnable。简单思考后可以对返回值的情况进行归纳:

  • 返回Runnable中业务的结果,例如计算、读取资源等
  • 单纯的在Runnable执行完毕后返回一个结果

从业务层上看,仅需要如下接口即可,它增加了返回值、并可以更友好地让使用者处理异常:

作者按:抛开底层实现,仅看业务方编码需要

public interface Callable<V> 
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     * 防盗戳 leobert-lan https://juejin.cn/user/2066737589654327
     */
    V call() throws Exception;

显然,JDK需要提供后向兼容能力:

  • Runnable 不能够丢弃,也不应当丢弃
  • 不能要求使用者完全的重构代码

所以一并提供了适配器,让使用者进行简单的局部重构即可用上新特性

static final class RunnableAdapter<T> implements Callable<T> 
    final Runnable task;
    final T result;

    RunnableAdapter(Runnable task, T result) 
        this.task = task;
        this.result = result;
    

    public T call() 
        task.run();
        return result;
    

而Future恰如其名,它代表了在 “未来” 的一个结果和状态,为了更方便地处理异步而生。

并且内置了 FutureTask,在 FutureTask详解 章节中再行展开。

类图

在JDK1.8的基础上,看一下精简的类图结构:

FutureTask详解

构造函数

public class FutureTask 
    public FutureTask(Callable<V> callable) 
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    

    public FutureTask(Runnable runnable, V result) 
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    

生命周期

public class FutureTask 
    //新建
    private static final int NEW = 0;

    //处理中
    private static final int COMPLETING = 1;

    //正常
    private static final int NORMAL = 2;

    //异常
    private static final int EXCEPTIONAL = 3;

    //已取消
    private static final int CANCELLED = 4;

    //中断中
    private static final int INTERRUPTING = 5;

    //已中断
    private static final int INTERRUPTED = 6;

可能的生命周期转换如下:

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED

JDK中原汁原味的解释如下:

The run state of this task, initially NEW. The run state
transitions to a terminal state only in methods set,
setException, and cancel. During completion, state may take on
transient values of COMPLETING (while outcome is being set) or
INTERRUPTING (only while interrupting the runner to satisfy a
cancel(true)). Transitions from these intermediate to final
states use cheaper ordered/lazy writes because values are unique
and cannot be further modified.

核心方法

本节从以下三块入手阅读源码

  • 状态判断
  • 取消
  • 获取结果

状态判断API的实现非常简单

public class FutureTask 
    public boolean isCancelled() 
        return state >= CANCELLED;
    

    public boolean isDone() 
        return state != NEW;
    

取消:

  1. 当前状态为 NEW 且 CAS修改 state 成功,否则返回取消失败
  2. 如果 mayInterruptIfRunning 则中断在执行的线程并CAS修改state为INTERRUPTED
  3. 调用 finishCompletion
    1. 删除并通知所有等待的线程
    2. 调用done()
    3. 设置callable为null
public class FutureTask 
    public boolean cancel(boolean mayInterruptIfRunning) 
        if (!(state == NEW &&
                UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                        mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) 

            return false;
        

        try     // in case call to interrupt throws exception
            if (mayInterruptIfRunning) 
                try 
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                 finally  // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                
            
         finally 
            finishCompletion();
        
        return true;
    

    private void finishCompletion() 
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null; ) 
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) 
                for (; ; ) 
                    Thread t = q.thread;
                    if (t != null) 
                        q.thread = null;
                        LockSupport.unpark(t);
                    
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                
                break;
            
        

        done();

        callable = null;        // to reduce footprint
    

获取结果: 先判断状态,如果未进入到 COMPLETING(即为NEW状态),则阻塞等待状态改变,返回结果或抛出异常

public class FutureTask 
    public V get() throws InterruptedException, ExecutionException 
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    

    public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException 
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    

    private V report(int s) throws ExecutionException 
        Object x = outcome;
        if (s == NORMAL)
            return (V) x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable) x);
    

如何使用

而使用则非常简单,也非常的朴素。

我们以文中的的例子进行改造:

  1. 沿用原Runnable逻辑
  2. 移除回调,增加 CalcResult
  3. CalcResult 对象作为既定返回结果,Runnable中设置其属性
class Demo 
   static class CalcResult 
      public int result;
   
   public static void onStart() 
      System.out.println("threadId" + Thread.currentThread().getId() + ",onStart," + System.currentTimeMillis());

      final CalcResult calcResult = new CalcResult();
      Future<CalcResult> resultFuture = Executors.newSingleThreadExecutor().submit(() -> 
         try 
            Thread.sleep(10);
          catch (InterruptedException e) 
            e.printStackTrace();
         
         final int result = 200 + 300;
         System.out.println("threadId" + Thread.currentThread().getId() Java多线程系列--掌握Future,轻松获取异步任务结果

Java多线程系列--“JUC线程池”06之 Callable和Future

Java多线程系列--“JUC线程池”06之 Callable和Future

Java程序员必须掌握的线程知识-Callable和Future

Java程序员必须掌握的线程知识-Callable和Future

Java程序员必须掌握的线程知识-Callable和Future