JUC系列Executor框架之CompletionFuture
Posted 顧棟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC系列Executor框架之CompletionFuture相关的知识,希望对你有一定的参考价值。
JUC系列Executor框架之CompletionFuture
文章目录
需要优先阅读
CompletionStage
可能是异步计算的一个阶段,它在另一个 CompletionStage 完成时执行一个操作或计算一个值。 一个阶段在其计算终止时完成,但这可能反过来触发其他相关阶段。 此接口中定义的功能仅采用几种基本形式,可扩展为更大的方法集以捕获一系列使用风格:
- 阶段执行的计算可以表示为 Function、Consumer 或 Runnable(分别使用名称包括 apply、accept 或 run 的方法),具体取决于它是否需要参数和/或产生结果。 例如 stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())。 另一种形式(compose)应用阶段本身的功能,而不是它们的结果。
- 一个阶段的执行可以由单个阶段的完成触发,也可以由两个阶段的完成触发,或者两个阶段中的任一个触发。 使用带有前缀 then 的方法来排列单个阶段的依赖关系。 由两个阶段完成触发的那些可以使用相应命名的方法组合它们的结果或效果。 由两个阶段中的任何一个触发的那些不保证哪些结果或效果用于从属阶段的计算。
- 阶段之间的依赖关系控制计算的触发,但不保证任何特定的顺序。 此外,新阶段计算的执行可以通过以下三种方式中的任何一种进行安排:默认执行、默认异步执行(使用带有后缀 async 的方法,该方法采用阶段的默认异步执行工具)或自定义(通过提供的 Executor)。 默认和异步模式的执行属性由 CompletionStage 实现指定,而不是此接口。 具有显式 Executor 参数的方法可能具有任意执行属性,甚至可能不支持并发执行,但以适应异步的方式安排处理。
- 两种方法形式支持处理触发阶段是正常完成还是异常完成:方法 whenComplete 允许注入动作而不管结果如何,否则在完成时保留结果。方法句柄还允许阶段计算替换结果,该结果可以允许其他相关阶段进行进一步处理。在所有其他情况下,如果某个阶段的计算因(未经检查的)异常或错误而突然终止,则所有需要其完成的相关阶段也会异常完成,并且 CompletionException 将异常作为其原因。如果一个阶段依赖于两个阶段,并且都异常完成,那么 CompletionException 可能对应于这些异常中的任何一个。如果一个阶段依赖于其他两个阶段中的任何一个,并且其中只有一个异常完成,则无法保证依赖阶段是正常完成还是异常完成。在方法 whenComplete 的情况下,当提供的操作本身遇到异常时,如果尚未异常完成,则阶段异常完成此异常。
所有方法都遵循上述触发、执行和异常完成规范(在单个方法规范中不再重复)。 此外,虽然用于为接受它们的方法传递完成结果(即,对于 T 类型的参数)的参数可能为 null,但为任何其他参数传递 null 值将导致引发 NullPointerException。
该接口未定义初始创建、正常或异常强制完成、探测完成状态或结果或等待阶段完成的方法。 CompletionStage 的实现可以酌情提供实现这种效果的方法。 方法 toCompletableFuture 通过提供一个通用的转换类型来实现此接口的不同实现之间的互操作性。
接口方法如下
方法 | 说明 |
---|---|
CompletionStage thenApply(Function<? super T,? extends U> fn) | 返回一个新的 CompletionStage,当此阶段正常完成时,将使用此阶段的结果作为所提供函数的参数来执行该阶段。 |
CompletionStage thenApplyAsync (Function<? super T,? extends U> fn) | 返回一个新的 CompletionStage,当此阶段正常完成时,将使用此阶段的默认异步执行工具执行此阶段的结果,并将此阶段的结果作为所提供函数的参数。 |
CompletionStage thenApplyAsync (Function<? super T,? extends U> fn, Executor executor) | 返回一个新的 CompletionStage,当此阶段正常完成时,将使用提供的 Executor 执行该阶段,并将此阶段的结果作为提供的函数的参数。 |
CompletionStage thenAccept(Consumer<? super T> action) | 返回一个新的 CompletionStage,当此阶段正常完成时,将使用此阶段的结果作为提供的操作的参数来执行该阶段。 |
CompletionStage thenAcceptAsync(Consumer<? super T> action) | 返回一个新的 CompletionStage,当此阶段正常完成时,将使用此阶段的默认异步执行工具执行该阶段,并将此阶段的结果作为所提供操作的参数。 |
CompletionStage thenAcceptAsync(Consumer<? super T> action, Executor executor) | 返回一个新的 CompletionStage,当此阶段正常完成时,将使用提供的 Executor 执行该阶段,并将此阶段的结果作为提供的操作的参数。 |
CompletionStage thenRun(Runnable action) | 返回一个新的 CompletionStage,当此阶段正常完成时,执行给定的操作。 |
CompletionStage thenRunAsync(Runnable action) | 返回一个新的 CompletionStage,当此阶段正常完成时,使用此阶段的默认异步执行工具执行给定的操作。 |
CompletionStage thenRunAsync(Runnable action, Executor executor) | 返回一个新的 CompletionStage,当此阶段正常完成时,使用提供的 Executor 执行给定的操作。 |
<U,V> CompletionStage thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | 返回一个新的 CompletionStage,当此阶段和另一个给定阶段都正常完成时,将使用两个结果作为所提供函数的参数执行。 |
<U,V> CompletionStage thenCombineAsync (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | 返回一个新的 CompletionStage,当此阶段和另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行该阶段,并将两个结果作为提供函数的参数。 |
<U,V> CompletionStage thenCombineAsync (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) | 返回一个新的 CompletionStage,当这个阶段和另一个给定阶段正常完成时,使用提供的执行器执行,两个结果作为提供的函数的参数。 |
CompletionStage thenAcceptBoth (CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) | 返回一个新的 CompletionStage,当这个阶段和另一个给定阶段都正常完成时,将使用两个结果作为所提供操作的参数来执行。 |
CompletionStage thenAcceptBothAsync (CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) | 返回一个新的 CompletionStage,当此阶段和另一个给定阶段正常完成时,将使用此阶段的默认异步执行工具执行该阶段,并将两个结果作为所提供操作的参数。 |
CompletionStage thenAcceptBothAsync (CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor) | 返回一个新的 CompletionStage,当这个阶段和另一个给定阶段正常完成时,使用提供的执行器执行,两个结果作为提供的函数的参数。 |
CompletionStage runAfterBoth(CompletionStage<?> other, Runnable action) | 返回一个新的 CompletionStage,当这个阶段和另一个给定阶段都正常完成时,执行给定的操作。 |
CompletionStage runAfterBothAsync(CompletionStage<?> other, Runnable action) | 返回一个新的 CompletionStage,当此阶段和另一个给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定操作。 |
CompletionStage runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) | 返回一个新的 CompletionStage,当这个阶段和另一个给定阶段正常完成时,使用提供的执行器执行给定的操作。 |
CompletionStage applyToEither (CompletionStage<? extends T> other, Function<? super T, U> fn) | 返回一个新的 CompletionStage,当这个或另一个给定阶段正常完成时,将使用相应的结果作为提供的函数的参数执行。 |
CompletionStage applyToEitherAsync (CompletionStage<? extends T> other, Function<? super T, U> fn) | 返回一个新的 CompletionStage,当此阶段或其他给定阶段正常完成时,将使用此阶段的默认异步执行工具执行该阶段,并将相应的结果作为所提供函数的参数。 |
CompletionStage applyToEitherAsync (CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor) | 返回一个新的 CompletionStage,当此阶段或其他给定阶段正常完成时,将使用提供的执行程序执行,并将相应的结果作为提供的函数的参数。 |
CompletionStage acceptEither (CompletionStage<? extends T> other, Consumer<? super T> action) | 返回一个新的 CompletionStage,当此阶段或其他给定阶段正常完成时,将使用相应的结果作为提供的操作的参数执行。 |
CompletionStage acceptEitherAsync (CompletionStage<? extends T> other, Consumer<? super T> action) | 返回一个新的 CompletionStage,当此阶段或其他给定阶段正常完成时,将使用此阶段的默认异步执行工具执行该阶段,并将相应的结果作为所提供操作的参数。 |
CompletionStage acceptEitherAsync (CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor) | 返回一个新的 CompletionStage,当此阶段或其他给定阶段正常完成时,将使用提供的执行程序执行,并将相应的结果作为提供的函数的参数。 |
CompletionStage runAfterEither(CompletionStage<?> other, Runnable action) | 返回一个新的 CompletionStage,当此阶段或其他给定阶段正常完成时,将执行给定操作。 |
CompletionStage runAfterEitherAsync (CompletionStage<?> other, Runnable action) | 返回一个新的 CompletionStage,当此阶段或其他给定阶段正常完成时,使用此阶段的默认异步执行工具执行给定操作。 |
CompletionStage runAfterEitherAsync (CompletionStage<?> other, Runnable action, Executor executor) | 返回一个新的 CompletionStage,当此阶段或其他给定阶段正常完成时,使用提供的执行程序执行给定操作。 |
CompletionStage thenCompose (Function<? super T, ? extends CompletionStage> fn) | 返回一个新的 CompletionStage,当此阶段正常完成时,将使用此阶段作为提供函数的参数执行。 |
CompletionStage thenComposeAsync (Function<? super T, ? extends CompletionStage> fn) | 返回一个新的 CompletionStage,当此阶段正常完成时,将使用此阶段的默认异步执行工具执行该阶段,并将此阶段作为所提供函数的参数。 |
CompletionStage thenComposeAsync (Function<? super T, ? extends CompletionStage> fn, Executor executor) | 返回一个新的 CompletionStage,当此阶段正常完成时,将使用提供的 Executor 执行该阶段,并将此阶段的结果作为提供的函数的参数。 |
CompletionStage exceptionally (Function<Throwable, ? extends T> fn) | 返回一个新的 CompletionStage,当此阶段异常完成时,将使用此阶段的异常作为所提供函数的参数来执行该阶段。 否则,如果此阶段正常完成,则返回的阶段也会以相同的值正常完成。 |
CompletionStage whenComplete (BiConsumer<? super T, ? super Throwable> action) | 返回一个与此阶段具有相同结果或异常的新 CompletionStage,它在此阶段完成时执行给定的操作。 当这个阶段完成时,使用这个阶段的结果(如果没有,则为 null)和异常(如果没有,则为 null)作为参数调用给定的操作。 当动作返回时,返回阶段完成。 如果提供的操作本身遇到异常,则返回的阶段异常完成并出现此异常,除非此阶段也异常完成。 |
CompletionStage whenCompleteAsync (BiConsumer<? super T, ? super Throwable> action) | 返回具有与此阶段相同的结果或异常的新 CompletionStage,在此阶段完成时使用此阶段的默认异步执行工具执行给定操作。 当这个阶段完成时,使用这个阶段的结果(如果没有,则为 null)和异常(如果没有,则为 null)作为参数调用给定的操作。 当动作返回时,返回阶段完成。 如果提供的操作本身遇到异常,则返回的阶段异常完成并出现此异常,除非此阶段也异常完成。 |
CompletionStage whenCompleteAsync (BiConsumer<? super T, ? super Throwable> action, Executor executor) | 返回一个新的 CompletionStage,其结果或异常与此阶段相同,当此阶段完成时,它使用提供的 Executor 执行给定的操作。 当这个阶段完成时,使用这个阶段的结果(如果没有,则为 null)和异常(如果没有,则为 null)作为参数调用给定的操作。 当动作返回时,返回阶段完成。 如果提供的操作本身遇到异常,则返回的阶段异常完成并出现此异常,除非此阶段也异常完成。 |
CompletionStage handle (BiFunction<? super T, Throwable, ? extends U> fn) | 返回一个新的 CompletionStage,当此阶段正常或异常完成时,将使用此阶段的结果和异常作为所提供函数的参数来执行该阶段。 当这个阶段完成时,使用这个阶段的结果(如果没有,则为 null)和异常(如果没有,则为 null)作为参数调用给定的函数,并使用函数的结果来完成返回的阶段。 |
CompletionStage handleAsync (BiFunction<? super T, Throwable, ? extends U> fn) | 返回一个新的 CompletionStage,当此阶段正常或异常完成时,将使用此阶段的默认异步执行工具执行该阶段,并将此阶段的结果和异常作为提供函数的参数。 当这个阶段完成时,使用这个阶段的结果(如果没有,则为 null)和异常(如果没有,则为 null)作为参数调用给定的函数,并使用函数的结果来完成返回的阶段。 |
CompletionStage handleAsync (BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) | 返回一个新的 CompletionStage,当此阶段正常或异常完成时,将使用提供的执行程序执行该阶段的结果和异常作为提供函数的参数。 当这个阶段完成时,使用这个阶段的结果(如果没有,则为 null)和异常(如果没有,则为 null)作为参数调用给定的函数,并使用函数的结果来完成返回的阶段。 |
CompletableFuture toCompletableFuture() | 返回一个 CompletableFuture 保持与此阶段相同的完成属性。 如果此阶段已经是 CompletableFuture,则此方法可能会返回此阶段本身。 否则,此方法的调用可能等效于 thenApply(x -> x),但返回 CompletableFuture 类型的实例。 不选择与其他实现互操作的 CompletionStage 实现可能会抛出 UnsupportedOperationException。 |
CompletableFuture
CompletableFuture实现了Future接口,同时CompletableFuture实现了对任务编排的能力。借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。
一个可以显式完成的 Future(设置它的值和状态),并且可以用作 CompletionStage,支持在完成时触发的依赖函数和操作。
当两个或更多线程尝试完成、完成异常或取消 CompletableFuture 时,只有其中一个成功。
除了这些以及直接操作状态和结果的相关方法之外,CompletableFuture 还使用以下策略实现接口 CompletionStage:
- 为非异步方法的依赖完成提供的操作可以由完成当前 CompletableFuture 的线程执行,也可以由完成方法的任何其他调用者执行。
- 所有没有显式 Executor 参数的异步方法都使用 ForkJoinPool.commonPool() 执行(除非它不支持至少两个并行级别,在这种情况下,会创建一个新线程来运行每个任务)。 为了简化监控、调试和跟踪,所有生成的异步任务都是标记接口 CompletableFuture.AsynchronousCompletionTask 的实例。
- 所有 CompletionStage 方法都是独立于其他公共方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖的影响。
CompletableFuture 还使用以下策略实现 Future:
- 因为(与 FutureTask 不同)这个类不能直接控制导致它完成的计算,所以取消被视为另一种形式的异常完成。 方法 cancel 与 completeExceptionally(new CancellationException()) 具有相同的效果。 方法 isCompletedExceptionally 可用于确定 CompletableFuture 是否以任何异常方式完成。
- 如果出现 CompletionException 异常完成,get() 和 get(long, TimeUnit) 方法会抛出 ExecutionException,原因与对应的 CompletionException 中的原因相同。 为了在大多数情况下简化使用,此类还定义了 join() 和 getNow 方法,它们在这些情况下直接抛出 CompletionException。
其实现了Future和CompletionStage。
CompletableFuture组成
成员变量
volatile Object result; // 结果或者异常信息结果
volatile Completion stack; // 完成的结果的堆栈 Treiber stack
// useCommonPool这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数),若线程数大于1,采用useCommonPool作为默认线程池
private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);
// ThreadPerTaskExecutor 默认启动一个线程执行
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
// Completion.tryFire 的模式
static final int SYNC = 0; // 同步
static final int ASYNC = 1; // 异步
static final int NESTED = -1; // 嵌套
主要方法
异步方法
public static CompletableFuture<Void> runAsync(Runnable runnable)
:以Runnable函数为参数,返回一个新的 CompletableFuture,它在运行给定操作后由 ForkJoinPool.commonPool() 中运行的任务异步完成。public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
:以Runnable 函数为参数,返回一个新的 CompletableFuture,它在运行给定操作后由提供的线程池executor运行的任务异步完成。public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
:以Supplier函数为参数,返回一个新的 CompletableFuture,它由在 ForkJoinPool.commonPool() 中运行的任务异步完成,其值通过调用给定的供应商获得。public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
:以Supplier函数为参数,返回一个新的 CompletableFuture,它在运行给定操作后由提供的线程池executor运行的任务异步完成,其值通过调用给定的供应商获得。
依赖关系
thenApply()系列
:把前面任务的执行结果,交给后面的Function
thenRun()系列
:前面的任务执行完,执行后面的操作。thenCompose()系列
:用来连接两个有依赖关系的任务,结果由第二个任务返回thenAccept()系列
:把前面任务的执行结果,交给后面的Consumer
集合关系
thenCombine()系列
:合并任务,有返回值thenAcceptBoth()系列
:两个任务执行完成后,将结果交给thenAccepetBoth
处理,无返回值runAfterBoth()系列
:两个任务都执行完成后,执行下一步操作(Runnable
类型任务)
聚合关系
applyToEither系列
:两个任务哪个执行的快,就使用哪一个结果,有返回值acceptEither系列
:两个任务哪个执行的快,就消费哪一个结果,无返回值runAfterEither系列
:任意一个任务执行完成,进行下一步操作(Runnable
类型任务)
并行执行
allOf()系列
:当所有给定的CompletableFuture完成时,返回一个新的 CompletableFutureanyOf()系列
:当任何一个给定的CompletablFuture
完成时,返回一个新的CompletableFuture
获取结果
get()系列
:如有必要,等待此Future完成,然后返回其结果。join()系列
:完成时返回结果值,如果异常完成则抛出(未经检查的)异常。 为了更好地符合通用函数形式的使用,如果完成此 CompletableFuture 所涉及的计算引发异常,则此方法将引发(未经检查的)CompletionException,并将底层异常作为其原因。
结果处理
whenComplete()系列
:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作exceptionally()系列
:返回一个新的CompletableFuture
,当前面的CompletableFuture
完成时,它也完成,当它异常完成时,给定函数的异常触发这个CompletableFuture
的完成
任务类型的转换
CompletableFuture默认的线程池是ForkJoinPool,它执行的任务是ForkJoinTask类型的,而向ForkJoinTask提交的任务并非是ForkJoinTask,内部是用来众多内部类和方法来实现任务类型的适配。
四种任务原型
CompletableFuture执行的任务共有四个类型,Runnable、Consumer、Supplier、Function。
无参数 | 有参数 | |
---|---|---|
无返回值 | Runnable接口 对应的提交方法:runAsync、thenRun | Consumer接口 对应的提交方法:thenAccept |
有返回值 | Supplier接口 对应的提交方法:supplierAsync | Function接口 对应的提交方法:thenApply |
由于runAsync与supplierAsync是CompletableFuture的静态方法;而thenAccept、thenApply、thenRun是CompletableFuture的成员方法。
初始时没有CompletableFuture对象只能提交Runnable或Supplier任务,在生成CompletableFuture对象之后可以使用其他成员方法提交Runnable,Consumer,Function。
CompletableFuture内部类集成关系
转换方法
在下述方法中会将提交的Runnable、Consumer、Supplier、Function类的的任务,封装到继承了ForkJoinTask的各种子类中,这样就可以提交到ForkJoinPool中执行了。
-
uniApplyStage()
-
uniComposeStage()
-
uniExceptionallyStage()
-
uniHandleStage()
-
uniWhenCompleteStage()
-
uniRunStage()
-
uniAcceptStage()
-
orRunStage()
-
orAcceptStage()
-
orApplyStage()
-
biRunStage()
-
biAcceptStage()
-
biApplyStage()
任务链式执行过程分析
链式执行的代码为CompletableFuture.supplyAsync(...).thenApplyAsync(...)
。
public static void testChainExecution() throws ExecutionException, InterruptedException
CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() ->
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] The first operation.");
return 10;
, POOL_EXECUTOR).thenApplyAsync(x ->
System.out.println("[" + new SimpleDateFormat("HH:mm:ss:sss").format(new Date()) + "--" + Thread.currentThread().getName() + "] The second operation.");
return x * 2;
, POOL_EXECUTOR);
result.get();
step1:.supplyAsync(…)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
return asyncSupplyStage(screenExecutor(executor), supplier);
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,Supplier<U> f)
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
// 封装AsyncSupply对象
e.execute(new AsyncSupply<U>(d, f));
return d;
AsyncSupply对象在里面还有Supplier fn与CompletableFuture dep对象。fn代表需要执行的任务,dep代表了执行的结果,由于继承了ForkJoinTask,所以AsyncSupply对象可以被ForkJoinPool执行,而这里使用的自定义的线程池。
step2:.thenApplyAsync(…)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor)
return uniApplyStage(screenExecutor(executor), fn);
private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T,? extends V> f)
if (f == null) throw new NullPointerException();
CompletableFuture<V> d = new CompletableFuture<V>();
// 当线程池不为空 或者 此前的任务还未执行结束需要将本任务进行压栈
if (e != null || !d.uniApply(this, f, null))
// e 代表线程池 d 代表执行结果CompletableFuture this上一步执行结果CompletableFuture f需要执行的任务
UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
// 压栈
push(c);
// 如果触发,则执行完成操作,如果存在,则返回可能需要传播的依赖项。
c.tryFire(SYNC);
return d;
final void push(UniCompletion<?,?> c)
if (c != null)
// tryPushStack(c)失败,代表压栈失败
while (result == null && !tryPushStack(c))
// 将结点的next置空
lazySetNext(c, null); // clear on failure
/** 尝试压栈*/
final boolean tryPushStack(Completion c)
// stack栈顶指针
Completion h = stack;
// 入栈
lazySetNext(c, h);
// 更新栈顶指针
return UNSAFE.compareAndSwapObject(this, STACK, h, c);
/** 压栈*/
static void lazySetNext(Completion c, Completion next)
UNSAFE.putOrderedObject(c, NEXT, next);
由于第一个任务没有执行结束,需要将第二个任务构造出UniApply对象,存放到第一个任务的栈里。new UniApply<T,V>(e, d, this, f);
,e 代表线程池;d 代表执行结果CompletableFuture;this代表上一步执行结果CompletableFuture;f代表需要执行的任务。
当线程池执行AsyncSupply中的run方法,在执行中会去执行d.postComplete();
,会在这个方法中把压入栈的UniApply对象取出来执行。
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask
CompletableFuture<T> dep; Supplier<T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn)
this.dep = dep; this.fn = fn;
public final Void getRawResult() return null;
public final void setRawResult(Void v)
public final boolean exec() run(); return true;
public void run()
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null)
dep = null; fn = null;
if (d.result == null)
try
d.completeValue(f.get());
catch (Throwable ex)
d.completeThrowable(ex);
d.postComplete();
final void postComplete()
/*
* On each step, variable f holds current dependents to pop
* and run. It is extended along only one path at a time,
* pushing others to avoid unbounded recursion.
*/
CompletableFuture<?> f = this; Completion h; // h栈顶结点
// (h = f.stack) != null成立说明栈非空,当前f发生了变化,需要在检查
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null))
CompletableFuture<?> d; Completion t;
// 出栈,调整栈顶指针成功,stack指向了源h.next
if (f.casStack(h, t = h.next))
// 若t不是栈底
if (t != null)
// 若f不是当前结果,将h重新压入栈中继续执行出栈
if (f != this)
pushStack(h);
continue;
// f就是当前结果,将h从栈中分离
h.next = null; // detach
// 执行出栈的任务,去执行UniApply.tryFire(NESTED)
f = (d = h.tryFire(NESTED以上是关于JUC系列Executor框架之CompletionFuture的主要内容,如果未能解决你的问题,请参考以下文章