block()/blockFirst()/blockLast() 在 exchange() 之后调用 bodyToMono 时出现阻塞错误

Posted

技术标签:

【中文标题】block()/blockFirst()/blockLast() 在 exchange() 之后调用 bodyToMono 时出现阻塞错误【英文标题】:block()/blockFirst()/blockLast() are blocking error when calling bodyToMono AFTER exchange() 【发布时间】:2018-12-29 04:47:22 【问题描述】:

我正在尝试使用 Webflux 将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到错误,则 api 返回成功,但在生成文件时使用 DTO 详细说明错误,而不是文件本身。这是使用了一个非常古老且设计不佳的 api,所以请原谅使用 post 和 api 设计。

来自 api 调用 (exchange()) 的响应是 ClientResponse。从这里我可以使用 bodyToMono 转换为 ByteArrayResource,它可以流式传输到文件,或者,如果创建文件时出错,那么我也可以使用 bodyToMono 转换为 DTO。但是,我似乎无法根据 ClientResponse 标头的内容做任何事情。

在运行时我得到一个由

引起的 IllegalStateException

block()/blockFirst()/blockLast()是阻塞的,线程reactor-http-client-epoll-12不支持这个

我认为我的问题是我不能在同一个函数链中调用 block() 两次。

我的代码sn-p是这样的:

webClient.post()
        .uri(uriBuilder -> uriBuilder.path("/file/")
                                      .queryParams(params).build())
        .exchange()
        .doOnSuccess(cr -> 
                if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) 
                    NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
                    createErrorFile(dto);
                
                else 
                    ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
                    createSpreadsheet(bAr);
                
            
        )
        .block();

基本上,我想根据标头中定义的 MediaType 以不同方式处理 ClientResponse。

这可能吗?

【问题讨论】:

不要阻止,subscribe。应该没有理由打电话给block。如果您正在使用 WebFlux,那么您这样做是因为您想构建一个反应式管道,如果您正在调用 block,那么您并没有这样做。换句话说,如果你调用 block,只需使用一个普通的旧 RestTemplate - 无论如何,你的代码看起来非常程序化和副作用,所以将它硬塞到 Reactor 不会让它产生神奇的反应。 为了澄清我们正在尝试使用 WebClient 将文件从 Web API 流式传输到磁盘。有错误时响应可能是 200 OK application/json,或者没有错误时响应可能是 200 OK Content-Disposition。我们如何在不加载文件的情况下使用 WebClient 来做到这一点完全在记忆中? 【参考方案1】:

首先,有几件事可以帮助您理解解决此用例的代码 sn-p。

    永远不要在返回反应类型的方法中调用阻塞方法;您将阻塞应用程序的少数线程之一,这对应用程序非常不利 从 Reactor 3.2 开始,blocking within a reactive pipeline throws an error 按照 cmets 的建议,调用 subscribe 也不是一个好主意。它或多或少像在单独的线程中将这项工作作为任务启动。完成后您会收到一个回调(可以为 subscribe 方法提供 lambdas),但实际上您正在将当前管道与该任务解耦。在这种情况下,可能会关闭客户端 HTTP 响应并清理资源,然后您才有机会读取完整的响应正文以将其写入文件 如果您不想在内存中缓冲整个响应,Spring 提供了DataBuffer(想想可以池化的 ByteBuffer 实例)。 如果您正在实现的方法本身是阻塞的(例如返回 void),您可以调用 block,例如在测试用例中。

这是一个代码 sn-p,您可以使用它来执行此操作:

Mono<Void> fileWritten = WebClient.create().post()
        .uri(uriBuilder -> uriBuilder.path("/file/").build())
        .exchange()
        .flatMap(response -> 
            if (MediaType.APPLICATION_JSON_UTF8.equals(response.headers().contentType().get())) 
                Mono<NoPayloadResponseDto> dto = response.bodyToMono(NoPayloadResponseDto.class);
                return createErrorFile(dto);
            
            else 
                Flux<DataBuffer> body = response.bodyToFlux(DataBuffer.class);
                return createSpreadsheet(body);
            
        );
// Once you get that Mono, you should give plug it into an existing
// reactive pipeline, or call block on it, depending on the situation

如您所见,我们不会在任何地方阻塞,处理 I/O 的方法会返回 Mono&lt;Void&gt;,这是一个 done(error) 回调的反应式等价物,它会在事情完成以及是否发生错误时发出信号。

由于我不确定createErrorFile 方法应该做什么,我为createSpreadsheet 提供了一个示例,它只是将正文字节写入文件。请注意,由于数据缓冲区可能会被回收/池化,因此我们需要在完成后释放它们。

private Mono<Void> createSpreadsheet(Flux<DataBuffer> body) 
    try 
        Path file = //...
        WritableByteChannel channel = Files.newByteChannel(file, StandardOpenOption.WRITE);
        return DataBufferUtils.write(body, channel).map(DataBufferUtils::release).then();
     catch (IOException exc) 
        return Mono.error(exc);
    

通过此实现,您的应用程序将在给定时间在内存中保存一些 DataBuffer 实例(反应式运算符出于性能原因预取值),并且会在字节以反应式方式进入时写入它们。

【讨论】:

bodyToMono(Resource.class) 怎么样?那不应该已经处理缓冲了吗? ResourceDecoder 仅支持内存中的变体,例如 InputStreamResource.classByteArrayResource.class。因为你给bodyToMono 提供了一个类而不是一个实例,所以你不能真正要求它写入现有文件。 那么 bodyToMono(ByteArrayResource.class) 也会将整个文件加载到内存中而不是将字节流式传输到文件系统? ExchangeFilterFunction 不是比创建像 createSpreadsheet 这样的单独方法更适合吗? 我不明白您希望该交换功能如何工作。请提出一个新问题,表明您的想法 此解决方案使文件处理程序在 Linux 中保持打开状态。该文件被写入为 0KB,并且有大量处理该文件的子进程。我找不到像使用 channel.close() 那样关闭文件的方法。有时在 Windows 中打开 Excel 文件时表示它已经打开。【参考方案2】:

[2021 年 10 月 19 日更新]

toProcessor() 现已弃用。

考虑使用

myMono.toFuture().get();

正如投票最多的答案所述,永远不应该阻止。就我而言,这是唯一的选择,因为我们在命令式代码中使用了响应式库。拦截可以通过wrapping the mono in a processor:

myMono.toProcessor().block()

【讨论】:

这是一种已弃用的方法。文档要求我们使用共享【参考方案3】:

要在服务器请求池之外执行客户端请求,请使用myWebClientMono.share().block();

【讨论】:

share() 到底是做什么的?? 它将在当前工作池之外执行阻塞调用。【参考方案4】:

对我来说,添加网络依赖解决了这个问题。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

【讨论】:

只是为了让您了解为什么这可以解决问题。它实际上没有解决问题。相反,它完全忽略它。 OP 收到此错误是因为它们在 Netty 上运行。将此添加为依赖项会告诉 Spring 改为在 Tomcat 上运行。在我参加的一次会议上,Spring 团队说:> 如果您同时添加 starter-web 和 starter-webflux,我们假设您正在为 WebClient 添加 webflux,我们将在 Tomcat 上运行您的应用程序。 抱歉,只是在混合中添加依赖项并不能真正解决问题。非阻塞开发需要实际了解您在做什么。我同意@Bwvolleyball。 我同意@Bwvolleyball 和@tftd。但是,如果有人只想使用WebClient 并且不关心阻塞/非阻塞功能,那么我的答案将起作用。因为我的要求并不是真正需要反应式编程能力。【参考方案5】:
RestResultMessage message= createWebClient()
                .get()
                .uri(uri)
                .exchange()
                .map(clientResponse -> 
                    //delegation
                    ClientResponseWrapper wrapper = new 
                                 ClientResponseWrapper(clientResponse);
                    return Mono.just(wrapper);
                )
                .block() //wait until request is not done
                .map(result ->   
                    //convert to any data
                    if (!result.statusCode().isError())
                       //extract the result from request
                        return create(RestResultMessage.Result.success, result.bodyToMono(String.class).block());
                     else 
                        return create(RestResultMessage.Result.error, result.statusCode().name());
                    
                )
                .block();

【讨论】:

以上是关于block()/blockFirst()/blockLast() 在 exchange() 之后调用 bodyToMono 时出现阻塞错误的主要内容,如果未能解决你的问题,请参考以下文章

为啥我解决了“Debug Assertion Failed OpenCv is_block_type_valid(header->_block_use)”

[转]iOS代码块Block

block和delegate的选择

__block 和__weak 区别及使用

block使用场景和注意事项

iOS block 为啥官方文档建议用 copy 修饰