如何在 Spring Webflux / Reactor Netty Web 应用程序中执行阻塞调用

Posted

技术标签:

【中文标题】如何在 Spring Webflux / Reactor Netty Web 应用程序中执行阻塞调用【英文标题】:How to execute blocking calls within a Spring Webflux / Reactor Netty web application 【发布时间】:2018-09-19 15:28:25 【问题描述】:

在我的用例中,我有一个带有 Reactor Netty 的 Spring Webflux 微服务,我有以下依赖项:

org.springframework.boot.spring-boot-starter-webflux (2.0.1.RELEASE) org.springframework.boot.spring-boot-starter-data-mongodb-reactive (2.0.1.RELEASE) org.projectreactor.reactor-spring (1.0.1.RELEASE)

对于一个非常具体的情况,我需要从我的 Mongo 数据库中检索一些信息,并将其处理为使用我的响应式WebClient 发送的查询参数。由于WebClientUriComponentsBuilder 接受发布者(单声道/通量),我使用#block() 调用来接收结果。

由于reactor-core(版本0.7.6.RELEASE)已包含在最新的spring-boot-dependencies(版本2.0.1.RELEASE)中,因此无法再使用:block()/blockFirst()/blockLast() are blocking, which is not supported in thread xxx,参见->@987654321 @

我的代码sn-p:

public Mono<FooBar> getFooBar(Foo foo) 
    MultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();
    parameters.add("size", foo.getSize());
    parameters.addAll("bars", barReactiveCrudRepository.findAllByIdentifierIn(foo.getBarIdentifiers()) // This obviously returns a Flux
        .map(Bar::toString)
        .collectList()
        .block());

    String url = UriComponentsBuilder.fromHttpUrl("https://base-url/")
        .port(8081)
        .path("/foo-bar")
        .queryParams(parameters)
        .build()
        .toString();

    return webClient.get()
        .uri(url)
        .retrieve()
        .bodyToMono(FooBar.class);

这适用于 spring-boot 版本 2.0.0.RELEASE,但由于升级到版本 2.0.1.RELEASE 并因此从 reactor-core 升级到版本 0.7.6.RELEASE,它不再被允许。

我看到的唯一真正的解决方案是包含一个块(非反应性)存储库/mongo 客户端,但我不确定是否鼓励这样做。有什么建议?

【问题讨论】:

感谢您指出这一点。我们有非常相似的代码,在我们下次升级时会遇到一些问题。 【参考方案1】:

WebClient 的请求 URL 不接受 Publisher 类型,但没有什么可以阻止您执行以下操作:

public Mono<FooBar> getFooBar(Foo foo) 

    Mono<List<String>> bars = barReactiveCrudRepository
        .findAllByIdentifierIn(foo.getBarIdentifiers())
        .map(Bar::toString)
        .collectList();

    Mono<FooBar> foobar = bars.flatMap(b -> 

        MultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();
        parameters.add("size", foo.getSize());
        parameters.addAll("bars", b);

        String url = UriComponentsBuilder.fromHttpUrl("https://base-url/")
            .port(8081)
            .path("/foo-bar")
            .queryParams(parameters)
            .build()
            .toString();

        return webClient.get()
            .uri(url)
            .retrieve()
            .bodyToMono(FooBar.class);
    );
    return foobar;         

如果有的话,这个新的反应堆核心检查使您免于在 WebFlux 处理程序中间使用这个阻塞调用使整个应用程序崩溃。

【讨论】:

这不是阻塞吗?如果是,可能一些 publishOn(Scheduler.elastic()) 或类似的东西会有意义,对吧? (不过,我不是这方面的专家) @Frischling 请指出您认为阻塞的部分。 整个 webClient.get()... 部分。它被打包在一个 Mono 中,是的,但最终数据被传输,并且可以任意慢;所以如果传输了很多数据,并且需要很多时间,它应该阻塞主线程,对吧? 调用 webClient.get() 不会对 I/O 做任何事情。订阅 Mono 将开始整个过程​​。该反应式管道没有阻塞 I/O,因此任何时候都不会阻塞任何线程。遥控器可能需要很长时间才能回答(我们无法解决整个世界的延迟问题),但仍然没有区别:没有阻塞操作,没有线程被阻塞。 谢谢布赖恩!这似乎做到了,我对反应式编程还比较陌生,这是我自己无法解决的一种情况。感谢您的帮助和很好的例子,希望对其他人也一样!

以上是关于如何在 Spring Webflux / Reactor Netty Web 应用程序中执行阻塞调用的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 WebFlux 在 Spring Boot 2 中设置登录页面?

如何在 Spring Boot WebFlux 中使用 GET 请求注销

如何使用WebFlux在Spring Boot 2中设置登录页面?

如何使用 Spring Boot 对 WebFlux 进行异常处理?

如何使用 spring webflux 读取请求正文

Spring 5 webflux如何在Webclient上设置超时