在 reactor Java 中顺序执行 Reactive 任务
Posted
技术标签:
【中文标题】在 reactor Java 中顺序执行 Reactive 任务【英文标题】:Sequential execution of Reactive tasks in reactor Java 【发布时间】:2021-06-02 15:52:33 【问题描述】:我正在努力将阻塞顺序编排框架转换为反应式。目前,这些任务是动态的,并通过 JSON 输入输入引擎。引擎拉取类并执行run()
方法,并将状态与每个任务的响应一起保存。
如何在反应器中实现相同的链接?如果这是一个静态 DAG,我会用 flatMap
或 then
运算符链接它,但由于它是动态的,我该如何继续执行响应式任务并收集每个任务的输出?
示例: 非反应式接口:
public interface OrchestrationTask
OrchestrationContext run(IngestionContext ctx);
核心引擎
public Status executeDAG(String id)
IngestionContext ctx = ContextBuilder.getCtx(id);
List<OrchestrationTask> tasks = app.getEligibleTasks(id);
for(OrchestrationTask task : tasks)
// Eligible tasks are executed sequentially and results are collected.
OrchestrationContext stepContext = task.run(ctx);
if(!evaluateResult(stepContext)) break;
return Status.SUCCESS;
按照上面的示例,如果我将任务转换为返回 Mono> 那么,我如何等待或链接其他任务以对先前任务的结果进行操作? 任何帮助表示赞赏。谢谢。
更新::
反应式任务示例。
public class SampleTask implements OrchestrationTask
@Override
public Mono<OrchestrationContext> run(OrchestrationContext context)
// Im simulating a delay here. treat this as a long running task (web call) But the next task needs the response from the below call.
return Mono.just(context).delayElements(Duration.ofSeconds(2));
所以我将有一系列任务来完成各种事情,但每个任务的响应都依赖于前一个任务并存储在编排上下文中。每当发生错误时,编排上下文标志将设置为 false 并且通量应该停止。
【问题讨论】:
你能举一个你的阻塞代码的例子吗?从你的描述我无法完全想象它是如何工作的。 @MichaelBerry 更新了帖子。如果您还需要更多信息,请告诉我。谢谢。 要明确一点——这里需要什么反应?您提到 task.run() 返回一个单声道,但它是 - 还是 app.getEligibleTasks()、ContextBuilder.getCtx() 和 evaluateResult() 也需要通过返回反应式发布者来避免阻塞操作?此外,该方法目前似乎总是返回SUCCESS
,但只会在每个StepContext
上执行evaluateResult()
,直到其中一个返回false。这是您需要在响应式实现中模拟的行为吗?
@michaelberry 其他方法没有阻塞操作。只有 run 会返回 Mono,它会根据 run 中编写的任务返回状态。此方法的状态为成功,但如果 evaluateResult 为 false,它将停止执行下一个任务。
@MichaelBerry 有什么想法吗?
【参考方案1】:
当然,我们可以:
从任务列表创建通量(如果可以反应生成任务列表,那么您可以直接用通量替换该数组列表,否则保持原样);flatMap()
每个任务到您的 task.run()
方法(根据问题现在返回 Mono
;
确保我们只在 evaluateResult()
为真时使用元素;
...最后还是像以前一样返回SUCCESS
状态。
因此,将所有这些放在一起,只需将循环和返回语句替换为:
Flux.fromIterable(tasks)
.flatMap(task -> task.run(ctx))
.takeWhile(stepContext -> evaluateResult(stepContext))
.then(Mono.just(Status.SUCCESS));
(由于我们已将其设为响应式,因此您的方法显然需要返回 Mono<Status>
而不仅仅是 Status
。)
根据评论更新 - 如果您只是希望它“一次执行一个”而不是同时执行多个,您可以使用 concatMap()
而不是 flatMap()
。
【讨论】:
任务列表的生成只是收集一个类类型的bean。我相信应该没问题吧? @PavanKumar 是的,除非它让他们通过一些奇怪的阻塞操作。 此外,evaluateResult 方法在返回 false 时应终止通量并返回编排上下文直到该点。有没有一种优雅的方法可以做到这一点? iterable 中的任务相互依赖。现在,这些任务是并行触发的,如果有 IO 调用(例如 Webclient),则在返回前一个任务的响应之前执行下一个任务。关于我们如何确保每个任务一个接一个地执行的任何想法?得到响应后。 @PavanKumar 我今天过得很慢 - 你是对的,concatMap()
正是用于此用例的正确运算符。以上是关于在 reactor Java 中顺序执行 Reactive 任务的主要内容,如果未能解决你的问题,请参考以下文章
关于maven多个模块的build顺序 [INFO] Reactor Build Order