Exceptions when a WebClient stops reading from a Flux
Posted: 2018-05-07 22:57:07
Question:
I created a server that returns an infinite Flux and a client that reads objects from the response asynchronously. I want the client to unsubscribe from the Flux and stop processing it.
The server's controller:
@GetMapping(path = "/infinite", produces = TEXT_EVENT_STREAM_VALUE)
public Flux<String> getStreamOfLongs() {
    // Emits "x" forever, one element per downstream request.
    return Flux.generate(sink -> sink.next("x"));
}
The client:
WebClient client = WebClient.create("http://localhost:8080");
Flux<String> flux = client.get()
.uri("/infinite")
.accept(TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(String.class);
Disposable disposable = flux.subscribe(consumer);
Executors.newSingleThreadScheduledExecutor().schedule(() -> disposable.dispose(), 5, TimeUnit.SECONDS);
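Is this the right way to unsubscribe from the stream? What should the client do when it "wants" to stop reading more data?
When the client unsubscribes (using disposable.dispose()), the server throws two exceptions (an IOException and an UnsupportedOperationException):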
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.writev0(Native Method) ~[na:1.8.0_131]
    at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51) ~[na:1.8.0_131]
    at sun.nio.ch.IOUtil.write(IOUtil.java:148) ~[na:1.8.0_131]
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504) ~[na:1.8.0_131]
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:533) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:358) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at reactor.ipc.netty.channel.ChannelOperationsHandler$PublisherSender.onComplete(ChannelOperationsHandler.java:505) ~[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:130) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:184) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.Flux.subscribe(Flux.java:6516) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:433) ~[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]
    at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:179) ~[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1041) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:300) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
    at reactor.ipc.netty.NettyOutbound.lambda$sendObject$6(NettyOutbound.java:298) ~[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]
    at reactor.ipc.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:106) ~[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:2913) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.ipc.netty.NettyOutbound.subscribe(NettyOutbound.java:356) ~[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapDelayed.drain(FluxConcatMap.java:744) [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapDelayed.onNext(FluxConcatMap.java:581) [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:150) ~[spring-web-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxGenerate$GenerateSubscription.next(FluxGenerate.java:164) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at io.github.msayag.webflux.MyController.lambda$getStreamOfLongs$0(MyController.java:44) ~[classes/:na]
    ...
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) ~[netty-common-4.1.16.Final.jar:4.1.16.Final]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]
followed by:
2017-11-24 01:04:09.476 ERROR 83663 --- [ctor-http-nio-2] o.s.w.s.adapter.HttpWebHandlerAdapter : Failed to handle request
java.lang.UnsupportedOperationException: null
    at java.util.Collections$UnmodifiableMap.put(Collections.java:1457) ~[na:1.8.0_131]
    at org.springframework.http.HttpHeaders.set(HttpHeaders.java:1439) ~[spring-web-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.http.HttpHeaders.setContentType(HttpHeaders.java:849) ~[spring-web-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler.write(AbstractErrorWebExceptionHandler.java:235) ~[spring-boot-autoconfigure-2.0.0.M6.jar:2.0.0.M6]
    at org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler.lambda$handle$1(AbstractErrorWebExceptionHandler.java:228) ~[spring-boot-autoconfigure-2.0.0.M6.jar:2.0.0.M6]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118) [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1092) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1649) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:138) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:230) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:172) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1649) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1463) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1337) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:2913) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.Operators.complete(Operators.java:125) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:2913) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:185) [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:251) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.Operators.error(Operators.java:175) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:129) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:53) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:2913) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at org.springframework.http.server.reactive.ChannelSendOperator$WriteCompletionBarrier.onError(ChannelSendOperator.java:339) ~[spring-web-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:87) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:1332) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1135) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onError(MonoIgnoreThen.java:300) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:75) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapDelayed.drain(FluxConcatMap.java:660) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapDelayed.onNext(FluxConcatMap.java:581) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.onNext(ChannelSendOperator.java:150) ~[spring-web-5.0.1.RELEASE.jar:5.0.1.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at reactor.core.publisher.FluxGenerate$GenerateSubscription.next(FluxGenerate.java:164) ~[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]
    at io.github.msayag.webflux.MyController.lambda$getStreamOfLongs$0(MyController.java:44) ~[classes/:na]
    ...
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) ~[netty-common-4.1.16.Final.jar:4.1.16.Final]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]
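Answer 1:
As far as I can tell, you are doing it right. Disposable::dispose effectively cancels the stream; from the Subscriber's point of view, that is what you should do when you are no longer interested in receiving data.
Calling it on the WebClient side closes the HTTP connection. I don't think there is a "cleaner" way to tell the server that you no longer want to receive data. With HTTP/2 things might be different, because an HTTP stream can be closed without closing the whole connection.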
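From the server's point of view, a client that cancels deliberately looks exactly like a client whose connection was closed because of an error. So the two exceptions indicate that:
- the connection was closed while the server was trying to write
- the response was not handled properly (the server still had something to write)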
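The first point can be reproduced without Spring or Reactor. The following is only a minimal sketch using plain JDK sockets, not the WebFlux code path: the class name ClientCancelDemo and its methods are hypothetical, and the never-ending writer merely stands in for the infinite Flux. It shows that when the reader closes the connection, the writer learns about it only through a failed write.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ClientCancelDemo {

    // "Server" side: accepts one connection and writes forever.
    // It only stops when a write fails, and returns that failure.
    static IOException writeUntilFailure(ServerSocket server) {
        try (Socket peer = server.accept();
             OutputStream out = peer.getOutputStream()) {
            byte[] chunk = "data:x\n\n".getBytes(StandardCharsets.US_ASCII);
            while (true) {
                out.write(chunk);
                out.flush();
            }
        } catch (IOException e) {
            return e; // the peer went away while we still had data to write
        }
    }

    // Starts the writer, lets a client read a little and then close,
    // and returns the exception the writer ended with.
    static IOException runDemo() throws Exception {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            CompletableFuture<IOException> failure =
                    CompletableFuture.supplyAsync(() -> writeUntilFailure(server));

            // "Client" side: read some bytes, then close the connection.
            // Closing here plays the role of disposable.dispose().
            try (Socket client = new Socket(loopback, server.getLocalPort());
                 InputStream in = client.getInputStream()) {
                in.read(new byte[64]);
            }

            // Bounded wait so the demo cannot hang if something goes wrong.
            return failure.get(10, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Server-side write failed with: " + runDemo());
    }
}
```

The exact message depends on the operating system and timing (for example "Broken pipe" or "Connection reset"), but in either case the writer sees an IOException rather than a clean "the client is done" signal.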
If you have ideas for improving this behavior, please create a ticket at https://jira.spring.io