CompletableFuture.allOf().orTimeout() 的意外行为
Posted
技术标签:
【中文标题】CompletableFuture.allOf().orTimeout() 的意外行为【英文标题】:Unexpected Behavior for CompletableFuture.allOf().orTimeout() 【发布时间】:2021-02-24 16:12:33 【问题描述】:有两种方法可以强制CompletableFuture
在给定时间后超时:
orTimeout(long timeout, TimeUnit unit)
get(long timeout, TimeUnit unit)
我希望它们的行为相同。但是,当应用于CompletableFuture.allOf(CompletableFuture<?>... cfs)
时,这两种提供超时的方式完全不同!
基本上,get()
似乎符合我的预期(它会阻塞当前线程,直到所有期货都完成),而orTimeout()
似乎表现得非常奇怪(它会尽快解除阻塞当前线程第一个未来已经完成)。
这里有一些代码来演示我观察到的行为:
import org.junit.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class AllOfWithTimeoutTest
public static final int TIMEOUT_IN_MILLIS = 100;
@Test
public void allOfOrTimeout1() throws InterruptedException, ExecutionException, TimeoutException
getAllOfFuture().get(TIMEOUT_IN_MILLIS, MILLISECONDS);
@Test
public void allOfOrTimeout2() throws ExecutionException, InterruptedException
getAllOfFuture().orTimeout(TIMEOUT_IN_MILLIS, MILLISECONDS);
private CompletableFuture<Void> getAllOfFuture()
return CompletableFuture.allOf(
CompletableFuture.runAsync(() -> sleep(1)),
CompletableFuture.runAsync(() -> sleep(2)),
CompletableFuture.runAsync(() -> sleep(3)),
CompletableFuture.runAsync(() -> sleep(4)),
CompletableFuture.runAsync(() -> sleep(5)),
CompletableFuture.runAsync(() -> sleep(6)),
CompletableFuture.runAsync(() -> sleep(7)),
CompletableFuture.runAsync(() -> sleep(8))
);
public static void sleep(int millis)
try
Thread.sleep(millis);
System.out.format("Had a nap for %s milliseconds.\r\n", millis);
catch (InterruptedException e)
e.printStackTrace();
;
allOfOrTimeout1()
的打印输出是我所期望的:
Had a nap for 1 milliseconds.
Had a nap for 2 milliseconds.
Had a nap for 3 milliseconds.
Had a nap for 4 milliseconds.
Had a nap for 5 milliseconds.
Had a nap for 6 milliseconds.
Had a nap for 7 milliseconds.
Had a nap for 8 milliseconds.
allOfOrTimeout2()
的打印输出不是我所期望的,每次执行都会略有不同。它通常打印在前 2 到 5 行之间,但从不打印 8 行。这是一个仅打印 2 行的版本:
Had a nap for 1 milliseconds.
Had a nap for 2 milliseconds.
另外,如果我在 IntelliJ 中运行整个测试,最后会得到一些额外的行:
Had a nap for 1 milliseconds.
Had a nap for 2 milliseconds.
Had a nap for 3 milliseconds.
Had a nap for 4 milliseconds.
Had a nap for 5 milliseconds.
Had a nap for 6 milliseconds.
Had a nap for 7 milliseconds.
Had a nap for 8 milliseconds.
Had a nap for 1 milliseconds.
Had a nap for 2 milliseconds.
Had a nap for 3 milliseconds.
Had a nap for 4 milliseconds.
Process finished with exit code 0
Had a nap for
我的问题是:
-
这是
orTimeout()
的预期行为吗?
如果不是,为什么要这样做?
【问题讨论】:
您可以在方法allOfOrTimeout2
结束前添加Thread.sleep()
,您应该会看到其余消息。
您有机会查看以下答案吗?谢谢
这个问题你得到答案了吗?
【参考方案1】:
这是 orTimeout() 的预期行为吗?
是的。
OrTimeout
是非阻塞异步调用,而get
是阻塞异步调用。
【讨论】:
【参考方案2】:是的,这是预期的行为。
我认为首先要实现的关键是CompletableFuture
中的async
方法默认会在common pool 上运行任务,其中所有线程都设置为daemon threads,这意味着JVM 不会等到它完成退出。 CompletableFuture
的 Javadocs 中的一条注释提到了这一点:“所有没有显式 Executor 参数的异步方法都使用 ForkJoinPool.commonPool() 执行”
考虑到这一点,orTimeout()
使当前的CompletableFuture
(它是所有 8 个CompletableFuture
任务的组合)完成,如果它没有在给定的超时时间内正常完成,则会出现异常。与get()
不同,这不会阻塞当前线程,而只是在getAllOfFuture()
返回的CompletableFuture
上添加一个超时。在调用orTimeout()
之后,当前线程继续运行,由于没有什么可做的,JVM 在所有任务有机会运行之前就完成了,因为它们正在由守护线程执行。
至于为什么只有少数任务完成,我怀疑是因为JUnit测试后处理阶段,即测试方法完成后,主线程仍然需要运行一些JUnit后处理代码,它为一些工作线程提供了完成任务的时间。如果您在普通的 main 方法(在 JUnit 之外)中运行代码,orTimeout
示例可能甚至不会输出单个任务,因为线程会立即结束。
【讨论】:
【参考方案3】:我猜如果你这样写:
public static void allOfOrTimeout2()
CompletableFuture<Void> future1 = getAllOfFutures();
CompletableFuture<Void> future2 = future1.orTimeout(TIMEOUT_IN_MILLIS, MILLISECONDS);
让自己置身于main
线程并思考它应该如何执行allOfOrTimeout2()
,事情会变得更容易。
main
线程需要执行allOfOrTimeout2
。它进入这个方法并从那里返回一个CompletableFuture<Void>
。它会等待它完成吗?没有。甚至文档都说,如果你想要等待这个CompletableFuture
完成,你需要:
此方法的应用之一是在继续程序之前等待一组独立的 CompletableFuture 完成,如:CompletableFuture.allOf(c1, c2,c3).join();
但是你不调用它join
,你从方法中返回它并继续。
CompletableFuture
的执行确实开始了。它在来自ForkJoinPool
的线程中通过runAsync
执行(这也是supplyAsync
的工作方式:您将实际工作分派给不同的线程,然后调用supplyAsync
的线程继续运行)。
main
线程然后移动到这个:
CompletableFuture<Void> future2 = future1.orTimeout(TIMEOUT_IN_MILLIS, MILLISECONDS);
它需要调用orTimeout
并且文档说:
异常地完成这个 CompletableFuture 并带有 TimeoutException如果没有在给定的超时之前完成。
future1
完成了吗?这又是一个时间问题,但很可能没有(要完成所有runAsync
需要完成的任务)。所以没有超时,allOfOrTimeout2
的调用就结束了。
现在那些在runAsync
中运行代码的线程被称为守护线程,VM 不必等待它们结束才能关闭。所以main
线程结束了,那些线程并没有停止VM,所以一切都结束了。
由于整个输出取决于线程的调度方式,因此无法预测。您甚至无法预测 future2
是否会在 VM 关闭之前完成。
当然,如果您想要可预测的结果:您要么等待future2
完成,要么将执行程序传递给runAsync
。
【讨论】:
以上是关于CompletableFuture.allOf().orTimeout() 的意外行为的主要内容,如果未能解决你的问题,请参考以下文章
传递给 CompletableFuture.allOf() 的所有期货都会运行吗?
Java CompletableFuture:allOf等待所有异步线程任务结束
CompletableFutureCompletableFuture测试runAsync()方法调用CompletableFuture.join()/get()方法阻塞主线程