使用Project Reactor对反应流进行递归
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Project Reactor对反应流进行递归相关的知识,希望对你有一定的参考价值。
我的目标是遍历目录图并使用反应流和Project Reactor记录所有名称。
由于文件系统是远程的,因此对它的调用是阻塞的。因此,我希望将阻塞调用的执行与其他非阻塞异步代码分开。我正在使用这个建议:http://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking。
这是我需要遍历的结构:
/
/jupiter
/phase-1
/sub-phase-1
/sub-phase-2
/sub-phase-3
/phase-2
/phase-3
/phase-4
/earth
/phase-1
/sub-phase-1
/sub-phase-2
/sub-phase-3
/phase-2
/phase-3
/phase-4
/mars
/phase-1
/sub-phase-1
/sub-phase-2
/sub-phase-3
/phase-2
/phase-3
/phase-4
这是我到目前为止提出的代码:
public class ReactorEngine {
private static Logger log = LoggerFactory.getLogger(ReactorEngine.class);
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Server server = new Server();
Flux.fromIterable(server.getChildren("/"))
.flatMap(parent -> Mono.fromCallable(() -> server.getChildren(parent)).subscribeOn(Schedulers.elastic()))
.publishOn(Schedulers.elastic())
.doOnTerminate(latch::countDown)
.subscribe(ReactorEngine::handleResponse);
latch.await();
}
private static void handleResponse(List<String> value) {
log.info("Received: " + value);
}
}
public class Server {
public List<String> getChildren(final String path) {
// Generate some I/O
...
}
}
所以我从顶级目录开始,并异步请求第一级(他们的孩子)。一切顺利,这是输出:
15:35:05.902 [main] INFO org.playground.async.mock.Server - Requesting children of: /...
15:35:07.062 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:35:07.140 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/...
15:35:07.140 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/...
15:35:07.140 [elastic-5] INFO org.playground.async.mock.Server - Requesting children of: /mars/...
15:35:08.140 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: [/earth/phase-1/, /earth/phase-2/, /earth/phase-3/]
15:35:08.141 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: [/jupiter/phase-1/, /jupiter/phase-2/, /jupiter/phase-3/]
15:35:08.141 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: [/mars/phase-1/, /mars/phase-2/, /mars/phase-3/]
现在我的问题是如何将作为结果的元素放回到flux中,以便引擎将递归调用server.getChildren(parent)直到遍历整个目录图形?
实际上是递归的方式,或者是否有更好的“反应”方式,可能通过运营商?
谢谢!
编辑
西蒙建议的expand(Function)
算子非常适合遍历图表。我已将代码更改为:
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
Server server = new Server();
Flux.fromIterable(server.getChildren("/"))
.expand(p -> Flux.fromIterable(server.getChildren(p)).subscribeOn(Schedulers.elastic()))
.publishOn(Schedulers.elastic())
.doOnTerminate(latch::countDown)
.subscribe(ReactorEngine::handleResponse);
latch.await();
}
但是,我失去了调用我的服务器的阻塞server.getChildren(String)
方法的异步方式。正如您在这些日志中看到的,每个子目录是同步获取的,每秒一次:
15:57:55.398 [main] INFO org.playground.async.mock.Server - Requesting children of: /...
15:57:56.558 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
15:57:56.593 [main] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/...
15:57:56.593 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/
15:57:57.594 [main] INFO org.playground.async.mock.Server - Requesting children of: /earth/...
15:57:57.594 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/
15:57:58.594 [main] INFO org.playground.async.mock.Server - Requesting children of: /mars/...
15:57:58.594 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/
15:57:59.599 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-1/...
15:57:59.599 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-1/
15:58:00.600 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-2/...
15:58:00.600 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-2/
15:58:01.600 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-3/...
15:58:01.600 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-3/
15:58:02.601 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /jupiter/phase-4/...
15:58:02.601 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /jupiter/phase-4/
15:58:03.602 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-1/...
15:58:03.603 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-1/
15:58:04.604 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-2/...
15:58:04.604 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-2/
15:58:05.604 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-3/...
15:58:05.604 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-3/
15:58:06.605 [elastic-4] INFO org.playground.async.mock.Server - Requesting children of: /earth/phase-4/...
15:58:06.605 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /earth/phase-4/
15:58:07.605 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-1/...
15:58:07.605 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-1/
15:58:08.606 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-2/...
15:58:08.606 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-2/
15:58:09.607 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-3/...
15:58:09.607 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-3/
15:58:10.608 [elastic-3] INFO org.playground.async.mock.Server - Requesting children of: /mars/phase-4/...
15:58:10.608 [elastic-2] INFO org.playground.async.engine.ReactorEngine - Received: /mars/phase-4/
你能否提供一个关于如何将Mono.fromCallable(() -> server.getChildren(parent)).subscribeOn(Schedulers.elastic())
的电话带回计划的提示?我可以打电话给没有Flux.fromCallable()
,也许是有充分理由的。
但由于我对反应式编程和Project Reactor的概念不熟悉,因此很难绕过这种异步方式。
谢谢。
有一个运营商:)看看expand
和expandDeep
以上是关于使用Project Reactor对反应流进行递归的主要内容,如果未能解决你的问题,请参考以下文章
带有消息代理(例如 Kafka)的事件驱动微服务与反应式编程(RxJava、Project Reactor)以及改进的协议(RSocket)