在 Mono 对象上执行 block() 时出现异常我从 ReactiveMongoRepository 对象返回

Posted

技术标签:

【中文标题】在 Mono 对象上执行 block() 时出现异常我从 ReactiveMongoRepository 对象返回【英文标题】:Getting exception while doing block() on Mono object I got back from ReactiveMongoRepository object 【发布时间】:2020-02-08 21:19:41 【问题描述】:

我有一个服务将数据流式传输到第二个服务,该服务接收对象流并将它们保存到我的 MongoDB。 在我从流服务获得的 Flux 对象上的 subscribe 函数中,我使用 ReactiveMongoRepository 接口中的 save 方法。 当我尝试使用 block 函数并获取数据时,出现以下错误:

2019-10-11 13:30:38.559  INFO 19584 --- [localhost:27017] org.mongodb.driver.connection            : Opened connection [connectionIdlocalValue:1, serverValue:25] to localhost:27017
2019-10-11 13:30:38.566  INFO 19584 --- [localhost:27017] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescriptionaddress=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersionversionList=[4, 0, 1], minWireVersion=0, maxWireVersion=7, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=6218300
2019-10-11 13:30:39.158  INFO 19584 --- [ctor-http-nio-4] quote-monitor-service                    : onNext(Quote(id=null, ticker=AAPL, price=164.8, instant=2019-10-11T10:30:38.800Z))
2019-10-11 13:30:39.411  INFO 19584 --- [ctor-http-nio-4] quote-monitor-service                    : cancel()
2019-10-11 13:30:39.429  INFO 19584 --- [ntLoopGroup-2-2] org.mongodb.driver.connection            : Opened connection [connectionIdlocalValue:3, serverValue:26] to localhost:27017
2019-10-11 13:30:39.437  WARN 19584 --- [ctor-http-nio-4] io.netty.util.ReferenceCountUtil         : Failed to release a message: DefaultHttpContent(data: PooledSlicedByteBuf(freed), decoderResult: success)

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
    at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
    at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
    at 
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
    at reactor.core.publisher.Mono.block(Mono.java:1494) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
    at

我的代码:

stockQuoteClient.getQuoteStream()
                .log("quote-monitor-service")
                .subscribe(quote -> 
                    Mono<Quote> savedQuote = quoteRepository.save(quote);
                    System.out.println("I saved a quote! Id: " +savedQuote.block().getId());
                );

经过一番挖掘,我设法让它工作,但我不明白为什么它现在工作。 新代码:

stockQuoteClient.getQuoteStream()
                .log("quote-monitor-service")
                .subscribe(quote -> 
                       Mono<Quote> savedQuote = quoteRepository.insert(quote);
                       savedQuote.subscribe(result ->
                                 System.out.println("I saved a quote! Id :: " + result.getId()));
    );

block() 的定义:订阅这个 Mono 并无限期阻塞,直到收到下一个信号。

subscribe()的定义:订阅这个Mono,请求无限制的需求。

有人可以帮我理解为什么阻止不起作用而订阅起作用吗? 我在这里错过了什么?

【问题讨论】:

相关:***.com/questions/51449889/… 【参考方案1】:

阻塞很糟糕,因为它占用了等待响应的线程。这非常在一个反应​​式框架中非常糟糕,它只有很少的线程可供使用,并且被设计为没有它们应该被不必要地阻塞。

这正是响应式框架旨在避免的事情,所以在这种情况下,它只会阻止你这样做:

block()/blockFirst()/blockLast()是阻塞的,线程reactor-http-nio-4不支持

相比之下,您的新代码是异步工作的。线程不会被阻塞,因为在存储库返回一个值之前实际上什么都不会发生(然后您传递给 savedQuote.subscribe() 的 lambda 被执行,将您的结果打印到控制台。)

但是,从反应流的角度来看,新代码仍然不是最佳/正常的,因为您在 subscribe 方法中执行所有逻辑。正常的做法是对我们进行一系列 flatMap/map 调用来转换流中的项目,并使用doOnNext() 来实现副作用(例如打印出一个值):

stockQuoteClient.getQuoteStream()
            .log("quote-monitor-service")
            .flatMap(quoteRepository::insert)
            .doOnNext(result -> System.out.println("I saved a quote! Id :: " + result.getId())))
            .subscribe();

如果您正在对反应器/反应流进行大量工作,那么总体上值得阅读它们。它们对于非阻塞工作非常强大,但与更“标准”的 Java 相比,它们确实需要一种不同的思维方式(和编码方式)。

【讨论】:

以上是关于在 Mono 对象上执行 block() 时出现异常我从 ReactiveMongoRepository 对象返回的主要内容,如果未能解决你的问题,请参考以下文章

我可以运行 2 个 Mono 异步 Reactor Core 吗?

连接到安装了单声道的 Informix DB 时出现 ODBC 错误

调试Xamarin.Android时出现缺少"Mono.Posix 2.0.0"的错误

使用 MPI.NET 和 Mono 框架在超级计算机的 linux 节点上执行分布式计算

block创建时出现Typedef redefinition with different types错误

Mono类型解析