如何取消 Java 8 可完成的未来?

Posted

技术标签:

【中文标题】如何取消 Java 8 可完成的未来?【英文标题】:How to cancel Java 8 completable future? 【发布时间】:2014-06-12 18:41:24 【问题描述】:

我正在玩 Java 8 可完成的期货。我有以下代码:

CountDownLatch waitLatch = new CountDownLatch(1);

CompletableFuture<?> future = CompletableFuture.runAsync(() -> 
    try 
        System.out.println("Wait");
        waitLatch.await(); //cancel should interrupt
        System.out.println("Done");
     catch (InterruptedException e) 
        System.out.println("Interrupted");
        throw new RuntimeException(e);
    
);

sleep(10); //give it some time to start (ugly, but works)
future.cancel(true);
System.out.println("Cancel called");

assertTrue(future.isCancelled());

assertTrue(future.isDone());
sleep(100); //give it some time to finish

使用 runAsync 我安排执行等待闩锁的代码。接下来我取消未来,期望内部抛出一个中断的异常。但似乎线程在 await 调用上仍然被阻塞,即使未来被取消(断言通过),也永远不会抛出 InterruptedException。使用 ExecutorService 的等效代码按预期工作。这是 CompletableFuture 还是我的示例中的错误?

【问题讨论】:

你能用Executors.newFixedThreadPoolExecutors.newWorkStealingPool 重现这个问题吗?比较两个不同的 executor 实现,而不是比较 futures 和可完成的 futures,这个问题会更清楚。 JavaDoc 说 cancel(true) 会使用 CancellationException 取消,但您没有发现。 @nosid 你说得对,newWorkStealingPool 显然也不支持取消 【参考方案1】:

显然,这是故意的。方法 CompletableFuture::cancel 的 Javadoc 声明:

[参数:] mayInterruptIfRunning - 此值在此实现中影响,因为中断不用于控制处理。

有趣的是,ForkJoinTask::cancel 方法对参数 mayInterruptIfRunning 使用了几乎相同的措辞。

我对这个问题有一个猜测:

interruption 旨在与阻塞操作一起使用,例如 sleepwait 或 I/O 操作, 但 CompletableFutureForkJoinTask 都不打算用于阻塞操作。

CompletableFuture 应该创建一个新的CompletionStage,而不是阻塞,而 CPU 绑定任务是 fork-join 模型的先决条件。因此,对他们中的任何一个使用 interruption 都会破坏他们的目的。另一方面,它可能会增加复杂性,如果按预期使用,这不是必需的。

【讨论】:

为什么你认为 CompletableFutureForkJoinTask 不适合用于阻塞操作? CompletableFuture 和其他响应式东西的重点是:不要让线程等待一些长时间操作的结果来浪费线程。相反,当结果在这里时,提供一个要调用的回调。反应式方法需要更少的线程。 我之前的评论并不完全正确。 CompletableFuture 允许为阻塞和非阻塞代码提供统一的语法。这在将代码库从阻塞样式迁移到非阻塞时可能很有用【参考方案2】:

CancellationException 是内部 ForkJoin 取消例程的一部分。当您检索future的结果时会出现异常:

try  future.get(); 
      catch (Exception e)
          System.out.println(e.toString());            
      

花了一些时间在调试器中看到这一点。 JavaDoc 并不清楚正在发生什么或您应该期待什么。

【讨论】:

【参考方案3】:

当您调用CompletableFuture#cancel 时,您只会停止链条的下游部分。上游部分,i。 e.最终会调用complete(...)completeExceptionally(...) 的东西不会得到任何信号表明不再需要结果。

那些“上游”和“下游”的东西是什么?

让我们考虑以下代码:

CompletableFuture
        .supplyAsync(() -> "hello")               //1
        .thenApply(s -> s + " world!")            //2
        .thenAccept(s -> System.out.println(s));  //3

在这里,数据从上到下流动——从供应商创建,到功能修改,再到println消费。特定步骤上方的部分称为上游,下方的部分称为下游。例如。第 1 步和第 2 步是第 3 步的上游。

这是幕后发生的事情。这并不精确,而是一个方便的思维模型。

    供应商(步骤 1)正在执行(在 JVM 的公共 ForkJoinPool 内)。 然后供应商的结果由complete(...) 传递给下游的下一个CompletableFuture。 收到结果后,CompletableFuture 调用下一步 - 一个函数(第 2 步),它接受上一步的结果并返回将进一步传递的内容到下游 CompletableFuturecomplete(...)。 收到第 2 步结果后,第 3 步CompletableFuture 调用消费者System.out.println(s)。消费者完成后,下游CompletableFuture 将收到它的值,(Void) null

正如我们所见,这条链中的每个CompletableFuture 都必须知道谁在下游等待将值传递给他们的complete(...)(或completeExceptionally(...))。但是CompletableFuture 不必知道它的上游(或上游 - 可能有几个)。

因此,在第 3 步调用 cancel() 不会中止第 1 步和第 2 步,因为没有从第 3 步到第 2 步的链接。

假设如果您使用的是CompletableFuture,那么您的步骤足够小,因此即使执行几个额外的步骤也不会造成伤害。

如果您希望取消在上游传播,您有两种选择:

自己实现 - 创建一个专用的CompletableFuture(将其命名为cancelled),在每一步之后都会检查它(类似于step.applyToEither(cancelled, Function.identity())) 使用 RxJava 2、ProjectReactor/Flux 或 Akka Streams 等反应式堆栈

【讨论】:

【参考方案4】:

您需要 CompletionStage 的替代实现来完成真正的线程中断。我刚刚发布了一个小型库,正是为了这个目的——https://github.com/vsilaev/tascalate-concurrent

【讨论】:

【参考方案5】:

如果您真的希望能够取消任务,那么您必须使用Future 本身(例如由ExecutorService.submit(Callable&lt;T&gt;) 返回,而不是CompletableFuture。正如nosid,CompletableFuture 完全忽略对cancel(true) 的任何调用。

我怀疑JDK团队没有实现中断是因为:

    打断总是很老套,让人难以理解,也难以相处。尽管对InputStream.read() 的调用是阻塞调用,Java I/O 系统甚至都不是可中断的! (而且 JDK 团队没有计划让标准 I/O 系统再次可中断,就像 Java 早期那样。) JDK 团队一直在非常努力地从早期 Java 时代开始逐步淘汰旧的损坏 API,例如 Object.finalize()Object.wait()Thread.stop() 等。我相信 Thread.interrupt() 被认为是在最终必须弃用和替换的事物类别。因此,较新的 API(如 ForkJoinPoolCompletableFuture)已经不支持它。 CompletableFuture 旨在构建 DAG 结构的操作管道,类似于 Java Stream API。很难简洁地描述数据流 DAG 的一个节点的中断应该如何影响 DAG 其余部分的执行。 (当任何节点中断时,是否应该立即取消所有并发任务?) 我怀疑 JDK 团队只是不想处理正确的中断问题,因为 JDK 和库如今已达到内部复杂程度。 (lambda 系统的内部结构——呃。)

解决这个问题的一个非常老套的方法是让每个CompletableFuture 对自身的引用导出到外部可见的AtomicReference,然后可以在需要时从另一个外部线程直接中断Thread 引用。或者,如果您使用自己的ExecutorService 启动所有任务,在您自己的ThreadPool 中,您可以手动中断任何或所有已启动的线程,即使CompletableFuture 拒绝通过cancel(true) 触发中断。 (请注意,尽管CompletableFuture lambdas 不能抛出已检查异常,因此如果您在 CompletableFuture 中有可中断等待,则必须作为未检查异常重新抛出。)

更简单地说,您可以在外部范围内声明 AtomicReference&lt;Boolean&gt; cancel = new AtomicReference&lt;&gt;(),并定期从每个 CompletableFuture 任务的 lambda 内部检查此标志。

您还可以尝试设置 Future 实例的 DAG 而不是 CompletableFuture 实例的 DAG,这样您就可以准确指定任何一个任务中的异常和中断/取消应该如何影响其他当前运行的任务. I show how to do this in my example code in my question here,它运行良好,但它是很多样板。

【讨论】:

【参考方案6】:

即使调用了Future.cancel(..),等待的调用仍然会阻塞。正如其他人所提到的,CompletableFuture 不会使用中断来取消任务。

根据CompletableFuture.cancel(..)的javadoc:

mayInterruptIfRunning这个值在这个实现中没有影响,因为中断不用于控制处理。

即使实现会导致中断,您仍然需要阻塞操作才能取消任务或通过Thread.interrupted()检查状态。

不要中断Thread,这可能并不总是那么容易做到,您可以在您的操作中设置检查点,您可以在其中优雅地终止当前任务。这可以在将要处理的某些元素的循环中完成,或者您在操作的每个步骤之前检查取消状态并自己抛出CancellationException

棘手的部分是在任务中获取CompletableFuture 的引用,以便调用Future.isCancelled()。这是一个如何完成的示例:

public abstract class CancelableTask<T> 

    private CompletableFuture<T> task;

    private T run() 
        try 
            return compute();
         catch (Throwable e) 
            task.completeExceptionally(e);
        
        return null;
    

    protected abstract T compute() throws Exception;

    protected boolean isCancelled() 
        Future<T> future = task;
        return future != null && future.isCancelled();
    

    public Future<T> start() 
        synchronized (this) 
            if (task != null) throw new IllegalStateException("Task already started.");
            task = new CompletableFuture<>();
        
        return task.completeAsync(this::run);
    

编辑:这里将改进的CancelableTask 版本作为静态工厂:

public static <T> CompletableFuture<T> supplyAsync(Function<Future<T>, T> operation) 
    CompletableFuture<T> future = new CompletableFuture<>();
    return future.completeAsync(() -> operation.apply(future));

这里是测试方法:

@Test
void testFuture() throws InterruptedException 
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch done = new CountDownLatch(1);

    AtomicInteger counter = new AtomicInteger();
    Future<Object> future = supplyAsync(task -> 
        started.countDown();
        while (!task.isCancelled()) 
            System.out.println("Count: " + counter.getAndIncrement());
        
        System.out.println("Task cancelled");
        done.countDown();
        return null;
    );

    // wait until the task is started
    assertTrue(started.await(5, TimeUnit.SECONDS));
    future.cancel(true);
    System.out.println("Cancel called");

    assertTrue(future.isCancelled());

    assertTrue(future.isDone());
    assertTrue(done.await(5, TimeUnit.SECONDS));

如果您真的想使用除CompletableFuture 之外的中断,那么您可以将自定义Executor 传递给CompletableFuture.completeAsync(..),您可以在其中创建自己的Thread,覆盖cancel(..) 中的CompletableFuture 和打断你的Thread

【讨论】:

以上是关于如何取消 Java 8 可完成的未来?的主要内容,如果未能解决你的问题,请参考以下文章

Java 8 CompletableFuture 与 Netty 未来

知道未来是不是已经开始

Java开发五年,java面试未来职业规划

如何在没有 java.util.Timer 的情况下实现延迟的未来?

谈谈 Java 的未来

java 可赎回和未来