Mono vs CompletableFuture

Posted

技术标签:

【中文标题】Mono vs CompletableFuture【英文标题】: 【发布时间】:2019-07-18 20:50:02 【问题描述】:

CompletableFuture 在单独的线程(使用线程池)上执行任务并提供回调函数。假设我在 CompletableFuture 中有一个 API 调用。那是 API 调用阻塞吗?线程会被阻塞直到它没有得到 API 的响应吗? (我知道主线程/tomcat 线程将是非阻塞的,但是 CompletableFuture 任务正在执行的线程呢?)

据我所知,Mono 是完全无阻塞的。

如果我错了,请指出这一点并纠正我。

【问题讨论】:

【参考方案1】:

CompletableFuture 是异步的。但它是非阻塞的吗?

关于 CompletableFuture 的一个正确之处在于它是真正的异步的,它允许您从调用者线程异步运行您的任务,并且 API (例如 thenXXX)允许您在结果可用时对其进行处理。另一方面,CompletableFuture 并不总是非阻塞的。例如,当你运行以下代码时,会在默认的ForkJoinPool上异步执行:

CompletableFuture.supplyAsync(() -> 
    try 
        Thread.sleep(1000);
    
    catch (InterruptedException e) 

    

    return 1;
);

很明显ForkJoinPool中执行任务的Thread最终会被阻塞,也就是说我们不能保证调用是非阻塞的。

另一方面,CompletableFuture 公开了 API,允许您使其真正实现非阻塞。

例如,您始终可以执行以下操作:

public CompletableFuture myNonBlockingHttpCall(Object someData) 
    var uncompletedFuture = new CompletableFuture(); // creates uncompleted future

    myAsyncHttpClient.execute(someData, (result, exception -> 
        if(exception != null) 
            uncompletedFuture.completeExceptionally(exception);
            return;
        
        uncompletedFuture.complete(result);
    )

    return uncompletedFuture;

如您所见,CompletableFuturefuture 的 API 为您提供了 completecompleteExceptionally 方法,可以在需要时完成您的执行,而不会阻塞任何线程。

单声道与 CompletableFuture

在上一节中,我们概述了 CF 行为,但 CompletableFuture 和 Mono 之间的核心区别是什么?

值得一提的是,我们也可以阻塞 Mono。没有人阻止我们编写以下内容:

Mono.fromCallable(() -> 
    try 
        Thread.sleep(1000);
    
    catch (InterruptedException e) 

    

    return 1;
)

当然,一旦我们订阅了future,调用者线程就会被阻塞。但是我们总是可以通过提供一个额外的subscribeOn 运算符来解决这个问题。尽管如此,Mono 更广泛的 API 并不是关键特性。

为了了解CompletableFutureMono 之间的主要区别,让我们回到前面提到的myNonBlockingHttpCall 方法实现。

public CompletableFuture myUpperLevelBusinessLogic() 
    var future = myNonBlockingHttpCall();

    // ... some code

    if (something) 
       // oh we don't really need anything, let's just throw an exception
       var errorFuture = new CompletableFuture();
       errorFuture.completeExceptionally(new RuntimeException());

       return errorFuture;
    

   return future;

CompletableFuture 的情况下,一旦调用该方法,它就会急切地执行对另一个服务/资源的HTTP 调用。即使我们在验证了一些前置/后置条件后并不真正需要执行结果,但它会开始执行,并且将为这项工作分配额外的 CPU/DB-Connections/What-Ever-Machine-Resources。

相比之下,Mono 类型根据定义是惰性的:

public Mono myNonBlockingHttpCallWithMono(Object someData) 
    return Mono.create(sink -> 
            myAsyncHttpClient.execute(someData, (result, exception -> 
                if(exception != null) 
                    sink.error(exception);
                    return;
                
                sink.success(result);
            )
    );
 

public Mono myUpperLevelBusinessLogic() 
    var mono = myNonBlockingHttpCallWithMono();

    // ... some code

    if (something) 
       // oh we don't really need anything, let's just throw an exception

       return Mono.error(new RuntimeException());
    

   return mono;

在这种情况下,在订阅最终的mono 之前什么都不会发生。因此,只有当myNonBlockingHttpCallWithMono方法返回的Mono被订阅时,提供给Mono.create(Consumer)的逻辑才会被执行。

我们可以走得更远。我们可以让我们的执行更加懒惰。您可能知道,Mono 扩展了 Reactive Streams 规范中的 Publisher。 Reactive Streams 的尖叫特性是背压支持。因此,使用Mono API,我们可以仅在真正需要数据并且我们的订阅者准备好使用它们时执行:

Mono.create(sink -> 
    AtomicBoolean once = new AtomicBoolean();
    sink.onRequest(__ -> 
        if(!once.get() && once.compareAndSet(false, true) 
            myAsyncHttpClient.execute(someData, (result, exception -> 
                if(exception != null) 
                    sink.error(exception);
                    return;
                
                sink.success(result);
            );
        
    );
);

在此示例中,我们仅在订阅者调用 Subscription#request 时执行数据,因此它声明已准备好接收数据。

总结

CompletableFuture 是异步的,可以是非阻塞的 CompletableFuture 很渴望。你不能推迟执行。但是你可以取消它们(这总比没有好) Mono 是异步/非阻塞的,可以通过使用不同的运算符组合主 Mono,轻松地对不同的 Thread 执行任何调用。 Mono 真的很懒惰,它允许根据订阅者的存在及其消费数据的准备情况推迟执行启动。

【讨论】:

感谢奥莱的详细解释。真的很感激。这意味着我的理解是正确的:如果我在 CompletableFuture 中进行 api 调用需要 1 秒才能响应,那么 ForkJoinPool 中的线程最终会被阻塞 1 秒?如果我错了,请纠正我。 @XYZ ForkJoinPool 的底层机制更智能一些,所以在任务数量较多的情况下,它可以开始在 fork 中进行另一项工作,而不是阻塞它,但是一旦所有工作完成,它开始加入任务,最终会被阻止。 但正如我所说,这取决于客户端的底层实现。 此外,如果在将任务发送到工作线程之前请求任务结果,则可以在当前线程上执行 ForkJoinTask。这意味着如果您向池提交任务但直接在句柄上调用get(),则提交线程有可能执行该操作【参考方案2】:

以 Oleh 的回答为基础,CompletableFuture 的一个可能的懒惰解决方案是

public CompletableFuture myNonBlockingHttpCall(CompletableFuture<ExecutorService> dispatch, Object someData) 
    var uncompletedFuture = new CompletableFuture(); // creates uncompleted future

    dispatch.thenAccept(x -> x.submit(() -> 
        myAsyncHttpClient.execute(someData, (result, exception -> 
            if(exception != null) 
                uncompletedFuture.completeExceptionally(exception);
                return;
            
            uncompletedFuture.complete(result);
        )
    ));

    return uncompletedFuture;

然后,以后你就简单地做

dispatch.complete(executor);

这将使CompletableFuture 等同于Mono,但我猜没有背压。

【讨论】:

以上是关于Mono vs CompletableFuture的主要内容,如果未能解决你的问题,请参考以下文章

ExecutorService vs CompletableFuture

Spring Boot 2. 异步 API。 CompletableFuture vs. Reactive

Xamarin vs. Mono vs. Monodevelop [关闭]

Mono / XBuild Invariant Language (Invariant Culture) vs. VS2015 MSBuild 的 Neutral Language

Flux 和 Mono 中的 compose() vs. transform() vs. as() vs. map()

mono和VS有时候Attach失败的解决办法