Spring中RESTful API上flux行为的解释

Posted

技术标签:

【中文标题】Spring中RESTful API上flux行为的解释【英文标题】:Explanation of flux behavior on RESTful API in Spring 【发布时间】:2019-11-08 14:37:54 【问题描述】:

我正在努力理解 Spring 的 Webflux 和反应式 API,如果使用注释而不是路由/处理程序声明 REST 端点,我希望能解释一下我看到的通量行为差异。

我看到的是通量(流?)在通过注释定义的 REST 端点获取时正在执行,而不是从使用路由/处理程序语义定义的 REST 端点获取。

谁能解释观察到的行为的差异?下面提供的代码和控制台输出...

注意:DemoRequestHandler.getOddIntsMult() 中的注释代码将导致 Flux 执行并遍历包含的整数以查找/返回奇数值。我想我真正的问题是“为什么在一个实例中需要 subscribe() 而不是另一个?”

带注释的 REST 控制器...

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import static org.springframework.web.bind.annotation.RequestMethod.GET;

@RestController
public class DemoFluxController 

    @RequestMapping(method=GET, value="/v1/fluxMult")
    public Flux<Integer> getMultFlux() 
        System.out.println("DEBUG -> FluxController.getMultFlux()");
        return DemoFlux.getOddInts(DemoFlux.multIntFlux);
    

用于返回奇整数通量的测试类...

import reactor.core.publisher.Flux;

public class DemoFlux 

    public static Flux<Integer> multIntFlux = Flux.range(1, 20);

    private static boolean isOdd(Integer intVal) 
        System.out.printf("DEBUG -> DemoFlux.isOdd( %d )%n", intVal);
        return intVal % 2 != 0;
    

    public static Flux<Integer> getOddInts(Flux<Integer> intFlux) 
        return intFlux.filter(DemoFlux::isOdd);
    

声明备用 REST 端点的路由器实现...

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

@Configuration
public class DemoRouter 

    @Bean
    public RouterFunction<ServerResponse> route(DemoRequestHandler requestHandler) 
        return RouterFunctions.route(RequestPredicates.GET("/v2/fluxMult")
                          .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), requestHandler::getOddIntsMult);
    

与 REST 端点的路由关联的请求处理程序。

import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

@Component
public class DemoRequestHandler 

    public Mono<ServerResponse> getOddIntsMult(ServerRequest request) 
        System.out.println("DEBYG -> DemoRequestHandler.getOddIntsMult()");

        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
            .body(BodyInserters.fromObject(DemoFlux.getOddInts(DemoFlux.multIntFlux)));
//                .body(BodyInserters.fromObject(DemoFlux.getOddInts(DemoFlux.multIntFlux).subscribe()));
    

运行的控制台输出。请注意根据访问的 REST 端点的不同行为...

DEBUG -> DemoFluxController.getMultFlux()
DEBUG -> DemoFlux.getOddInts()
DEBUG -> DemoFlux.isOdd( 1 )
DEBUG -> DemoFlux.isOdd( 2 )
DEBUG -> DemoFlux.isOdd( 3 )
DEBUG -> DemoFlux.isOdd( 4 )
DEBUG -> DemoFlux.isOdd( 5 )
DEBUG -> DemoFlux.isOdd( 6 )
DEBUG -> DemoFlux.isOdd( 7 )
DEBUG -> DemoFlux.isOdd( 8 )
DEBUG -> DemoFlux.isOdd( 9 )
DEBUG -> DemoFlux.isOdd( 10 )
DEBUG -> DemoFlux.isOdd( 11 )
DEBUG -> DemoFlux.isOdd( 12 )
DEBUG -> DemoFlux.isOdd( 13 )
DEBUG -> DemoFlux.isOdd( 14 )
DEBUG -> DemoFlux.isOdd( 15 )
DEBUG -> DemoFlux.isOdd( 16 )
DEBUG -> DemoFlux.isOdd( 17 )
DEBUG -> DemoFlux.isOdd( 18 )
DEBUG -> DemoFlux.isOdd( 19 )
DEBUG -> DemoFlux.isOdd( 20 )

DEBUG -> DemoRequestHandler.getOddIntsMult()
DEBUG -> DemoFlux.getOddInts()

【问题讨论】:

【参考方案1】:

它们的行为不同,因为您没有对它们进行相同的编码。

人们在使用 webflux/reactive 编程时最常犯的错误之一是他们在应用程序中间订阅,因为他们想要或需要某物的具体价值。

WebFlux/反应式编程基本上是我们声明一个我们希望在有人订阅时发生的事件链。在这种情况下,它是客户端订阅的时候。所以你能感觉到有什么不对劲真是太好了,因为有!

你的问题在这里:

public Mono<ServerResponse> getOddIntsMult(ServerRequest request) 
    System.out.println("DEBYG -> DemoRequestHandler.getOddIntsMult()");

    return ServerResponse.ok()
        .contentType(MediaType.APPLICATION_JSON)
        .body(BodyInserters.fromObject(              // This body inserter here
            DemoFlux.getOddInts(DemoFlux.multIntFlux)));

发生的情况是 BodyInserter 需要一个具体的值,如果您查看它的源代码,您会发现 body inserter 将 body 包装在一个单声道中。因此,您所做的是将Flux 包装在Mono 中,因此您实际上得到的是Mono&lt;Flux&lt;Integer&gt;&gt;

在您使用 webflux 工作一段时间后,当您得到类似以下内容时,您会发现情况就是这样:


    "disposed": true,
    "scanAvailable": true

然后你知道你得到了一个包裹的单声道/通量,你在某个地方错过了一个平面图。

如果您重写它并删除BodyInserter,它将按预期工作。

public Mono<ServerResponse> getOddIntsMult(ServerRequest request) 
    System.out.println("DEBYG -> DemoRequestHandler.getOddIntsMult()");

    return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
            .body(DemoFlux.getOddInts(DemoFlux.multIntFlux), Integer.class);                

所以没有区别,您的@RequestMapping(method=GET, value="/v1/fluxMult") 不使用DemoRequestHandler,因此您得到不同的结果。

【讨论】:

谢谢!这很有效,让我对 WebFlux/reactive 有了更多的了解。此外,您引用的返回值(即“disposed”:true,...)正是我所看到的。 不用担心,很高兴我能帮上忙。一开始有点棘手,但一段时间后你就会进入功能性思维方式。

以上是关于Spring中RESTful API上flux行为的解释的主要内容,如果未能解决你的问题,请参考以下文章

Webclient 与简单 Flux.just 的 Flux 行为不同

Java / Spring中与Restful API交互的工具[重复]

如何在 Postman 中查看 Spring 5 Reactive API 的响应?

如何在 Spring Boot 中同时公开 SOAP Web 服务和 RESTful API?

Spring Boot - Restful API

Log Spring webflux 类型 - Mono 和 Flux