如何在 Spring 响应式 WebClient 中返回 Kotlin Coroutines Flow

Posted

技术标签:

【中文标题】如何在 Spring 响应式 WebClient 中返回 Kotlin Coroutines Flow【英文标题】:How to return a Kotlin Coroutines Flow in Spring reactive WebClient 【发布时间】:2019-09-05 03:16:45 【问题描述】:

Spring 5.2 带来了 Kotlin 协程支持,Spring 反应式 WebClient 在 Kotlin 扩展中获得了协程支持。

我创建了将 GET /posts 公开为流的后端服务,请检查代码 here。

@GetMapping("")
fun findAll(): Flow<Post> =
        postRepository.findAll()

在客户端示例中,我尝试通过以下方式使用 WebClient 来使用此 api。

@GetMapping("")
suspend fun findAll(): Flow<Post> =
        client.get()
                .uri("/posts")
                .accept(MediaType.APPLICATION_JSON)
                .awaitExchange()
                .awaitBody()

由于 Flow 类型的 Jackson 序列化而失败。

由于上述表达式中的 awaitXXX 方法,我必须使用 suspend 修饰符来玩这个。

但是,如果我将正文类型更改为 Any,则以下操作有效,请检查 compelete codes。

GetMapping("")
suspend fun findAll() =
        client.get()
                .uri("/posts")
                .accept(MediaType.APPLICATION_JSON)
                .awaitExchange()
                .awaitBody<Any>()

阅读 spring ref doc 的 the Kotlin Coroutines 后,Flux 应转换为 Kotlin 协程流。如何处理流的返回类型并在此处删除suspend

更新:返回类型改为Flow,在这里查看最新的source codes,我认为它可能是Spring 5.2.0.M2的一部分。 suspend 修饰符是 webclient api 中的 2 阶段协同程序操作所必需的,如下面的 Sébastien Deleuze 所解释的。

【问题讨论】:

【参考方案1】:

首先要了解的是,返回Flow 不需要为处理程序方法本身使用挂起函数。使用Flow,挂起函数通常被隔离在 lambda 参数中。但是在这个(常见)用例中,由于WebClient 2 阶段 API(首先获取响应,然后获取正文),我们需要为 awaitExchange 暂停处理程序方法,然后将正文作为 FlowbodyToFlow分机:

@GetMapping("")
suspend fun findAll() =
    client.get()
        .uri("/posts")
        .accept(MediaType.APPLICATION_JSON)
        .awaitExchange()
        .bodyToFlow<Post>()

从 Spring Framework 5.2 M2 和 Spring Boot 2.2 M3 开始支持此功能(请参阅 related issue)。另见我的related detailed blog post。

【讨论】:

为什么不直接在exchange上添加bodyToFlow呢? WebClient 是一个 2 阶段 API,因为您对主体执行的操作可能取决于对响应状态或标头等其他信息的处理。我已经提交了对 master 的修复,可以随意测试。

以上是关于如何在 Spring 响应式 WebClient 中返回 Kotlin Coroutines Flow的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 WebClient 使用响应式 Spring Rest API

Spring Boot响应式WebClient调用遗留端点

Spring WebClient与RestTemplate性能对比——响应式Spring的道法

Spring WebFlux WebClient - 如何解决 400 错误请求

Spring Webflux 构建响应式 Restful Web 服务

Spring Webflux 构建响应式 Restful Web 服务