Spring Reactive with MongoDB fails for more than 1000 records

【Posted】2019-04-11 08:58:31 【Question】:

I have two Spring Boot services. The first one reads a file line by line, converts it into a Flux, and sends it in a POST request:

webClient.post()
            .uri("/foobar/bulk")
            .contentType(APPLICATION_STREAM_JSON)
            .body(createFlux(), Foobar.class)
            .retrieve()
            .bodyToMono(Void.class)
            .subscribe();

The second service receives the Flux<Foobar> and saves it to the database:

@ResponseStatus(HttpStatus.CREATED)
@PostMapping(value = "/foobar/bulk", consumes = APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> bulkInsert(@RequestBody Flux<Foobar> foobars) {
    return foobarReactiveRepository.insert(foobars).then();
}

However, only about 1000 objects are saved to MongoDB, and then the request fails (the error appears in the first service):

reactor.core.Exceptions$ErrorCallbackNotImplemented: 

org.springframework.web.reactive.function.client.WebClientResponseException$InternalServerError: 500 Internal Server Error
Caused by: org.springframework.web.reactive.function.client.WebClientResponseException$InternalServerError: 500 Internal Server Error
    at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:151) ~[spring-webflux-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultResponseSpec.lambda$createResponseException$7(DefaultWebClient.java:466) ~[spring-webflux-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:118) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
    at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:378) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:202) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
    at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:343) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
    at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:325) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
    at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:372) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
    at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:522) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]

I am not sure whether it matters, but in the second service's log I can see about 10 lines like this:

[ntLoopGroup-2-7] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:7, serverValue:1424}] to localhost:27017

If I increase the maxPoolSize parameter in spring.data.mongodb.uri, I get more objects in the database and more log lines like the one above.

I am using:

spring-boot-starter-webflux:2.1.0.RELEASE
spring-boot-starter-data-mongodb-reactive:2.1.0.RELEASE

Have I misconfigured something, or am I using the Mongo/reactive API incorrectly?

【Comments】:

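I have built an API like this in the past. I got the same error too, but somehow the updates succeeded even after the 500 error was shown.

Instead of inserting the records one by one, insert them in one go. Convert the Flux into a Mono of a list of Foobar objects, then insert that list into the repository.

@uneq95 I don't want to do that, because I am reading a big file with more than 10k lines.

Then update in batches.

See this: dzone.com/articles/bulk-operations-in-mongodb

【Answer 1】: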

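It turned out that all the data was actually inserted into the database, even though the error I described was reported. I changed the code slightly, and now everything works: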

 webClient.post()
            .uri("/foobar/bulk")
            .contentType(APPLICATION_STREAM_JSON)
            .body(createFlux(), Foobar.class)
            .retrieve()
            .bodyToMono(Long.class)
            .subscribe(count -> log.info("items sent and received: " + count));

Receiver:

@PostMapping(value = "/bulk", consumes = APPLICATION_STREAM_JSON_VALUE)
public Mono<Long> bulkInsert(@RequestBody Flux<Foobar> foobars) {
    return foobarReactiveRepository.insert(foobars).count();
}

【Comments】:

To me this looks like a workaround rather than a proper solution.
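For what it's worth, the batching idea suggested in the comments on the question could look roughly like the sketch below. It is plain Java with no Spring or Reactor dependency, so it only illustrates the splitting step. The class name BatchSketch, the partition helper, and the batch size of 1000 are all hypothetical and do not come from the original post. In a Reactor pipeline, the buffer(int) operator on Flux plays the same role by grouping elements into lists of a fixed maximum size.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSketch {

    // Splits items into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the slice so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Stand-in for the records read from the file (assumption: 2500 of them).
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            records.add(i);
        }
        // Hypothetical batch size; tune it to the workload.
        List<List<Integer>> batches = partition(records, 1000);
        for (List<Integer> batch : batches) {
            // In a real service, each batch would go to a single bulk insert call.
            System.out.println("batch of " + batch.size());
        }
    }
}
```

Whether batching would have avoided the 500 error here is not established by the thread. The sketch only shows how a large input can be cut into bounded chunks before it is written.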
