CompletableFuture.allOf().orTimeout() 的意外行为

Posted

技术标签:

【中文标题】CompletableFuture.allOf().orTimeout() 的意外行为【英文标题】:Unexpected Behavior for CompletableFuture.allOf().orTimeout() 【发布时间】:2021-02-24 16:12:33 【问题描述】:

有两种方法可以强制CompletableFuture 在给定时间后超时:

orTimeout(long timeout, TimeUnit unit) get(long timeout, TimeUnit unit)

我希望它们的行为相同。但是,当应用于CompletableFuture.allOf(CompletableFuture<?>... cfs) 时,这两种提供超时的方式完全不同!

基本上,get() 似乎符合我的预期(它会阻塞当前线程,直到所有期货都完成),而orTimeout() 似乎表现得非常奇怪(它会尽快解除阻塞当前线程第一个未来已经完成)。

这里有一些代码来演示我观察到的行为:

import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class AllOfWithTimeoutTest 

    public static final int TIMEOUT_IN_MILLIS = 100;

    @Test
    public void allOfOrTimeout1() throws InterruptedException, ExecutionException, TimeoutException 
        getAllOfFuture().get(TIMEOUT_IN_MILLIS, MILLISECONDS);
    

    @Test
    public void allOfOrTimeout2() throws ExecutionException, InterruptedException 
        getAllOfFuture().orTimeout(TIMEOUT_IN_MILLIS, MILLISECONDS);
    

    private CompletableFuture<Void> getAllOfFuture() 
        return CompletableFuture.allOf(
            CompletableFuture.runAsync(() -> sleep(1)),
            CompletableFuture.runAsync(() -> sleep(2)),
            CompletableFuture.runAsync(() -> sleep(3)),
            CompletableFuture.runAsync(() -> sleep(4)),
            CompletableFuture.runAsync(() -> sleep(5)),
            CompletableFuture.runAsync(() -> sleep(6)),
            CompletableFuture.runAsync(() -> sleep(7)),
            CompletableFuture.runAsync(() -> sleep(8))
        );
    

    public static void sleep(int millis) 
        try 
            Thread.sleep(millis);
            System.out.format("Had a nap for %s milliseconds.\r\n", millis);
         catch (InterruptedException e) 
            e.printStackTrace();
        
    ;

allOfOrTimeout1() 的打印输出是我所期望的:

Had a nap for 1 milliseconds.
Had a nap for 2 milliseconds.
Had a nap for 3 milliseconds.
Had a nap for 4 milliseconds.
Had a nap for 5 milliseconds.
Had a nap for 6 milliseconds.
Had a nap for 7 milliseconds.
Had a nap for 8 milliseconds.

allOfOrTimeout2() 的打印输出不是我所期望的,每次执行都会略有不同。它通常打印在前 2 到 5 行之间,但从不打印 8 行。这是一个仅打印 2 行的版本:

Had a nap for 1 milliseconds.
Had a nap for 2 milliseconds.

另外,如果我在 IntelliJ 中运行整个测试,最后会得到一些额外的行:

Had a nap for 1 milliseconds.
Had a nap for 2 milliseconds.
Had a nap for 3 milliseconds.
Had a nap for 4 milliseconds.
Had a nap for 5 milliseconds.
Had a nap for 6 milliseconds.
Had a nap for 7 milliseconds.
Had a nap for 8 milliseconds.



Had a nap for 1 milliseconds.
Had a nap for 2 milliseconds.

Had a nap for 3 milliseconds.
Had a nap for 4 milliseconds.

Process finished with exit code 0
Had a nap for 

我的问题是:

    这是orTimeout() 的预期行为吗? 如果不是,为什么要这样做?

【问题讨论】:

您可以在方法allOfOrTimeout2 结束前添加Thread.sleep(),您应该会看到其余消息。 您有机会查看以下答案吗?谢谢 这个问题你得到答案了吗? 【参考方案1】:

这是 orTimeout() 的预期行为吗?

是的。

OrTimeout 是非阻塞异步调用,而get 是阻塞异步调用。

【讨论】:

【参考方案2】:

是的,这是预期的行为。

我认为首先要实现的关键是CompletableFuture 中的async 方法默认会在common pool 上运行任务,其中所有线程都设置为daemon threads,这意味着JVM 不会等到它完成退出。 CompletableFuture 的 Javadocs 中的一条注释提到了这一点:“所有没有显式 Executor 参数的异步方法都使用 ForkJoinPool.commonPool() 执行”

考虑到这一点,orTimeout() 使当前的CompletableFuture(它是所有 8 个CompletableFuture 任务的组合)完成,如果它没有在给定的超时时间内正常完成,则会出现异常。与get() 不同,这不会阻塞当前线程,而只是在getAllOfFuture() 返回的CompletableFuture 上添加一个超时。在调用orTimeout() 之后,当前线程继续运行,由于没有什么可做的,JVM 在所有任务有机会运行之前就完成了,因为它们正在由守护线程执行。

至于为什么只有少数任务完成,我怀疑是因为JUnit测试后处理阶段,即测试方法完成后,主线程仍然需要运行一些JUnit后处理代码,它为一些工作线程提供了完成任务的时间。如果您在普通的 main 方法(在 JUnit 之外)中运行代码,orTimeout 示例可能甚至不会输出单个任务,因为线程会立即结束。

【讨论】:

【参考方案3】:

我猜如果你这样写:

public static void allOfOrTimeout2() 
    CompletableFuture<Void> future1 = getAllOfFutures();
    CompletableFuture<Void> future2 = future1.orTimeout(TIMEOUT_IN_MILLIS, MILLISECONDS);

让自己置身于main 线程并思考它应该如何执行allOfOrTimeout2(),事情会变得更容易。

main 线程需要执行allOfOrTimeout2。它进入这个方法并从那里返回一个CompletableFuture&lt;Void&gt;。它会等待它完成吗?没有。甚至文档都说,如果你想要等待这个CompletableFuture完成,你需要:

此方法的应用之一是在继续程序之前等待一组独立的 CompletableFuture 完成,如:CompletableFuture.allOf(c1, c2,c3).join();

但是你不调用它join,你从方法中返回它并继续。

CompletableFuture 的执行确实开始了。它在来自ForkJoinPool 的线程中通过runAsync 执行(这也是supplyAsync 的工作方式:您将实际工作分派给不同的线程,然后调用supplyAsync 的线程继续运行)。

main 线程然后移动到这个:

CompletableFuture<Void> future2 = future1.orTimeout(TIMEOUT_IN_MILLIS, MILLISECONDS);

它需要调用orTimeout 并且文档说:

异常地完成这个 CompletableFuture 并带有 TimeoutException如果没有在给定的超时之前完成

future1 完成了吗?这又是一个时间问题,但很可能没有(要完成所有runAsync 需要完成的任务)。所以没有超时,allOfOrTimeout2的调用就结束了。

现在那些在runAsync 中运行代码的线程被称为守护线程,VM 不必等待它们结束才能关闭。所以main线程结束了,那些线程并没有停止VM,所以一切都结束了。

由于整个输出取决于线程的调度方式,因此无法预测。您甚至无法预测 future2 是否会在 VM 关闭之前完成。


当然,如果您想要可预测的结果:您要么等待future2 完成,要么将执行程序传递给runAsync

【讨论】:

以上是关于CompletableFuture.allOf().orTimeout() 的意外行为的主要内容,如果未能解决你的问题,请参考以下文章

传递给 CompletableFuture.allOf() 的所有期货都会运行吗?

Java CompletableFuture:allOf等待所有异步线程任务结束

CompletableFutureCompletableFuture测试runAsync()方法调用CompletableFuture.join()/get()方法阻塞主线程