反应器:Flux<object> .subscribe() 与 .toStream()

Posted

技术标签:

【中文标题】反应器:Flux<object> .subscribe() 与 .toStream()【英文标题】:Reactor: Flux<object> .subscribe() vs. .toStream() 【发布时间】:2020-03-21 04:17:01 【问题描述】:

我有一个函数:Flux queryPerson(),它查询数据库以生成对象并在 Flux 中返回它们。当我使用 .subscribe() 时,应用程序只是通过代码运行并退出。它不会等待结果返回查询。但是当我使用 .toStream() 阻止流时,我可以看到打印输出。我做错了什么?

personRepository
    .queryPerson()
    .map(x -> x.getFirst().concat("ok"))
    .subscribe(i -> System.out.println(i))
    //.toStream().forEach(System.out::println)
;

【问题讨论】:

【参考方案1】:

我假设您没有某种网络应用程序,而是一个命令行运行器或简单的 Java 应用程序。考虑到应用程序在异步任务之前完成是正常的。

.订阅

订阅是一种使用传入数据的异步方式,在您订阅 Flux 后,您会立即将控制权返回给调用线程。

这正是反应式编程的工作方式,您定义行为,您可以在其他线程和调用线程中以很好的抽象方式运行它。

正如它在Flux docs 中所说的那样

由于序列可以异步,这将立即返回 控制到调用线程。这可以给人的印象 在主线程或单元测试中执行时不会调用使用者 例如。

.toStream

另一方面,使用 .toStream 您会收到一个 Java 流,即使它的大小未知,您仍然可以像普通 Java 流一样以同步方式对其进行迭代。

更多解释可见.toStream docs of Flux

【讨论】:

感谢您的回答。你说的对。我只是在控制台中运行一个普通的 java 应用程序。这是否意味着我只能在非 Web 应用程序中使用 toStream()? 订阅背后的整个想法是不要阻塞你的主线程来等待答案。例如,假设在 Web 应用程序中,您进行了一些繁重的数据库查询,然后返回给客户端并给出答案,非阻塞/反应式代码是可行的方法,但是因为您有一个主线程并且它在打印您之前结束(程序结束)看不到他们。如果您的程序运行时间更长,您会看到这些打印,我不知道您的情况是真正的应用程序还是培训应用程序,只需使用 toStream 并对其进行迭代以查看结果。 互联网上有一篇关于反应式编程的好文章,我强烈建议至少阅读它的第一部分gist.github.com/staltz/868e7e9bc2a7b8c1f754 @topcan5 与其说你需要一个 Web 应用程序,不如说你需要一个更复杂的应用程序才能真正看到反应式编程的好处。基本上,您需要一个寿命更长的应用程序,将工作转移到后台是有意义的。一个例子显然是一个 Web 应用程序。另一个是桌面 GUI 应用程序。

以上是关于反应器:Flux<object> .subscribe() 与 .toStream()的主要内容,如果未能解决你的问题,请参考以下文章

如何检查 Flux<Object> 是不是为空?

何时使用 Mono<List<Object>> 以及何时使用 Flux<Object> 用于 RestController 方法

Spring反应式编程:如何创建一个动态的Publishers列表作为Flux.merge的输入。

Flux.onErrorContinue 参数类型

如何处理 Flux.fromStream 中的异常抛出

react-native-router-flux 在本机基本页脚选项卡中导航