Error: "Only one connection receive subscriber allowed" for POST method, XML Request

Posted: 2019-06-23 11:51:36

Question:

I am writing a simple POST method that sends an XML body to a URL. When I call it, I get the error message "Only one connection receive subscriber allowed".

Below is the service class I use to make the POST call.

import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

// Fully qualified because the class itself is also named Service.
@org.springframework.stereotype.Service
public class Service {

    WebClient client = WebClient.create();

    private String publishEndpoint = "xyz.com";

    public Mono<Object> publish() {
        String body = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>\n"
                + "<dealerReview xmlns=\"http://api.cars.com/schemas/dealer_review.xsd\">\n"
                + "    <customerIdentifier>9240112369283</customerIdentifier>\n" + "    <consumerDealerReview>\n"
                + "        <consumerDealerReviewIdentifier>0</consumerDealerReviewIdentifier>\n"
                + "        <visitReason>\n" + "            <id>0</id>\n" + "            <code>TestingReason</code>\n"
                + "        </visitReason>\n" + "        <title>Test comment</title>\n" + "            <rating>\n"
                + "                <id>\n" + "                    <code>OverallFacilities</code>\n"
                + "                    <ratingNumber>4.0</ratingNumber>\n" + "                </id>\n"
                + "            </rating>\n" + "        </categoryRatings>\n"
                + "        <ipAddress>127.0.0.1</ipAddress>\n" + "        <affiliate>\n"
                + "            <name>yahoo</name>\n" + "        </affiliate>\n" + "    </consumerDealerReview>\n"
                + "</dealerReview>";

        // POST the XML string with an XML Content-Type and decode the response body.
        return client.post().uri(publishEndpoint).header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_XML_VALUE)
                .body(BodyInserters.fromObject(body)).retrieve().bodyToMono(Object.class);
    }
}

Below is the stack trace I get.

java.lang.IllegalStateException: Only one connection receive subscriber allowed.
    at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:277) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:127) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:290) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.core.publisher.FluxPeek.subscribe(FluxPeek.java:83) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume.subscribe(FluxOnErrorResume.java:47) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreElements.subscribe(MonoIgnoreElements.java:37) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3608) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3608) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3608) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:185) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:251) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1521) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onError(MonoIgnoreThen.java:235) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Operators.error(Operators.java:181) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:52) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3608) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1521) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onError(MonoIgnoreThen.java:235) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:76) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:100) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Operators.error(Operators.java:181) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxError.subscribe(FluxError.java:43) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Flux.subscribe(Flux.java:7734) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Operators.error(Operators.java:181) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:157) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:86) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Operators.error(Operators.java:180) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:277) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:127) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:290) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.core.publisher.FluxPeek.subscribe(FluxPeek.java:83) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxOnErrorResume.subscribe(FluxOnErrorResume.java:47) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreElements.subscribe(MonoIgnoreElements.java:37) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3608) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3608) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:118) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) [reactor-core-3.2.3.RELEASE.jar:3.2.3.RELEASE]
    at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:378) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:202) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:343) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:325) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:372) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:511) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141) [reactor-netty-0.8.3.RELEASE.jar:0.8.3.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323) [netty-codec-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297) [netty-codec-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1432) [netty-handler-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1199) [netty-handler-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1243) [netty-handler-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502) [netty-codec-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441) [netty-codec-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278) [netty-codec-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965) [netty-transport-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:541) [netty-all-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:396) [netty-all-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:189) [netty-all-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:267) [netty-all-4.1.31.Final.jar:4.1.31.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897) [netty-common-4.1.31.Final.jar:4.1.31.Final]

I tried the approach from "Spring WebFlux: Only one connection receive subscriber allowed", but without success.

Here is the HTTP response:

{
  "timestamp": "2019-01-31T06:37:14.306+0000",
  "path": "/cars/publishReview",
  "status": 500,
  "error": "Internal Server Error",
  "message": "401 Unauthorized"
}

FYI, in case it helps: the GET method works fine.

Answer 1:

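Judging from your WebClient call and your HTTP response, your application is probably using a Spring Framework version earlier than 5.1.4.RELEASE, and you may be hitting a known issue that affects those versions. It only occurs when the server responds with a 4xx or 5xx status.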

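Your request provides a String body and an XML Content-Type header. That looks valid, although you could also pass an object directly and rely on JAXB for the XML serialization.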
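Whether the hand-built String is itself well-formed XML is a separate question from the shape of the WebClient call, and it can be checked locally before anything is sent. The following is a minimal sketch using only the JDK's built-in parser; the class and method names are made up for illustration. As posted, the body closes `</categoryRatings>` without a matching opening tag, which may just be a copy/paste slip, but a check like this would flag it.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.xml.sax.ErrorHandler;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;

// Hypothetical helper, not part of the original question or answer.
final class XmlWellFormedness {

    /** Returns true if the text parses as well-formed XML, false otherwise. */
    static boolean isWellFormed(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            DocumentBuilder builder = factory.newDocumentBuilder();
            // Rethrow parse problems instead of printing them to stderr.
            builder.setErrorHandler(new ErrorHandler() {
                @Override public void warning(SAXParseException e) { }
                @Override public void error(SAXParseException e) throws SAXException { throw e; }
                @Override public void fatalError(SAXParseException e) throws SAXException { throw e; }
            });
            builder.parse(new InputSource(new StringReader(xml)));
            return true;
        } catch (SAXException e) {
            // The parser rejected the document (for example, mismatched tags).
            return false;
        } catch (ParserConfigurationException | IOException e) {
            throw new IllegalStateException("XML parser could not be used", e);
        }
    }
}
```

Calling `XmlWellFormedness.isWellFormed(body)` on the request string before posting it separates "my XML is broken" from "the server rejected a valid request".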

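In this case, your request is probably missing some kind of credentials, since the server is responding with HTTP 401 (although the actual status is 500, which is odd). You should first fix the request so that it is valid. I think this is orthogonal to WebClient; you can craft the request with curl first.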
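For readers who prefer to stay in Java rather than curl, the same "get the request right outside WebClient first" idea can be sketched with the JDK's own HTTP client (Java 11+). This assumes the server accepts HTTP Basic authentication, which the question does not state; the class name, the `https://` scheme in front of the `xyz.com` placeholder, and the credentials are all illustrative.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper; Basic auth is an assumption, the real server may use another scheme.
final class AuthenticatedXmlRequest {

    /** Builds the value of an HTTP Basic "Authorization" header. */
    static String basicAuth(String username, String password) {
        String pair = username + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    /** Builds (but does not send) a POST carrying the XML body and the credentials. */
    static HttpRequest build(String url, String xmlBody, String username, String password) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/xml")
                .header("Authorization", basicAuth(username, password))
                .POST(HttpRequest.BodyPublishers.ofString(xmlBody))
                .build();
    }
}
```

The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. Once it succeeds, the same header can be carried over to WebClient, for example with `.headers(h -> h.setBasicAuth(username, password))`, which is available from Spring Framework 5.1.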

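Now, once the request side is fixed and the server replies as expected, the .bodyToMono(Object.class) part of your client call looks suspicious. You may run into deserialization problems at that point. Depending on the expected response format, you should choose a different type for deserialization, and possibly send an "Accept" header with your request to tell the server which Content-Type you expect.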
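The two suggestions here, an explicit "Accept" header and a concrete response type, can be illustrated without WebClient. The sketch below uses the JDK HTTP client (Java 11+) and reads the response as plain text. It assumes the server can answer with XML, which the question does not confirm, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper; the XML response format is an assumption.
final class XmlResponseReader {

    /** Builds a POST that tells the server which response format the client expects. */
    static HttpRequest buildRequest(String url, String xmlBody) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/xml")
                .header("Accept", "application/xml")
                .POST(HttpRequest.BodyPublishers.ofString(xmlBody))
                .build();
    }

    /** Returns the body for 2xx responses and fails with the status code otherwise. */
    static String requireSuccess(int statusCode, String body) {
        if (statusCode < 200 || statusCode >= 300) {
            throw new IllegalStateException("Unexpected HTTP status " + statusCode + ": " + body);
        }
        return body;
    }

    /** Sends the request and reads the whole response as text (performs real network I/O). */
    static String send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return requireSuccess(response.statusCode(), response.body());
    }
}
```

In WebClient terms, the corresponding changes would be `.accept(MediaType.APPLICATION_XML)` on the request and a concrete target such as `.bodyToMono(String.class)` (or a JAXB-annotated class) instead of `Object.class`.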

Comments:

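I am using spring-boot 2.1 in my project. Can you point me to an example of how to send an XML request? I suspect my request is not formatted correctly.

Spring Boot 2.1.0 depends on Spring Framework 5.1.2.RELEASE.

Here is the HTTP response: { "timestamp": "2019-01-31T06:37:14.306+0000", "path": "/cars/publishReview", "status": 500, "error": "Internal Server Error", "message": "401 Unauthorized" }

My request had no credentials, so the server responded with 401. I added credentials to my request and it now works fine.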
