在 Spring Boot 客户端中接收通量

Posted

技术标签:

【中文标题】在 Spring Boot 客户端中接收通量【英文标题】:Receiving a Flux in a Spring Boot Client 【发布时间】:2017-07-17 18:42:52 【问题描述】:

这是Spring 5 Web Reactive - How can we use WebClient to retrieve streamed data in a Flux?的后续问题

我尝试遵循如何使用 Spring WebClient 接收 Flux 的建议,但实际上遇到了一个网络问题。

在服务器端,代码是一个简单的控制器,暴露了 Mongo 存储库的 findAll 方法:

@RequestMapping("power")
public Flux<Power> getAll() 
    return powerRepository.findAll();

在客户端,消费代码就像上面给出的答案:

@Bean
public CommandLineRunner readFromApiServer() 
    return new CommandLineRunner() 
        @Override
        public void run(String... args) throws Exception 
            WebClient webClient = WebClient.create("http://localhost:8080");
            Flux<Power> flux = webClient.get().uri("/power").accept(MediaType.APPLICATION_STREAM_JSON).exchange()
                    .flatMap(response -> response.bodyToFlux(Power.class));
            flux.subscribe();
        
    ;

但这会引发异常:

2017-02-27 08:19:41.281 错误 99026 --- [ctor-http-nio-5] r.ipc.netty.channel.ChannelOperations:[HttpClient] 错误 处理连接。请求关闭频道

io.netty.util.IllegalReferenceCountException:refCnt:0,递减:1 在 io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:101) ~[netty-all-4.1.8.Final.jar:4.1.8.Final]

我正在使用当前的 Spring Boot 2.0.0 BUILD-SNAPSHOT。

这个异常告诉我什么?我怎样才能做到正确?

【问题讨论】:

【参考方案1】:

所有CommandLineRunner bean 在 Spring Boot 应用程序启动时执行。如果没有守护线程(即当前应用程序不是 Web 应用程序),则应用程序将在所有运行器执行完毕后关闭。

在您的情况下,仅使用flux.subscribe()“启动链并请求无限需求”(javadoc),因此此方法调用不会阻塞。我怀疑这个命令行运行程序在你有机会对你的通量做任何事情之前返回,应用程序关闭并且你的网络资源被关闭。

此外,您没有对 HTTP 请求的结果做任何事情。 我认为使用以下内容更新您的代码示例应该可以解决问题:

@Bean
public CommandLineRunner readFromApiServer() 
    return new CommandLineRunner() 
        @Override
        public void run(String... args) throws Exception 
            WebClient webClient = WebClient.create("http://localhost:8080");
            Flux<Power> flux = webClient.get().uri("/power").accept(MediaType.APPLICATION_STREAM_JSON).exchange()
                    .flatMap(response -> response.bodyToFlux(Power.class));
            List<Power> powerList = flux.collectList().block();
            // do something with the result?
        
    ;

【讨论】:

实际上我在主程序中有一个无限循环,所以有一个无守护线程阻止应用程序关闭。但看起来消费者在自己的线程中运行,如果没有被阻塞,则立即返回。我没有在列表中收集结果,而是在它们到达时使用“Powers”:flux.doOnNext(power -> System.out.println(power.getValue())).blockLast();这工作正常。感谢您的支持!

以上是关于在 Spring Boot 客户端中接收通量的主要内容,如果未能解决你的问题,请参考以下文章

Angularjs 正在从 Spring Boot 接收 XSRF-TOKEN 但不发送 X-XSRF-TOKEN

如何将 spring-boot 作为客户端应用程序运行?

如何设置Spring Boot中@RequestBody反序列化实体的默认值

Spring Boot中无法拦截和操作HttpServletResponse

Spring Boot Webflux/Netty - 检测关闭的连接

在 Angular 5 中接收通量 SSE