CompletableFuture入门
Posted 热爱编程的大忽悠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CompletableFuture入门相关的知识,希望对你有一定的参考价值。
CompletableFuture入门
简单介绍
CompletableFuture
同时实现了 Future
和 CompletionStage
接口。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
CompletableFuture
除了提供了更为好用和强大的 Future
特性之外,还提供了函数式编程的能力。
Future
接口有 5 个方法:
boolean cancel(boolean mayInterruptIfRunning)
:尝试取消执行任务。boolean isCancelled()
:判断任务是否被取消。boolean isDone()
: 判断任务是否已经被执行完成。get()
:等待任务执行完成并获取运算结果。get(long timeout, TimeUnit unit)
:多了一个超时时间。
CompletionStage<T>
接口中的方法比较多,CompletableFuture
的函数式能力就是这个接口赋予的。从这个接口的方法参数你就可以发现其大量使用了 Java8 引入的函数式编程。
由于方法众多,所以这里不能一一讲解,下文中我会介绍大部分常见方法的使用。
常见操作
创建 CompletableFuture
常见的创建 CompletableFuture
对象的方法如下:
- 通过 new 关键字。
- 基于
CompletableFuture
自带的静态工厂方法:runAsync()
、supplyAsync()
。
new 关键字
通过 new 关键字创建 CompletableFuture
对象这种使用方式可以看作是将 CompletableFuture
当做 Future
来使用。
我在我的开源项目 guide-rpc-framework 中就是这种方式创建的 CompletableFuture
对象。
下面咱们来看一个简单的案例。
我们通过创建了一个结果值类型为 RpcResponse<Object>
的 CompletableFuture
,你可以把 resultFuture
看作是异步运算结果的载体。
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
假设在未来的某个时刻,我们得到了最终的结果。这时,我们可以调用 complete()
方法为其传入结果,这表示 resultFuture
已经被完成了。
// complete() 方法只能调用一次,后续调用将被忽略。
resultFuture.complete(rpcResponse);
你可以通过 isDone()
方法来检查是否已经完成。
public boolean isDone()
return result != null;
获取异步计算的结果也非常简单,直接调用 get()
方法即可。调用 get()
方法的线程会阻塞直到 CompletableFuture
完成运算。
rpcResponse = completableFuture.get();
如果你已经知道计算的结果的话,可以使用静态方法 completedFuture()
来创建 CompletableFuture
。
CompletableFuture<String> future = CompletableFuture.completedFuture("hello!");
assertEquals("hello!", future.get());
completedFuture()
方法底层调用的是带参数的 new 方法,只不过,这个方法不对外暴露。
public static <U> CompletableFuture<U> completedFuture(U value)
return new CompletableFuture<U>((value == null) ? NIL : value);
静态工厂方法
这两个方法可以帮助我们封装计算逻辑。
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 使用自定义线程池(推荐)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
// 使用自定义线程池(推荐)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
runAsync()
方法接受的参数是 Runnable
,这是一个函数式接口,不允许返回值。当你需要异步操作且不关心返回结果的时候可以使用 runAsync()
方法。
@FunctionalInterface
public interface Runnable
public abstract void run();
supplyAsync()
方法接受的参数是 Supplier<U>
,这也是一个函数式接口,U
是返回结果值的类型。
@FunctionalInterface
public interface Supplier<T>
/**
* Gets a result.
*
* @return a result
*/
T get();
当你需要异步操作且关心返回结果的时候,可以使用 supplyAsync()
方法。
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("hello!"));
future.get();// 输出 "hello!"
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "hello!");
assertEquals("hello!", future2.get());
处理异步结算的结果
当我们获取到异步计算的结果之后,还可以对其进行进一步的处理,比较常用的方法有下面几个:
thenApply()
thenAccept()
thenRun()
whenComplete()
thenApply()
方法接受一个 Function
实例,用它来处理结果。
// 沿用上一个任务的线程池
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn)
return uniApplyStage(null, fn);
//使用默认的 ForkJoinPool 线程池(不推荐)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn)
return uniApplyStage(defaultExecutor(), fn);
// 使用自定义线程池(推荐)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor)
return uniApplyStage(screenExecutor(executor), fn);
thenApply()
方法使用示例如下:
CompletableFuture<String> future = CompletableFuture.completedFuture("hello!")
.thenApply(s -> s + "world!");
assertEquals("hello!world!", future.get());
// 这次调用将被忽略。
future.thenApply(s -> s + "nice!");
assertEquals("hello!world!", future.get());
你还可以进行 流式调用:
CompletableFuture<String> future = CompletableFuture.completedFuture("hello!")
.thenApply(s -> s + "world!").thenApply(s -> s + "nice!");
assertEquals("hello!world!nice!", future.get());
如果你不需要从回调函数中获取返回结果,可以使用 thenAccept()
或者 thenRun()
。这两个方法的区别在于 thenRun()
不能访问异步计算的结果。
thenAccept()
方法的参数是 Consumer<? super T>
。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
return uniAcceptStage(null, action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
return uniAcceptStage(defaultExecutor(), action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor)
return uniAcceptStage(screenExecutor(executor), action);
顾名思义,Consumer
属于消费型接口,它可以接收 1 个输入对象然后进行“消费”。
@FunctionalInterface
public interface Consumer<T>
void accept(T t);
default Consumer<T> andThen(Consumer<? super T> after)
Objects.requireNonNull(after);
return (T t) -> accept(t); after.accept(t); ;
thenRun()
的方法是的参数是 Runnable
。
public CompletableFuture<Void> thenRun(Runnable action)
return uniRunStage(null, action);
public CompletableFuture<Void> thenRunAsync(Runnable action)
return uniRunStage(defaultExecutor(), action);
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor)
return uniRunStage(screenExecutor(executor), action);
thenAccept()
和 thenRun()
使用示例如下:
CompletableFuture.completedFuture("hello!")
.thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenAccept(System.out::println);//hello!world!nice!
CompletableFuture.completedFuture("hello!")
.thenApply(s -> s + "world!").thenApply(s -> s + "nice!").thenRun(() -> System.out.println("hello!"));//hello!
whenComplete()
的方法的参数是 BiConsumer<? super T, ? super Throwable>
。
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action)
return uniWhenCompleteStage(null, action);
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action)
return uniWhenCompleteStage(defaultExecutor(), action);
// 使用自定义线程池(推荐)
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor)
return uniWhenCompleteStage(screenExecutor(executor), action);
相对于 Consumer
, BiConsumer
可以接收 2 个输入对象然后进行“消费”。
@FunctionalInterface
public interface BiConsumer<T, U>
void accept(T t, U u);
default BiConsumer<T, U> andThen(BiConsumer<? super T, ? super U> after)
Objects.requireNonNull(after);
return (l, r) ->
accept(l, r);
after.accept(l, r);
;
whenComplete()
使用示例如下:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello!")
.whenComplete((res, ex) ->
// res 代表返回的结果
// ex 的类型为 Throwable ,代表抛出的异常
System.out.println(res);
// 这里没有抛出异常所有为 null
assertNull(ex);
);
assertEquals("hello!", future.get());
异常处理
你可以通过 handle()
方法来处理任务执行过程中可能出现的抛出异常的情况。
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn)
return uniHandleStage(null, fn);
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn)
return uniHandleStage(defaultExecutor(), fn);
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
return uniHandleStage(screenExecutor(executor), fn);
示例代码如下:
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() ->
if (true)
throw new RuntimeException("Computation error!");
return "hello!";
).handle((res, ex) ->
// res 代表返回的结果
// ex 的类型为 Throwable ,代表抛出的异常
return res != null ? res : "world!";
);
assertEquals("world!", future.get());
你还可以通过 exceptionally()
方法来处理异常情况。
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() ->
if (true)
throw new RuntimeException("Computation error!");
return "hello!";
).exceptionally(ex ->
System.out.println(ex.toString());// CompletionException
return "world!";
);
assertEquals("world!", future.get());
如果你想让 CompletableFuture
的结果就是异常的话,可以使用 completeExceptionally()
方法为其赋值。
CompletableFuture<String> completableFuture = new CompletableFuture<>();
// ...
completableFuture.completeExceptionally(
new RuntimeException("Calculation failed!"));
// ...
completableFuture.get(); // ExecutionException
组合 CompletableFuture
你可以使用 thenCompose()
按顺序链接两个 CompletableFuture
对象。
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn)
return uniComposeStage(null, fn);
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn)
return uniComposeStage(defaultExecutor(), fn);
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? 以上是关于CompletableFuture入门的主要内容,如果未能解决你的问题,请参考以下文章