Spring Reactive with MongoDB fails for more than 1000 records
Asked: 2019-04-11 08:58:31
Question: I have two Spring Boot services. The first reads a file line by line, converts it into a Flux, and makes a POST request:
webClient.post()
.uri("/foobar/bulk")
.contentType(APPLICATION_STREAM_JSON)
.body(createFlux(), Foobar.class)
.retrieve()
.bodyToMono(Void.class)
.subscribe();
The second service receives the Flux<Foobar> and saves it to the database:
@ResponseStatus(HttpStatus.CREATED)
@PostMapping(value = "/foobar/bulk", consumes = APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> bulkInsert(@RequestBody Flux<Foobar> foobars) {
    return foobarReactiveRepository.insert(foobars).then();
}
However, only about 1000 objects are saved to MongoDB, and then it fails (in the first service):
reactor.core.Exceptions$ErrorCallbackNotImplemented:
org.springframework.web.reactive.function.client.WebClientResponseException$InternalServerError: 500 Internal Server Error
Caused by: org.springframework.web.reactive.function.client.WebClientResponseException$InternalServerError: 500 Internal Server Error
at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:151) ~[spring-webflux-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultResponseSpec.lambda$createResponseException$7(DefaultWebClient.java:466) ~[spring-webflux-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
at reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:118) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.2.2.RELEASE.jar:3.2.2.RELEASE]
at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:378) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:202) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:343) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:325) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:372) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:522) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141) ~[reactor-netty-0.8.2.RELEASE.jar:0.8.2.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) ~[netty-codec-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) ~[netty-transport-4.1.29.Final.jar:4.1.29.Final]
I am not sure whether it matters, but in the second service's logs I can see about 10 lines like this:
[ntLoopGroup-2-7] org.mongodb.driver.connection : Opened connection [connectionId{localValue:7, serverValue:1424}] to localhost:27017
If I increase the maxPoolSize parameter in spring.data.mongodb.uri, I get more objects in the database and more log lines like the one above.
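For reference, maxPoolSize is a standard option of the MongoDB connection string, so it is appended to the URI as a query parameter. A minimal sketch of such a setting follows; the database name and the pool size are assumptions chosen for illustration, and the host is the one from the log line above. It only shows where the parameter goes and is not meant as a fix.

```properties
# Assumed values for illustration only: database name "test" and a pool size of 200.
spring.data.mongodb.uri=mongodb://localhost:27017/test?maxPoolSize=200
```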
I am using:
spring-boot-starter-webflux:2.1.0.RELEASE
spring-boot-starter-data-mongodb-reactive:2.1.0.RELEASE
Have I misconfigured something, or am I using the mongo/reactive API incorrectly?
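Comments:
I have built an API like this in the past. I got the same error, but somehow the update still succeeded even after the 500 error was shown.
Rather than inserting the items one by one, insert them in one go. Convert the Flux into a Mono of a list of Foobar, then insert that list into the repository.
@uneq95 I don't want to do that, because I am reading a large file with more than 10k lines.
Then update in batches.
See this: dzone.com/articles/bulk-operations-in-mongodb

Answer 1:
Actually, all the data was inserted into the database, but the error I described still occurred. I modified the code a little, and now everything works: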
webClient.post()
.uri("/foobar/bulk")
.contentType(APPLICATION_STREAM_JSON)
.body(createFlux(), Foobar.class)
.retrieve()
.bodyToMono(Long.class)
.subscribe(count -> log.info("items sent and received: " + count));
Receiver:
@PostMapping(value = "/bulk", consumes = APPLICATION_STREAM_JSON_VALUE)
public Mono<Long> bulkInsert(@RequestBody Flux<Foobar> foobars) {
    return foobarReactiveRepository.insert(foobars).count();
}
Comments:
This looks like a workaround to me, not a legitimate solution.
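
The batching idea raised in the comments on the question can be sketched in plain Java, without Reactor or MongoDB. The class name BatchingSketch, the method forEachBatch, and the batch size of 1000 are all hypothetical and not from the original posts; the sink passed in merely stands in for a bulk insert. This is a sketch of the chunking step only, not the poster's code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class BatchingSketch {

    // Drains the iterator and hands the items to the sink in lists of at most
    // batchSize elements. Returns the number of batches handed to the sink.
    public static <T> int forEachBatch(Iterator<T> items, int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int batches = 0;
        List<T> batch = new ArrayList<>(batchSize);
        while (items.hasNext()) {
            batch.add(items.next());
            if (batch.size() == batchSize) {
                sink.accept(batch);
                batches++;
                batch = new ArrayList<>(batchSize); // start a fresh list for the next batch
            }
        }
        if (!batch.isEmpty()) {
            sink.accept(batch); // final, possibly smaller batch
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        // Stand-in for the lines of a large file.
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            lines.add("line-" + i);
        }
        // Hypothetical sink: a real service would call its bulk insert here.
        int batches = forEachBatch(lines.iterator(), 1000,
                batch -> System.out.println("inserting " + batch.size() + " items"));
        System.out.println("batches: " + batches);
    }
}
```

In a Reactor pipeline the same chunking is available as Flux.buffer(int), which turns a Flux<T> into a Flux<List<T>>; each list could then be passed to the repository's insert(Iterable) overload. Whether that avoids the 500 error described in the question is not established by this thread.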