CompletableFuture 异步超时 和取消
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CompletableFuture 异步超时 和取消相关的知识,希望对你有一定的参考价值。
参考技术A 通常我们都是使用阻塞方法的时候等待超时,比如 Future.get();有一个case,比如一个List<Task>,比如10个,想要整体的任务在2s内完成,否则就超时,
超时的时候,正确的做法最好是立马停止运行中的线程任务;
FutureTask 有一个 canclel(true/false)的方法;
参考: https://www.jianshu.com/p/9fc446c2c1be
Runnning的线程,在执行CPU是无法取消的,只有blocked/new/Runnable的线程,才可以尝试取消;
为什么说尝试,因为调用的是interrupt方法,只会对 sleep,wait,join等方法有效;
会设置interrupted标志位;所以想要一个可以cancell的task,需要更多的设计细节;
从目前来看,competableFuture 底层使用forkjoinPool,cancel方法是无效的;
所以想要interrupt线程,还是需要用futureTask;
另外,使用completableFuture,可以实现异步超时, jdk9,已经有原生的实现,但是在jdk8,需要自己做类似下面的实现,
需要利用applyToEigther的特性;
异步回调CompletableFuture
异步回调CompletableFuture
使用Future
获得异步执行结果时,要么调用阻塞方法get()
,要么轮询看isDone()
是否为true
,这两种方法都不是很好,因为主线程也会被迫等待。
从Java 8开始引入了CompletableFuture
,它针对Future
做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
CompletableFuture
作用类似JavaScript的Promise。
主要功能
静态方法:
-
CompletableFuture.supplyAsync(supplier)
:用指定的supplier创建一个有返回值的异步回调,异步回调的结果是supplier的计算结果。默认使用ForkJoinPool.commonPool()
线程池。 -
CompletableFuture.supplyAsync(supplier,executor)
:同上,不过使用指定线程池。 -
CompletableFuture.runAsync(runnable)
:创建一个没有返回值,即泛型为Void返回值为null的异步回调,其回调函数的入参为null。默认使用ForkJoinPool.commonPool()
线程池。 -
CompletableFuture.runAsync(runnable,executor)
:同上,不过使用指定的线程池。 -
CompletableFuture.completedFuture(U value)
:用给定的值创建一个已经计算完成的异步回调,值为value。 -
CompletableFuture.allOf(completableFutures)
:返回一个新的异步回调,会在所有异步回调计算完成时完成。新completableFuture的值始终为null。如果有一个任务抛异常了,则新回调也会抛异常。如果completableFutures为空数组,则回调直接完成,值为null。 -
CompletableFuture.allOf(completableFutures)
:返回一个新的异步回调,会在任何一个异步回调计算完成时完成。新completableFuture的值为完成的那个异步回调的值。如果完成的那个异步回调抛异常了,则新的异步回调也会抛异常。如果completableFutures为空数组,则返回一个不会完成的completableFuture,该completableFuture的get()方法将一直阻塞。
实例方法:
-
complete(T value)
:在任务未结束时,使用value值作为任务执行的结果,直接结束任务,成功返回true。在任务执行结束之后执行此方法会返回false且不会改变结果。 -
completeExceptionally(Throwable ex)
:在任务未结束时,用给定的异常使任务异常结束,成功返回true,get()直接抛异常。在任务执行结束之后执行此方法会返回false且不会改变结果。
每个实例方法都对应三种实现:xxx(callback)
、xxxAsync(callback)
和xxxAsync(callback,executor)
。以下代码测试了三种方法的区别。这种区别不适用于静态方法。
/**
* 测试 xxx、xxxAsync 和 xxxAsync(callback,executor) 的区别。
* 每次调用方法都会打印执行的线程名。
*/
public class CompletableFutureDemo {
public static void main(String[] args) {
ExecutorService executorService = MyThreadPoolFactory.getThreadPool();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->"supplyAsync:"+Thread.currentThread().getName() );
completableFuture.
thenApply(t-> "thenApply:"+Thread.currentThread().getName() +" "+ t).
thenAccept(t-> System.out.println("thenAccept:"+Thread.currentThread().getName() +" "+ t));
completableFuture.
thenApplyAsync(t-> "thenApplyAsync:"+Thread.currentThread().getName() +" "+ t).
thenAcceptAsync(t-> System.out.println("thenAcceptAsync:"+Thread.currentThread().getName() +" "+ t));
completableFuture.
thenApplyAsync(t-> "thenApplyAsync+pool:"+Thread.currentThread().getName() +" "+ t,executorService).
thenAcceptAsync(t-> System.out.println("thenAcceptAsync+pool:"+Thread.currentThread().getName() +" "+ t),executorService);
}
}
//输出:
//thenAccept:main thenApply:main supplyAsync:ForkJoinPool.commonPool-worker-9
//thenAcceptAsync:ForkJoinPool.commonPool-worker-9 thenApplyAsync:ForkJoinPool.commonPool-worker-9 supplyAsync:ForkJoinPool.commonPool-worker-9
//thenAcceptAsync+pool:P0T1 thenApplyAsync+pool:P0T0 supplyAsync:ForkJoinPool.commonPool-worker-9
由上面的代码可以看出:
xxx(callback)
:用调用方法的线程执行回调函数callback。xxxAsync(callback)
:用CompletableFuture内部的线程池调用回调函数callback。xxxAsync(callback,executor)
:用给定的线程池executor调用回调函数callback。
上面的特点适用于以下方法:除了handle和whenComplete都是在任务正常处理完成没有抛异常时执行。
handle(BiFunction<T,Throwable,U> fn)
:对任务的执行结果进行处理,T为执行结果,Throwable是抛出的异常,U为返回值。whenComplete(BiConsumer<T,Throwable> action)
:当异步回调任务执行完成时,调用BiConsumer函数,并返回新的CompletableFuture。action消费两个参数,第一个参数是任务的执行结果,第二个参数是任务抛出的异常。 thenApply(Function<T,U> fn)
:当任务处理完成后,对结果进行处理,并返回新的CompletableFuture。fn接收一个参数T,返回另一个参数U,两个参数的类型需要通过泛型指定。thenAccept(Consumer<T> action)
:当任务处理完成后,对结果进行处理,并返回新的CompletableFuture。 thenAcceptBoth(CompletionStage<U> other,BiConsumer<T,U> action)
: 同时处理两个任务的执行结果,并返回新的CompletableFuture。 thenCombine(CompletionStage<U> other,BiFunction<T,U, V> fn)
:相当于两个completableFuture版的thenApply(Function<T,U> fn)
,会返回新的CompletableFuturethenCompose(Function<T,CompletionStage<U>> fn)
:用当前任务的执行结果,构成一个新的completableFuture。相当于thenApply(Function<T,CompletionStage<U>> fn)
thenRun(Runnable action)
:当前任务执行完成之后执行action。acceptEither(CompletionStage<T> other, Consumer<T> action)
:消费掉任何一个先完成的任务的结果。applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
:处理任何一个先完成的任务的结果。runAfterBoth(CompletionStage<?> other,Runnable action)
:在两个任务都完成之后执行actionrunAfterEither(CompletionStage<?> other,Runnable action)
:在任何一个完成之后执行action。
总结:
- thenxxx和acceptEither、applyToEither、runAfterBoth、runAfterEither都只在相关任务正常结束而非异常结束时执行回调。
- 以上提到的这些方法除了complete和completeExceptional,都是返回新的CompletableFuture实例,因此都可以链式调用。
以上是关于CompletableFuture 异步超时 和取消的主要内容,如果未能解决你的问题,请参考以下文章
如何将异步CompletableFuture与完成的CompletableFuture结合起来?