CompletableFuture 没有被执行。如果我使用 ExecutorService 池,它可以按预期工作,但不能使用默认的 forkJoin 公共池
Posted
技术标签:
【中文标题】CompletableFuture 没有被执行。如果我使用 ExecutorService 池,它可以按预期工作,但不能使用默认的 forkJoin 公共池【英文标题】:CompletableFuture is not getting executed. If I use the ExecutorService pool it works as expected but not with the common ForkJoinPool 【发布时间】:2019-01-23 13:37:49 【问题描述】:我正在尝试运行以下类,它会在不执行 CompletableFuture 的情况下终止。
public class ThenApplyExample
public static void main(String[] args) throws Exception
//ExecutorService es = Executors.newCachedThreadPool();
CompletableFuture<Student> studentCompletableFuture = CompletableFuture.supplyAsync(() ->
try
TimeUnit.SECONDS.sleep(2);
catch (InterruptedException e)
e.printStackTrace();
return 3;
)// If I put executorservice created n commented above, programme work as expected.
.thenApply(i ->
for (int j = 0; j <= i; j++)
System.out.println("Inside first then apply");
try
Thread.sleep(2000);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("First then apply is finished");
return ++i;
)
.thenApply(i ->
System.out.println("Inside 2nd then apply");
try
Thread.sleep(2000);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("Inside 2nd then apply stopped");
return i++;
)
.thenApply(i ->
System.out.println("Inside 3nd then apply");
try
Thread.sleep(2000);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("Inside 3nd then apply stopped");
return "The i is ::: " + i;
)
.thenApply(s -> Student.builder().id(1).name(s).address("Some address").build());
System.out.println("Executing..");
System.out.println("Executing..");
System.out.println("Executing..");
System.out.println("Executing..");
System.out.println("Executing..");
//es.shutdown();
我得到的输出是
Executing..
Executing..
Executing..
Executing..
Executing..
而预期的输出是
Executing..
Executing..
Executing..
Executing..
Executing..
Inside first then apply
Inside first then apply
Inside first then apply
Inside first then apply
First then apply is finished
Inside 2nd then apply
Inside 2nd then apply stopped
Inside 3nd then apply
Inside 3nd then apply stopped
注意:在上述程序中,我没有使用 studentCompletableFuture.get()。我不想使用它,因为它会阻止代码。
如果我在程序的最后添加 studentCompletableFuture.get(),它会按预期工作,或者如果我在 supplyAsync 第二个参数中添加 executorservice(检查程序中的评论),它再次按预期工作。
我的问题是为什么程序使用默认的 ForkJoin 公共池时它会终止?
【问题讨论】:
没有深入分析您的代码,但这很可能是因为ForkJoinPool
中的线程是daemon 线程,而@987654325 中的线程@ 不是。
@Slaw 会有什么不同。无论是它的守护线程还是执行器服务池线程,它都应该执行异步调用
因为您的主线程只是启动异步任务并退出。由于ForkJoinPool
线程是守护线程,它们不会使JVM 保持活动状态。换句话说,唯一的非守护线程(主线程)退出,因此 JVM 在异步代码完成之前退出。这就是为什么调用get()
使它起作用的原因;它会导致主线程等待(从而使其保持活动状态)。
@Slaw 这是有道理的。在使用ForkJoinPool
公共池线程而不使用studentCompletableFuture.get() 时,是否有任何其他优雅的方法可以运行此程序。在 java 9 中,有一种方法是使用 studentCompletableFuture.completeOnTimeout 或使用 studentCompletableFuture.orTimeout()。但是,在 java8 中该怎么做呢?
@Slaw 我刚刚使用System.out.println("The thread is :: "+ Thread.currentThread().getName() + Thread.currentThread().isDaemon());
检查了 ForkJoinPool 普通线程不是守护线程第二个想法,无论它是否是守护线程,这些线程都有任务要完成,那么为什么这些让主线程线程退出。
【参考方案1】:
TL;DR:ForkJoinPool
使用守护进程线程,而ExecutorService
使用非守护进程线程。后者使JVM保持活力;前者没有。此外,主线程是一个非守护进程线程,当您阻塞它等待CompletableFuture
完成时,它仍然处于活动状态(从而保持 JVM 处于活动状态)。
守护线程与非守护线程
Java 中的Thread
可以是守护进程 线程或非守护进程 线程。 daemon 线程不会使 JVM 保持活动状态。这种行为是documented:
当 Java 虚拟机启动时,通常有一个非守护线程(通常调用某个指定类的名为
main
的方法)。 Java 虚拟机继续执行线程,直到发生以下任一情况 [强调]:Runtime
类的exit
方法已被调用,安全管理器已允许执行退出操作。 不是守护线程的所有线程都已死亡 [强调添加],要么从对run
方法的调用返回,要么抛出传播到 @ 之外的异常987654334@方法。
换句话说,有多少 daemon 线程处于活动状态或它们正在做什么并不重要——如果没有 非守护进程 线程处于活动状态,那么JVM 将退出。
“主”线程
如上述文档中所述,JVM 启动时通常有一个非守护进程线程。而这个线程通常是调用 main 方法的线程。除非其他非守护进程线程被启动(并保持活动状态),否则一旦主线程终止,JVM 就会退出。
ForkJoinPool
ForkJoinPool
使用 daemon 线程,至少在默认情况下是这样。这种行为也是documented:
所有工作线程都使用
Thread.isDaemon()
设置true
进行初始化。
– 最后一句,类Javadoc的第二段
这意味着提交给ForkJoinPool
的工作不会让JVM 保持活动状态。
执行器服务
Executors
中的工厂方法返回的大多数ExecutorService
实例默认配置为使用非守护进程 线程。不幸的是,这种默认行为似乎没有记录在案。但是,如果您希望池使用 daemon 线程,则可以提供 ThreadFactory
。
这种默认行为的一个例外是#newWorkStealingPool(...)
方法。他们返回一个ForkJoinPool
(一个实现细节)。
代码的行为
您的代码不同版本之间的行为差异可以通过使用非守护进程线程与守护进程线程来解释。
无需等待任务完成
您的原始代码如下所示(大大简化):
import java.util.concurrent.CompletableFuture;
public class Main
public static void main(String[] args)
CompletableFuture.runAsync(
() ->
System.out.println("Sleeping...");
Thread.sleep(2000L); // try-catch omitted for brevity
System.out.println("Done!");
);
该代码正在通过CompletableFuture#runAsync(Runnable)
启动一个异步任务,其中:
返回一个新的 CompletableFuture,它在运行给定操作后由
ForkJoinPool.commonPool()
中运行的任务异步完成。
如您所见,任务被传递给公共ForkJoinPool
。这意味着任务正在由 daemon 线程执行。您也不必等待任务完成。 runAsync
调用提交任务并立即返回。然后主线程简单地退出主方法并终止。由于唯一的非守护进程线程已经终止,JVM 也会退出——在异步任务有时间完成之前。
等待任务完成
当您修改代码以等待未来时:
import java.util.concurrent.CompletableFuture;
public class Main
public static void main(String[] args) throws Exception
CompletableFuture.runAsync(
() ->
System.out.println("Sleeping...");
Thread.sleep(2000L); // try-catch omitted for brevity
System.out.println("Done!");
)
.get(); // wait for future to complete
您现在阻塞了get()
调用中的主线程。所述线程保持阻塞,直到它被中断或任务完成(正常或异常)。这意味着 非守护进程 线程在任务完成之前一直保持活动状态,因此 JVM 保持活动状态。
使用自定义 ExecutorService
再次修改原代码,这次使用自定义ExecutorService
:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main
public static void main(String[] args)
ExecutorService executor = Executors.newCachedThreadPool();
CompletableFuture.runAsync(
() ->
System.out.println("Sleeping...");
Thread.sleep(2000L); // try-catch omitted for brevity
System.out.println("Done!");
,
executor); // use custom ExecutorService
executor.shutdown();
现在任务被提交给给定的ExecutorService
,而不是使用普通的ForkJoinPool
。在这种情况下,线程池正在使用 non-daemon 线程。这意味着这些线程将使 JVM 保持活跃。即使允许主线程退出 main 方法并在任务完成之前终止也是如此。
非守护进程线程被使用的事实是为什么调用#shutdown()
很重要。否则,允许线程持续存在并使JVM“无限期地”保持活动状态。虽然“缓存线程池”是可能的,但具体来说,它可能会让所有线程最终因空闲时间过长而死。
注意调用#shutdown()
仍然允许完成所有已提交的任务。
处理评论
在你的一个 cmets 中你问:
在使用 ForkJoinPool 通用池线程而不使用 studentCompletableFuture.get() 时,是否有任何其他优雅的方式来运行此操作。
我不确定你会认为什么更“优雅”,但你可以改用#join()
方法。该方法的行为与#get()
非常相似,但不会引发检查异常。但请注意:对#join()
的调用不能被中断。也没有超时重载(尽管您可以在 Java 9+ 中将其与 orTimeout
/ completeOnTimeout
结合使用)。
在您提到的另一个 cmets 中:
我刚刚使用
System.out.println("The thread is :: "+ Thread.currentThread().getName() + Thread.currentThread().isDaemon());
检查了 ForkJoinPool 公共线程不是守护线程
我不知道您为什么或如何看到这一点,但以下内容:
import java.util.concurrent.CompletableFuture;
public class Main
public static void main(String[] args)
CompletableFuture.runAsync(
() ->
Thread t = Thread.currentThread();
System.out.printf("Thread[name=%s, daemon=%s]%n", t.getName(), t.isDaemon());
)
.join();
给出这个输出:
Thread[name=ForkJoinPool.commonPool-worker-3, daemon=true]
【讨论】:
1+。如果你不介意,关于等待的小附录。ExecutorService
和 CompletableFuture
与自定义 ForkJoinPool
work very different 如果您打算 await
完成任务。以上是关于CompletableFuture 没有被执行。如果我使用 ExecutorService 池,它可以按预期工作,但不能使用默认的 forkJoin 公共池的主要内容,如果未能解决你的问题,请参考以下文章