解锁Future新姿势:CompletableFuture初探

Posted 沛沛老爹

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了解锁Future新姿势:CompletableFuture初探相关的知识,希望对你有一定的参考价值。

CompletableFuture作用

官方解释

CompletableFuture可以显式完成(设置其值和状态),并且可以用作 aCompletionStage,支持在完成时触发的相关功能和动作。

当两个或多个线程尝试 complete、 completeExceptionally或 cancel CompletableFuture 时,只有其中一个成功。

除了这些以及直接操作状态和结果的相关方法之外,CompletableFuture 还实现CompletionStage了与以下策略的接口:

  • 为非异步方法的依赖完成提供的操作 可以由完成当前 CompletableFuture 的线程执行,或者由完成方法的任何其他调用者执行。
  • 所有没有显式 Executor 参数的异步方法都使用 来执行ForkJoinPool.commonPool() (除非它不支持至少两个并行级别,在这种情况下,会创建一个新线程来运行每个任务)。为了简化监控、调试和跟踪,所有生成的异步任务都是标记接口的实例CompletableFuture.AsynchronousCompletionTask
  • 所有 CompletionStage 方法都是独立于其他公共方法实现的,因此一种方法的行为不会受到子类中其他方法的覆盖的影响。

CompletableFuture 还
使用了
Future以下策略实现:

  • 由于(不同于FutureTask)此类无法直接控制导致其完成的计算,因此取消被视为另一种形式的异常完成。方法cancel与 的效果相同 completeExceptionally(new CancellationException())。方法 isCompletedExceptionally()可用于确定 CompletableFuture 是否以任何异常方式完成。
  • 如果出现CompletionException完成异常,方法get()get(long, TimeUnit)抛出 ExecutionException 与对应的 CompletionException 中相同的原因。为了在大多数情况下简化使用,此类还定义了方法join(),并且 getNow(T)在这些情况下直接抛出CompletionException。

简单一句话来讲,就是它拥有Future的所有姿势,并且还有自己的绝活。

源码

先简单扫描下源码。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> 

...
 /* ------------- public methods -------------- */

   
    public CompletableFuture() 
    

    
    CompletableFuture(Object r) 
        this.result = r;
    

   
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 
        return asyncSupplyStage(ASYNC_POOL, supplier);
    

   
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) 
        return asyncSupplyStage(screenExecutor(executor), supplier);
    

  
    public static CompletableFuture<Void> runAsync(Runnable runnable) 
        return asyncRunStage(ASYNC_POOL, runnable);
    

   
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) 
        return asyncRunStage(screenExecutor(executor), runnable);
    

  
    public static <U> CompletableFuture<U> completedFuture(U value) 
        return new CompletableFuture<U>((value == null) ? NIL : value);
    

    public boolean isDone() 
        return result != null;
    

    @SuppressWarnings("unchecked")
    public T get() throws InterruptedException, ExecutionException 
        Object r;
        if ((r = result) == null)
            r = waitingGet(true);
        return (T) reportGet(r);
    

    @SuppressWarnings("unchecked")
    public T get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException 
        long nanos = unit.toNanos(timeout);
        Object r;
        if ((r = result) == null)
            r = timedGet(nanos);
        return (T) reportGet(r);
    

    
    @SuppressWarnings("unchecked")
    public T join() 
        Object r;
        if ((r = result) == null)
            r = waitingGet(false);
        return (T) reportJoin(r);
    

    
    @SuppressWarnings("unchecked")
    public T getNow(T valueIfAbsent) 
        Object r;
        return ((r = result) == null) ? valueIfAbsent : (T) reportJoin(r);
    

    public boolean complete(T value) 
        boolean triggered = completeValue(value);
        postComplete();
        return triggered;
    

    public boolean completeExceptionally(Throwable ex) 
        if (ex == null) throw new NullPointerException();
        boolean triggered = internalComplete(new AltResult(ex));
        postComplete();
        return triggered;
    

    public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) 
        return uniApplyStage(null, fn);
    

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) 
        return uniApplyStage(defaultExecutor(), fn);
    

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn, Executor executor) 
        return uniApplyStage(screenExecutor(executor), fn);
    

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) 
        return uniAcceptStage(null, action);
    

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) 
        return uniAcceptStage(defaultExecutor(), action);
    

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                                   Executor executor) 
        return uniAcceptStage(screenExecutor(executor), action);
    

    public CompletableFuture<Void> thenRun(Runnable action) 
        return uniRunStage(null, action);
    

    public CompletableFuture<Void> thenRunAsync(Runnable action) 
        return uniRunStage(defaultExecutor(), action);
    

    public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                Executor executor) 
        return uniRunStage(screenExecutor(executor), action);
    

    public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) 
        return biApplyStage(null, other, fn);
    

    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) 
        return biApplyStage(defaultExecutor(), other, fn);
    

    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) 
        return biApplyStage(screenExecutor(executor), other, fn);
    

    public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) 
        return biAcceptStage(null, other, action);
    

    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) 
        return biAcceptStage(defaultExecutor(), other, action);
    

    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action, Executor executor) 
        return biAcceptStage(screenExecutor(executor), other, action);
    

    public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                                Runnable action) 
        return biRunStage(null, other, action);
    

    public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                     Runnable action) 
        return biRunStage(defaultExecutor(), other, action);
    

    public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                     Runnable action,
                                                     Executor executor) 
        return biRunStage(screenExecutor(executor), other, action);
    

    public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn) 
        return orApplyStage(null, other, fn);
    

    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn) 
        return orApplyStage(defaultExecutor(), other, fn);
    

    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn,
        Executor executor) 
        return orApplyStage(screenExecutor(executor), other, fn);
    

    public CompletableFuture<Void> acceptEither(
        CompletionStage<? extends T> other, Consumer<? super T> action) 
        return orAcceptStage(null, other, action);
    

    public CompletableFuture<Void> acceptEitherAsync(
        CompletionStage<? extends T> other, Consumer<? super T> action) 
        return orAcceptStage(defaultExecutor(), other, action);
    

    public CompletableFuture<Void> acceptEitherAsync(
        CompletionStage<? extends T> other, Consumer<? super T> action,
        Executor executor) 
        return orAcceptStage(screenExecutor(executor), other, action);
    

    public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                                  Runnable action) 
        return orRunStage(null, other, action);
    

    public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                       Runnable action) 
        return orRunStage(defaultExecutor(), other, action);
    

    public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                       Runnable action,
                                                       Executor executor) 
        return orRunStage(screenExecutor(executor), other, action);
    

    public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn) 
        return uniComposeStage(null, fn);
    

    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn) 
        return uniComposeStage(defaultExecutor(), fn);
    

    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn,
        Executor executor) 
        return uniComposeStage(screenExecutor(executor), fn);
    

    public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action) 
        return uniWhenCompleteStage(null, action);
    

    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action) 
        return uniWhenCompleteStage(defaultExecutor(), action);
    

    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action, Executor executor) 
        return uniWhenCompleteStage(screenExecutor(executor), action);
    

    public <U> CompletableFuture<U> handle(
        BiFunction<? super T, Throwable, ? extends U> fn) 
        return uniHandleStage(null, fn);
    

    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn) 
        return uniHandleStage(defaultExecutor(), fn);
    

    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) 
        return uniHandleStage(screenExecutor(executor), fn);
    

    /**
     * Returns this CompletableFuture.
     *
     * @return this CompletableFuture
     */
    public CompletableFuture<T> toCompletableFuture() 
        return this;
    

    // not in interface CompletionStage

    /**
     * Returns a new CompletableFuture that is completed when this
     * CompletableFuture completes, with the result of the given
     * function of the exception triggering this CompletableFuture's
     * completion when it completes exceptionally; otherwise, if this
     * CompletableFuture completes normally, then the returned
     * CompletableFuture also completes normally with the same value.
     * Note: More flexible versions of this functionality are
     * available using methods @code whenComplete and @code handle.
     *
     * @param fn the function to use to compute the value of the
     * returned CompletableFuture if this CompletableFuture completed
     * exceptionally
     * @return the new CompletableFuture
     */
    public CompletableFuture<T> exceptionally(
        Function<Throwable, ? extends T> fn) 
        return uniExceptionallyStage(fn);
    


    /* ------------- Arbitrary-arity constructions -------------- */
    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) 
        return andTree(cfs, 0, cfs.length - 1);
    

   
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 
        int n; Object r;
        if ((n = cfs.length) <= 1)
            return (n == 0)
                ? new CompletableFuture<Object>()
                : uniCopyStage(cfs[0]);
        for (CompletableFuture<?> cf : cfs)
            if ((r = cf.result) != null)
                return new CompletableFuture<Object>(encodeRelay(r));
        cfs = cfs.clone();
        CompletableFuture<Object> d = new CompletableFuture<>();
        for (CompletableFuture<?> cf : cfs)
            cf.unipush(new AnyOf(d, cf, cfs));
        if (d.result != null)
            for (int i = 0, len = cfs.length; i < len; i++)
                if (cfs[i].result != null)
                    for (i++; i < len; i++)
                        if (cfs[i].result == null)
                            cfs[i].cleanStack();
        return d;
    

    /* ------------- Control and status methods -------------- */
。。。

我们可以看到它实现了Future。所以一般的Future的姿势,它都有。

使用场景

一般情况下,使用Future都和计算类的有关。

1、聚合/汇总计算,多线程计算完成后,把所有计算的结果一起相加后输出。

2、异步编程。不需要等待当前操作结果。例如日志输入,唤起第三方(不对结果进行负责)

常用方法

在这里我们看下公共方法,其他的方法例如状态控制相关方法,因为用的比较少,暂不考虑。

如果有兴趣的话,可以看对应的方法。

get

获取返回结果用的。这个方法是最基本的方法。

因为所有的CompletableFuture 的方法返回的是CompletableFuture<U> 对象,如果你想要获得对应的U对象值,就必须使用get方法,和使用Stream流中的get原理是一样的。

runAsync

异步操作运行完成后,CompletableFuture 没有返回值。因为它的返回值是void。

如果你要想获得对应的返回参数信息,就需要使用supplyAsync

supplyAsync

带返回值的异步操作。

例如,您需要获取当前用户的消息数。

  CompletableFuture<Integer> futureMessageNum = CompletableFuture.supplyAsync(() -> 
    getMessageNumByUId(uid)
);

因为这里只有一行代码,所以直接省略了和return。

你也可以这样写


CompletableFuture<Integer> futureMessageNum = CompletableFuture.supplyAsync(() -> 
        return getMessageNumByUId(uid);
    
);

allOf

这个方法就是用来聚合的,所有的异步操作全部完成之后,它才会完成。

所以一般情况下,如果我们希望返回多个异步操作结果的时候,我们都会使用allOf函数来进行聚合。

这个方法没有返回类型。

CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) 

例如如果有多个异步计算。

//消息数目

CompletableFuture<Integer> futureMessageNum =...

//粉丝数

CompletableFuture<Integer> futureFansNum =...

//积分

CompletableFuture<Integer> futureIntegral=...

//文章数

CompletableFuture<Integer> futureArticleNum =...

我们需要把所有的信息都赋值到对应的用户输出信息对象UserInfo中,然后给到前端调用。

因为走的是异步,我们没有办法保证所有方法同一时间完成。

所以,就需要allOf方法来保证所有异步的全部完成,然后再将异步计算结果赋值给用户对象对应的属性。

CompletableFuture<void> allFuture = CompletableFuture.allOf(futureMessageNum,futureFansNum,futureIntegral,futureArticleNum);

CompletableFuture<void> allFuture = CompletableFuture.allOf(futureMessageNum,futureFansNum,futureIntegral,futureArticleNum);

它必须等到futureMessageNum,futureFansNum,futureIntegral,futureArticleNum全部执行完成。

因为这个特性。我们可以在执行完当前代码后。

然后使用get方法,将对应的值赋值到UserInfo对应的属性。

测试代码

全部测试代码如下

@Data
public class UserInfo

    private Integer uid;

    private String name;

    private Integer messageNum;

    private Integer fansNum;

    private Integer integral;

    private Integer articleNum;
...

...

@Overid
public UserInfo getUserInfoById(Integer id)

    UserInfo userInfo = new UserInfo();
    
    //消息数目

    CompletableFuture<Integer> futureMessageNum = CompletableFuture.supplyAsync(() -> 
    getMessageNumByUId(uid)
);


    //粉丝数

    CompletableFuture<Integer> futureFansNum = CompletableFuture.supplyAsync(() -> 
    getFansNumByUId(uid)
);


    //积分

    CompletableFuture<Integer> futureIntegral= CompletableFuture.supplyAsync(() -> 
    getIntegralByUId(uid)
);

    //文章数

    CompletableFuture<Integer> futureArticleNum = CompletableFuture.supplyAsync(() -> 
    getArticleNumByUId(uid)
);

    CompletableFuture<void> allFuture= CompletableFuture.allOf(futureMessageNum,futureFansNum,futureIntegral,futureArticleNum);
    
    try //get方法有异常抛出,所以需要try
        userInfo.setMessageNum(futureMessageNum.get());
        userInfo.setFansNum(futureFansNum.get());
        userInfo.setIntegral(futureIntegral.get());
        userInfo.setArticlNum(futureArticleNum.get());
    。。。

completedFuture

将传入的对象转换为completedFuture对象

join

聚合。如果上面所有的返回都是一样的值,就可以使用join函数将他们放到一起。

这个和ForkJoinTask差不多。

thenApply

接收一个函数作为参数,使用该函数处理上一个CompletableFuture 调用的结果,并返回一个具有处理结果的Future对象。

thenCompose

thenCompose 的参数为一个返回 CompletableFuture 实例的函数,该函数的参数是先前计算步骤的结果。

总结

这个是1.8之后推出来的功能。

1.8之前的版本就不要想了。

最好的办法就是自己代码跑跑看,跑的时候你还能解锁更多的宫那个方法

以上是关于解锁Future新姿势:CompletableFuture初探的主要内容,如果未能解决你的问题,请参考以下文章

Android实战——RxJava2+Retrofit+RxBinding解锁各种新姿势

Cmd打开Java软件——解锁.jar打开新姿势

Cmd打开Java软件——解锁.jar打开新姿势

机器学习解锁验证码识别新姿势 | 咖面44期

肝了两天IntelliJ IDEA 2020,解锁11种新姿势, 真香!!!

eNSP中玩转Python自动化——解锁网工新姿势