WebClient request-level timeout throws "Operator called default onErrorDropped"

Posted: 2019-11-17 20:54:09

Question:

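We are using the Spring reactive WebClient to make HTTP calls. Underneath it uses JettyClientHttpConnector. To set a request-level timeout, we use the Mono timeout API.

If the server does not respond, the Mono times out, but it also throws

Operator called default onErrorDropped java.lang.InterruptedException

After a few occurrences of this error, we run into an insufficient-threads problem in the Jetty connection pool.

java.lang.IllegalStateException: Insufficient configured threads: required=200 < max=200 for QueuedThreadPool[HttpClient@49e4c2d5]@687d1782STARTED,8<=199<=200,i=1,q=0[ReservedThreadExecutor@3eaa0b62s=0/1,p=0]

We suspect that the connection is not released because of the onErrorDropped exception. How can we make sure the connection resources are released after the Mono times out?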

Dependencies

org.springframework:spring-webflux:jar:5.1.4.RELEASE
org.springframework.boot:spring-boot-starter-jetty:jar:2.1.2.RELEASE
org.springframework.cloud:spring-cloud-stream-reactive:jar:2.2.0.M1

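WebClient configuration

// webClientBuilder is an existing WebClient.Builder instance (clone() is an instance method)
webClientBuilder.clone()
    .clientConnector(new JettyClientHttpConnector(new HttpClient()))
    .build();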

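Client call

// build() returns UriComponents, so convert it to a java.net.URI
URI request = UriComponentsBuilder
    .fromUriString("foo")
    .path("/path")
    .build()
    .toUri();

return webClient.put()
    .uri(request)
    .body(BodyInserters.fromObject(fooBar))
    .retrieve()
    // map any 4xx/5xx response to an exception carrying the response entity
    .onStatus(HttpStatus::isError, clientResponse ->
        clientResponse.toEntity(String.class)
            .map(body -> new RuntimeException(body.toString())))
    .bodyToMono(Foo2.class)
    // request-level timeout, retried up to 3 times on TimeoutException only
    .timeout(Duration.ofMillis(100))
    .retry(3, retryError -> retryError instanceof TimeoutException);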

Exception

Mono timeout:

java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 500ms in 'lift' (and no fallback has been configured)
reactor.core.publisher.Operators         : Operator called default onErrorDropped java.lang.InterruptedException: null
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[na:1.8.0_192]
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[na:1.8.0_192]
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[na:1.8.0_192]
        at org.eclipse.jetty.io.ManagedSelector.doStart(ManagedSelector.java:106) ~[jetty-io-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:138) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:108) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.io.SelectorManager.doStart(SelectorManager.java:262) ~[jetty-io-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:138) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:108) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.client.AbstractConnectorHttpClientTransport.doStart(AbstractConnectorHttpClientTransport.java:64) ~[jetty-client-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:138) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:108) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.client.HttpClient.doStart(HttpClient.java:250) ~[jetty-client-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.springframework.http.client.reactive.JettyClientHttpConnector.connect(JettyClientHttpConnector.java:98) ~[spring-web-5.1.4.RELEASE.jar!/:5.1.4.RELEASE]
        at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:103) ~[spring-webflux-5.1.4.RELEASE.jar!/:5.1.4.RELEASE]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118) ~[reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96) [spring-cloud-sleuth-core-2.1.1.RELEASE.jar!/:2.1.1.RELEASE]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) [reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96) [spring-cloud-sleuth-core-2.1.1.RELEASE.jar!/:2.1.1.RELEASE]
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505) [reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) [reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96) [spring-cloud-sleuth-core-2.1.1.RELEASE.jar!/:2.1.1.RELEASE]
        at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onNext(MonoSubscribeOn.java:143) [reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96) [spring-cloud-sleuth-core-2.1.1.RELEASE.jar!/:2.1.1.RELEASE]
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505) [reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:61) [reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55) [reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at reactor.core.publisher.Mono.subscribe(Mono.java:3695) [reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123) [reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) [reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) [reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at org.springframework.cloud.sleuth.instrument.async.TraceCallable.call(TraceCallable.java:70) [spring-cloud-sleuth-core-2.1.1.RELEASE.jar!/:2.1.1.RELEASE]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_192]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_192]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.8.0_192]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_192]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_192]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_192]
reactor.core.scheduler.Schedulers        : Scheduler worker in group main failed with an uncaught exception java.lang.InterruptedException: null
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[na:1.8.0_192]
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[na:1.8.0_192]
        at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) ~[na:1.8.0_192]
        at org.eclipse.jetty.io.ManagedSelector.doStart(ManagedSelector.java:106) ~[jetty-io-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:138) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:108) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.io.SelectorManager.doStart(SelectorManager.java:262) ~[jetty-io-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:138) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:108) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.client.AbstractConnectorHttpClientTransport.doStart(AbstractConnectorHttpClientTransport.java:64) ~[jetty-client-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:138) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:108) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.client.HttpClient.doStart(HttpClient.java:250) ~[jetty-client-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68) ~[jetty-util-9.4.14.v20181114.jar!/:9.4.14.v20181114]
        at org.springframework.http.client.reactive.JettyClientHttpConnector.connect(JettyClientHttpConnector.java:98) ~[spring-web-5.1.4.RELEASE.jar!/:5.1.4.RELEASE]
        at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:103) ~[spring-webflux-5.1.4.RELEASE.jar!/:5.1.4.RELEASE]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118) ~[reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96) ~[spring-cloud-sleuth-core-2.1.1.RELEASE.jar!/:2.1.1.RELEASE]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96) ~[spring-cloud-sleuth-core-2.1.1.RELEASE.jar!/:2.1.1.RELEASE]
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505) ~[reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) ~[reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96) ~[spring-cloud-sleuth-core-2.1.1.RELEASE.jar!/:2.1.1.RELEASE]
        at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onNext(MonoSubscribeOn.java:143) ~[reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:96) ~[spring-cloud-sleuth-core-2.1.1.RELEASE.jar!/:2.1.1.RELEASE]
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505) ~[reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:61) ~[reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at reactor.core.publisher.MonoLiftFuseable.subscribe(MonoLiftFuseable.java:55) ~[reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at reactor.core.publisher.Mono.subscribe(Mono.java:3695) ~[reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123) ~[reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) [reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) [reactor-core-3.2.5.RELEASE.jar!/:3.2.5.RELEASE]
        at org.springframework.cloud.sleuth.instrument.async.TraceCallable.call(TraceCallable.java:70) ~[spring-cloud-sleuth-core-2.1.1.RELEASE.jar!/:2.1.1.RELEASE]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_192]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_192]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_192]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_192]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_192]
        at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_192]

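Comments:

You should define the timeout for the webclient docs.spring.io/spring/docs/current/spring-framework-reference/…

@ThomasAndolf We want to set the timeout per request; setting the timeout at the client level takes away that flexibility.

Mono#timeout has nothing to do with the webclient connection. The WebClient timeout works at the TCP level, while Mono#timeout works at the emission level.

Answer 1: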

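This category of errors (onXxxDropped) indicates a "protocol error": a signal was received that does not conform to the Reactive Streams specification. Here, an onError signal (the InterruptedException) was received after another onError (the timeout).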
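To make the "second onError after a terminal signal" situation concrete, here is a minimal plain-JDK sketch built on java.util.concurrent.Flow (Java 9+). It is not Reactor's implementation; the class names TerminalGuard and Recording are made up for illustration, and the guard is not thread-safe. It only shows the general pattern: the first terminal signal is delivered downstream, and any later error is routed to a separate "dropped" handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class DroppedSignalSketch {

    /** Hypothetical wrapper: lets only the first terminal signal reach the downstream. */
    static final class TerminalGuard<T> implements Flow.Subscriber<T> {
        private final Flow.Subscriber<? super T> downstream;
        private final Consumer<Throwable> onErrorDropped;
        private boolean done; // single-threaded sketch, no synchronization

        TerminalGuard(Flow.Subscriber<? super T> downstream, Consumer<Throwable> onErrorDropped) {
            this.downstream = downstream;
            this.onErrorDropped = onErrorDropped;
        }

        @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }

        @Override public void onNext(T item) {
            if (!done) downstream.onNext(item);
        }

        @Override public void onError(Throwable t) {
            if (done) {
                // The sequence already terminated: this error has nowhere to go.
                onErrorDropped.accept(t);
                return;
            }
            done = true;
            downstream.onError(t);
        }

        @Override public void onComplete() {
            if (!done) {
                done = true;
                downstream.onComplete();
            }
        }
    }

    /** Hypothetical downstream that records the errors it is given. */
    static final class Recording implements Flow.Subscriber<String> {
        final List<Throwable> errors = new ArrayList<>();
        @Override public void onSubscribe(Flow.Subscription s) { }
        @Override public void onNext(String item) { }
        @Override public void onError(Throwable t) { errors.add(t); }
        @Override public void onComplete() { }
    }

    public static void main(String[] args) {
        List<Throwable> dropped = new ArrayList<>();
        Recording downstream = new Recording();
        TerminalGuard<String> guard = new TerminalGuard<>(downstream, dropped::add);

        guard.onError(new TimeoutException("timeout")); // first terminal signal: delivered
        guard.onError(new InterruptedException());      // arrives after termination: dropped

        System.out.println("delivered: " + downstream.errors.size());
        System.out.println("dropped:   " + dropped.size());
    }
}
```

In the question's case the timeout plays the role of the first error and the InterruptedException from the Jetty client is the late one.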

I'm not sure what causes the interrupt (Jetty or WebFlux), or whether the root cause can be fixed in WebClient, but you can register a global hook to change the default onErrorDropped behavior (see Hooks#onErrorDropped(Consumer)).
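A global hook could be registered roughly as follows. This is a sketch that assumes reactor-core is on the classpath; the class name DroppedErrorHookSketch and the DROPPED list are hypothetical, and a real application would more likely log the error than collect it. The call to Operators.onErrorDropped in main only simulates what an operator does when an error arrives after termination. Note that the hook changes how the dropped error is reported; it does not by itself release any connection or thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Hooks;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

public class DroppedErrorHookSketch {

    // Hypothetical sink for dropped errors; replace with logging in real code.
    static final List<Throwable> DROPPED = new CopyOnWriteArrayList<>();

    /** Registers a JVM-wide hook that is invoked for every dropped error. */
    static void install() {
        Hooks.onErrorDropped(error -> DROPPED.add(error));
    }

    /** Restores Reactor's default onErrorDropped behavior. */
    static void uninstall() {
        Hooks.resetOnErrorDropped();
    }

    public static void main(String[] args) {
        install();
        try {
            // Simulate an operator reporting an error received after termination.
            Operators.onErrorDropped(new InterruptedException("late error"), Context.empty());
            System.out.println("dropped errors seen by hook: " + DROPPED.size());
        } finally {
            uninstall();
        }
    }
}
```

Because the hook is global, it is usually installed once at application startup rather than per request.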

Alternatively, you can use the following (unsupported) code to register a local hook on the problematic Flux:

Flux<T> doubleErroringFlux;
Consumer<Throwable> droppedErrorConsumer = ...;
// attach the consumer under the local-hook key in the subscriber Context
Flux<T> fluxToUse = doubleErroringFlux
    .subscriberContext(Context.of("reactor.onErrorDropped.local",
        droppedErrorConsumer));
fluxToUse.subscribe(); //or pass down the method, or whatever

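Comments:

Is it possible, in the consumer, to tell WebClient to return the connection to the pool, to avoid the insufficient-threads problem?

No, but that shouldn't be necessary. The timeout triggers an upstream cancellation, so resources such as connections should be properly released.

I have updated the question with the full stack trace; it seems the InterruptedException comes from the Jetty HttpClient. I think the problem is not MonoTimeout but the InterruptedException. Does it need to be handled separately to make sure resources are released?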
