CompletableFuture 异步超时 和取消

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CompletableFuture 异步超时 和取消相关的知识,希望对你有一定的参考价值。

参考技术A 通常我们都是使用阻塞方法的时候等待超时,比如 Future.get();
有一个case,比如一个List<Task>,比如10个,想要整体的任务在2s内完成,否则就超时,
超时的时候,正确的做法最好是立马停止运行中的线程任务;
FutureTask 有一个 canclel(true/false)的方法;
参考: https://www.jianshu.com/p/9fc446c2c1be
Runnning的线程,在执行CPU是无法取消的,只有blocked/new/Runnable的线程,才可以尝试取消;
为什么说尝试,因为调用的是interrupt方法,只会对 sleep,wait,join等方法有效;
会设置interrupted标志位;所以想要一个可以cancell的task,需要更多的设计细节;

从目前来看,competableFuture 底层使用forkjoinPool,cancel方法是无效的;
所以想要interrupt线程,还是需要用futureTask;

另外,使用completableFuture,可以实现异步超时, jdk9,已经有原生的实现,但是在jdk8,需要自己做类似下面的实现,
需要利用applyToEigther的特性;

异步回调CompletableFuture

异步回调CompletableFuture

使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。

从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

CompletableFuture作用类似JavaScript的Promise。

主要功能

静态方法:

  • CompletableFuture.supplyAsync(supplier):用指定的supplier创建一个有返回值的异步回调,异步回调的结果是supplier的计算结果。默认使用ForkJoinPool.commonPool() 线程池。

  • CompletableFuture.supplyAsync(supplier,executor):同上,不过使用指定线程池。

  • CompletableFuture.runAsync(runnable):创建一个没有返回值,即泛型为Void返回值为null的异步回调,其回调函数的入参为null。默认使用ForkJoinPool.commonPool()线程池。

  • CompletableFuture.runAsync(runnable,executor):同上,不过使用指定的线程池。

  • CompletableFuture.completedFuture(U value):用给定的值创建一个已经计算完成的异步回调,值为value。

  • CompletableFuture.allOf(completableFutures):返回一个新的异步回调,会在所有异步回调计算完成时完成。新completableFuture的值始终为null。如果有一个任务抛异常了,则新回调也会抛异常。如果completableFutures为空数组,则回调直接完成,值为null。

  • CompletableFuture.allOf(completableFutures):返回一个新的异步回调,会在任何一个异步回调计算完成时完成。新completableFuture的值为完成的那个异步回调的值。如果完成的那个异步回调抛异常了,则新的异步回调也会抛异常。如果completableFutures为空数组,则返回一个不会完成的completableFuture,该completableFuture的get()方法将一直阻塞。

实例方法:

  • complete(T value):在任务未结束时,使用value值作为任务执行的结果,直接结束任务,成功返回true。在任务执行结束之后执行此方法会返回false且不会改变结果。

  • completeExceptionally(Throwable ex):在任务未结束时,用给定的异常使任务异常结束,成功返回true,get()直接抛异常。在任务执行结束之后执行此方法会返回false且不会改变结果。

每个实例方法都对应三种实现:xxx(callback)xxxAsync(callback)xxxAsync(callback,executor)。以下代码测试了三种方法的区别。这种区别不适用于静态方法。

/**
 * 测试 xxx、xxxAsync 和 xxxAsync(callback,executor) 的区别。
 * 每次调用方法都会打印执行的线程名。
 */
public class CompletableFutureDemo {
    public static void main(String[] args) {
        ExecutorService executorService  = MyThreadPoolFactory.getThreadPool();
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->"supplyAsync:"+Thread.currentThread().getName() );
        completableFuture.
                thenApply(t-> "thenApply:"+Thread.currentThread().getName() +"    "+ t).
                thenAccept(t-> System.out.println("thenAccept:"+Thread.currentThread().getName() +"    "+ t));
        completableFuture.
                thenApplyAsync(t-> "thenApplyAsync:"+Thread.currentThread().getName() +"    "+ t).
                thenAcceptAsync(t-> System.out.println("thenAcceptAsync:"+Thread.currentThread().getName() +"    "+ t));
        completableFuture.
                thenApplyAsync(t-> "thenApplyAsync+pool:"+Thread.currentThread().getName() +"    "+ t,executorService).
                thenAcceptAsync(t-> System.out.println("thenAcceptAsync+pool:"+Thread.currentThread().getName() +"    "+ t),executorService);
    }
}
//输出:
//thenAccept:main    thenApply:main    supplyAsync:ForkJoinPool.commonPool-worker-9
//thenAcceptAsync:ForkJoinPool.commonPool-worker-9    thenApplyAsync:ForkJoinPool.commonPool-worker-9    supplyAsync:ForkJoinPool.commonPool-worker-9
//thenAcceptAsync+pool:P0T1    thenApplyAsync+pool:P0T0    supplyAsync:ForkJoinPool.commonPool-worker-9

由上面的代码可以看出:

  • xxx(callback):用调用方法的线程执行回调函数callback。
  • xxxAsync(callback):用CompletableFuture内部的线程池调用回调函数callback。
  • xxxAsync(callback,executor):用给定的线程池executor调用回调函数callback。

上面的特点适用于以下方法:除了handle和whenComplete都是在任务正常处理完成没有抛异常时执行。

  • handle(BiFunction<T,Throwable,U> fn):对任务的执行结果进行处理,T为执行结果,Throwable是抛出的异常,U为返回值。
  • whenComplete(BiConsumer<T,Throwable> action):当异步回调任务执行完成时,调用BiConsumer函数,并返回新的CompletableFuture。action消费两个参数,第一个参数是任务的执行结果,第二个参数是任务抛出的异常。
  • thenApply(Function<T,U> fn):当任务处理完成后,对结果进行处理,并返回新的CompletableFuture。fn接收一个参数T,返回另一个参数U,两个参数的类型需要通过泛型指定。
  • thenAccept(Consumer<T> action):当任务处理完成后,对结果进行处理,并返回新的CompletableFuture
  • thenAcceptBoth(CompletionStage<U> other,BiConsumer<T,U> action): 同时处理两个任务的执行结果,并返回新的CompletableFuture
  • thenCombine(CompletionStage<U> other,BiFunction<T,U, V> fn):相当于两个completableFuture版的thenApply(Function<T,U> fn),会返回新的CompletableFuture
  • thenCompose(Function<T,CompletionStage<U>> fn):用当前任务的执行结果,构成一个新的completableFuture。相当于thenApply(Function<T,CompletionStage<U>> fn)
  • thenRun(Runnable action):当前任务执行完成之后执行action。
  • acceptEither(CompletionStage<T> other, Consumer<T> action):消费掉任何一个先完成的任务的结果。
  • applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn):处理任何一个先完成的任务的结果。
  • runAfterBoth(CompletionStage<?> other,Runnable action):在两个任务都完成之后执行action
  • runAfterEither(CompletionStage<?> other,Runnable action):在任何一个完成之后执行action。

总结:

  • thenxxx和acceptEither、applyToEither、runAfterBoth、runAfterEither都只在相关任务正常结束而非异常结束时执行回调。
  • 以上提到的这些方法除了complete和completeExceptional,都是返回新的CompletableFuture实例,因此都可以链式调用。

以上是关于CompletableFuture 异步超时 和取消的主要内容,如果未能解决你的问题,请参考以下文章

异步回调CompletableFuture

如何将异步CompletableFuture与完成的CompletableFuture结合起来?

理解Java8里面CompletableFuture异步编程

13_CompletableFuture异步回调

13_CompletableFuture异步回调

线程异步编排串行(CompletableFuture)