解锁Future新姿势:CompletableFuture初探
Posted 沛沛老爹
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了解锁Future新姿势:CompletableFuture初探相关的知识,希望对你有一定的参考价值。
CompletableFuture作用
官方解释
CompletableFuture可以显式完成(设置其值和状态),并且可以用作 aCompletionStage,支持在完成时触发的相关功能和动作。
当两个或多个线程尝试 complete、 completeExceptionally或 cancel CompletableFuture 时,只有其中一个成功。
除了这些以及直接操作状态和结果的相关方法之外,CompletableFuture 还实现CompletionStage了与以下策略的接口:
- 为非异步方法的依赖完成提供的操作 可以由完成当前 CompletableFuture 的线程执行,或者由完成方法的任何其他调用者执行。
- 所有没有显式 Executor 参数的异步方法都使用 来执行ForkJoinPool.commonPool() (除非它不支持至少两个并行级别,在这种情况下,会创建一个新线程来运行每个任务)。为了简化监控、调试和跟踪,所有生成的异步任务都是标记接口的实例CompletableFuture.AsynchronousCompletionTask。
- 所有 CompletionStage 方法都是独立于其他公共方法实现的,因此一种方法的行为不会受到子类中其他方法的覆盖的影响。
CompletableFuture 还
使用了Future以下策略实现:
- 由于(不同于FutureTask)此类无法直接控制导致其完成的计算,因此取消被视为另一种形式的异常完成。方法cancel与 的效果相同
completeExceptionally(new CancellationException())
。方法 isCompletedExceptionally()可用于确定 CompletableFuture 是否以任何异常方式完成。 - 如果出现CompletionException完成异常,方法get()并get(long, TimeUnit)抛出 ExecutionException 与对应的 CompletionException 中相同的原因。为了在大多数情况下简化使用,此类还定义了方法join(),并且 getNow(T)在这些情况下直接抛出CompletionException。
简单一句话来讲,就是它拥有Future的所有姿势,并且还有自己的绝活。
源码
先简单扫描下源码。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
...
/* ------------- public methods -------------- */
public CompletableFuture()
CompletableFuture(Object r)
this.result = r;
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
return asyncSupplyStage(ASYNC_POOL, supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor)
return asyncSupplyStage(screenExecutor(executor), supplier);
public static CompletableFuture<Void> runAsync(Runnable runnable)
return asyncRunStage(ASYNC_POOL, runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor)
return asyncRunStage(screenExecutor(executor), runnable);
public static <U> CompletableFuture<U> completedFuture(U value)
return new CompletableFuture<U>((value == null) ? NIL : value);
public boolean isDone()
return result != null;
@SuppressWarnings("unchecked")
public T get() throws InterruptedException, ExecutionException
Object r;
if ((r = result) == null)
r = waitingGet(true);
return (T) reportGet(r);
@SuppressWarnings("unchecked")
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException
long nanos = unit.toNanos(timeout);
Object r;
if ((r = result) == null)
r = timedGet(nanos);
return (T) reportGet(r);
@SuppressWarnings("unchecked")
public T join()
Object r;
if ((r = result) == null)
r = waitingGet(false);
return (T) reportJoin(r);
@SuppressWarnings("unchecked")
public T getNow(T valueIfAbsent)
Object r;
return ((r = result) == null) ? valueIfAbsent : (T) reportJoin(r);
public boolean complete(T value)
boolean triggered = completeValue(value);
postComplete();
return triggered;
public boolean completeExceptionally(Throwable ex)
if (ex == null) throw new NullPointerException();
boolean triggered = internalComplete(new AltResult(ex));
postComplete();
return triggered;
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn)
return uniApplyStage(null, fn);
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn)
return uniApplyStage(defaultExecutor(), fn);
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor)
return uniApplyStage(screenExecutor(executor), fn);
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
return uniAcceptStage(null, action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
return uniAcceptStage(defaultExecutor(), action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor)
return uniAcceptStage(screenExecutor(executor), action);
public CompletableFuture<Void> thenRun(Runnable action)
return uniRunStage(null, action);
public CompletableFuture<Void> thenRunAsync(Runnable action)
return uniRunStage(defaultExecutor(), action);
public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor)
return uniRunStage(screenExecutor(executor), action);
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
return biApplyStage(null, other, fn);
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
return biApplyStage(defaultExecutor(), other, fn);
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor)
return biApplyStage(screenExecutor(executor), other, fn);
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)
return biAcceptStage(null, other, action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)
return biAcceptStage(defaultExecutor(), other, action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor)
return biAcceptStage(screenExecutor(executor), other, action);
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action)
return biRunStage(null, other, action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action)
return biRunStage(defaultExecutor(), other, action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action,
Executor executor)
return biRunStage(screenExecutor(executor), other, action);
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn)
return orApplyStage(null, other, fn);
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn)
return orApplyStage(defaultExecutor(), other, fn);
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor)
return orApplyStage(screenExecutor(executor), other, fn);
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action)
return orAcceptStage(null, other, action);
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action)
return orAcceptStage(defaultExecutor(), other, action);
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor)
return orAcceptStage(screenExecutor(executor), other, action);
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
Runnable action)
return orRunStage(null, other, action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action)
return orRunStage(defaultExecutor(), other, action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action,
Executor executor)
return orRunStage(screenExecutor(executor), other, action);
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn)
return uniComposeStage(null, fn);
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn)
return uniComposeStage(defaultExecutor(), fn);
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor)
return uniComposeStage(screenExecutor(executor), fn);
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action)
return uniWhenCompleteStage(null, action);
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action)
return uniWhenCompleteStage(defaultExecutor(), action);
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor)
return uniWhenCompleteStage(screenExecutor(executor), action);
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn)
return uniHandleStage(null, fn);
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn)
return uniHandleStage(defaultExecutor(), fn);
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
return uniHandleStage(screenExecutor(executor), fn);
/**
* Returns this CompletableFuture.
*
* @return this CompletableFuture
*/
public CompletableFuture<T> toCompletableFuture()
return this;
// not in interface CompletionStage
/**
* Returns a new CompletableFuture that is completed when this
* CompletableFuture completes, with the result of the given
* function of the exception triggering this CompletableFuture's
* completion when it completes exceptionally; otherwise, if this
* CompletableFuture completes normally, then the returned
* CompletableFuture also completes normally with the same value.
* Note: More flexible versions of this functionality are
* available using methods @code whenComplete and @code handle.
*
* @param fn the function to use to compute the value of the
* returned CompletableFuture if this CompletableFuture completed
* exceptionally
* @return the new CompletableFuture
*/
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn)
return uniExceptionallyStage(fn);
/* ------------- Arbitrary-arity constructions -------------- */
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
return andTree(cfs, 0, cfs.length - 1);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
int n; Object r;
if ((n = cfs.length) <= 1)
return (n == 0)
? new CompletableFuture<Object>()
: uniCopyStage(cfs[0]);
for (CompletableFuture<?> cf : cfs)
if ((r = cf.result) != null)
return new CompletableFuture<Object>(encodeRelay(r));
cfs = cfs.clone();
CompletableFuture<Object> d = new CompletableFuture<>();
for (CompletableFuture<?> cf : cfs)
cf.unipush(new AnyOf(d, cf, cfs));
if (d.result != null)
for (int i = 0, len = cfs.length; i < len; i++)
if (cfs[i].result != null)
for (i++; i < len; i++)
if (cfs[i].result == null)
cfs[i].cleanStack();
return d;
/* ------------- Control and status methods -------------- */
。。。
我们可以看到它实现了Future。所以一般的Future的姿势,它都有。
使用场景
一般情况下,使用Future都和计算类的有关。
1、聚合/汇总计算,多线程计算完成后,把所有计算的结果一起相加后输出。
2、异步编程。不需要等待当前操作结果。例如日志输入,唤起第三方(不对结果进行负责)
常用方法
在这里我们看下公共方法,其他的方法例如状态控制相关方法,因为用的比较少,暂不考虑。
如果有兴趣的话,可以看对应的方法。
get
获取返回结果用的。这个方法是最基本的方法。
因为所有的CompletableFuture 的方法返回的是CompletableFuture<U> 对象,如果你想要获得对应的U对象值,就必须使用get方法,和使用Stream流中的get原理是一样的。
runAsync
异步操作运行完成后,CompletableFuture 没有返回值。因为它的返回值是void。
如果你要想获得对应的返回参数信息,就需要使用supplyAsync
supplyAsync
带返回值的异步操作。
例如,您需要获取当前用户的消息数。
CompletableFuture<Integer> futureMessageNum = CompletableFuture.supplyAsync(() ->
getMessageNumByUId(uid)
);
因为这里只有一行代码,所以直接省略了和return。
你也可以这样写
CompletableFuture<Integer> futureMessageNum = CompletableFuture.supplyAsync(() ->
return getMessageNumByUId(uid);
);
allOf
这个方法就是用来聚合的,所有的异步操作全部完成之后,它才会完成。
所以一般情况下,如果我们希望返回多个异步操作结果的时候,我们都会使用allOf函数来进行聚合。
这个方法没有返回类型。
CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
例如如果有多个异步计算。
//消息数目
CompletableFuture<Integer> futureMessageNum =...
//粉丝数
CompletableFuture<Integer> futureFansNum =...
//积分
CompletableFuture<Integer> futureIntegral=...
//文章数
CompletableFuture<Integer> futureArticleNum =...
我们需要把所有的信息都赋值到对应的用户输出信息对象UserInfo中,然后给到前端调用。
因为走的是异步,我们没有办法保证所有方法同一时间完成。
所以,就需要allOf方法来保证所有异步的全部完成,然后再将异步计算结果赋值给用户对象对应的属性。
CompletableFuture<void> allFuture = CompletableFuture.allOf(futureMessageNum,futureFansNum,futureIntegral,futureArticleNum);
CompletableFuture<void> allFuture = CompletableFuture.allOf(futureMessageNum,futureFansNum,futureIntegral,futureArticleNum);
它必须等到futureMessageNum,futureFansNum,futureIntegral,futureArticleNum全部执行完成。
因为这个特性。我们可以在执行完当前代码后。
然后使用get方法,将对应的值赋值到UserInfo对应的属性。
测试代码
全部测试代码如下
@Data
public class UserInfo
private Integer uid;
private String name;
private Integer messageNum;
private Integer fansNum;
private Integer integral;
private Integer articleNum;
...
...
@Overid
public UserInfo getUserInfoById(Integer id)
UserInfo userInfo = new UserInfo();
//消息数目
CompletableFuture<Integer> futureMessageNum = CompletableFuture.supplyAsync(() ->
getMessageNumByUId(uid)
);
//粉丝数
CompletableFuture<Integer> futureFansNum = CompletableFuture.supplyAsync(() ->
getFansNumByUId(uid)
);
//积分
CompletableFuture<Integer> futureIntegral= CompletableFuture.supplyAsync(() ->
getIntegralByUId(uid)
);
//文章数
CompletableFuture<Integer> futureArticleNum = CompletableFuture.supplyAsync(() ->
getArticleNumByUId(uid)
);
CompletableFuture<void> allFuture= CompletableFuture.allOf(futureMessageNum,futureFansNum,futureIntegral,futureArticleNum);
try //get方法有异常抛出,所以需要try
userInfo.setMessageNum(futureMessageNum.get());
userInfo.setFansNum(futureFansNum.get());
userInfo.setIntegral(futureIntegral.get());
userInfo.setArticlNum(futureArticleNum.get());
。。。
completedFuture
将传入的对象转换为completedFuture对象
join
聚合。如果上面所有的返回都是一样的值,就可以使用join函数将他们放到一起。
这个和ForkJoinTask差不多。
thenApply
接收一个函数作为参数,使用该函数处理上一个CompletableFuture 调用的结果,并返回一个具有处理结果的Future对象。
thenCompose
thenCompose 的参数为一个返回 CompletableFuture 实例的函数,该函数的参数是先前计算步骤的结果。
总结
这个是1.8之后推出来的功能。
1.8之前的版本就不要想了。
最好的办法就是自己代码跑跑看,跑的时候你还能解锁更多的宫那个方法
以上是关于解锁Future新姿势:CompletableFuture初探的主要内容,如果未能解决你的问题,请参考以下文章
Android实战——RxJava2+Retrofit+RxBinding解锁各种新姿势