在 reactor Java 中顺序执行 Reactive 任​​务

Posted

技术标签:

【中文标题】在 reactor Java 中顺序执行 Reactive 任​​务【英文标题】:Sequential execution of Reactive tasks in reactor Java 【发布时间】:2021-06-02 15:52:33 【问题描述】:

我正在努力将阻塞顺序编排框架转换为反应式。目前,这些任务是动态的,并通过 JSON 输入输入引擎。引擎拉取类并执行run() 方法,并将状态与每个任务的响应一起保存。 如何在反应器中实现相同的链接?如果这是一个静态 DAG,我会用 flatMapthen 运算符链接它,但由于它是动态的,我该如何继续执行响应式任务并收集每个任务的输出?

示例: 非反应式接口:

public interface OrchestrationTask 
 OrchestrationContext run(IngestionContext ctx);

核心引擎

public Status executeDAG(String id) 
  IngestionContext ctx = ContextBuilder.getCtx(id);
  List<OrchestrationTask> tasks = app.getEligibleTasks(id);

  for(OrchestrationTask task : tasks) 
    // Eligible tasks are executed sequentially and results are collected.
    OrchestrationContext stepContext = task.run(ctx);
    if(!evaluateResult(stepContext)) break;
  
  return Status.SUCCESS;

按照上面的示例,如果我将任务转换为返回 Mono> 那么,我如何等待或链接其他任务以对先前任务的结果进行操作? 任何帮助表示赞赏。谢谢。

更新::

反应式任务示例。

public class SampleTask implements OrchestrationTask   
  @Override
  public Mono<OrchestrationContext> run(OrchestrationContext context)   
  // Im simulating a delay here. treat this as a long running task (web call) But the next task needs the response from the below call.
  return Mono.just(context).delayElements(Duration.ofSeconds(2));
 

所以我将有一系列任务来完成各种事情,但每个任务的响应都依赖于前一个任务并存储在编排上下文中。每当发生错误时,编排上下文标志将设置为 false 并且通量应该停止。

【问题讨论】:

你能举一个你的阻塞代码的例子吗?从你的描述我无法完全想象它是如何工作的。 @MichaelBerry 更新了帖子。如果您还需要更多信息,请告诉我。谢谢。 要明确一点——这里需要什么反应?您提到 task.run() 返回一个单声道,但它是 - 还是 app.getEligibleTasks()、ContextBuilder.getCtx() 和 evaluateResult() 也需要通过返回反应式发布者来避免阻塞操作?此外,该方法目前似乎总是返回SUCCESS,但只会在每个StepContext 上执行evaluateResult(),直到其中一个返回false。这是您需要在响应式实现中模拟的行为吗? @michaelberry 其他方法没有阻塞操作。只有 run 会返回 Mono,它会根据 run 中编写的任务返回状态。此方法的状态为成功,但如果 evaluateResult 为 false,它将停止执行下一个任务。 @MichaelBerry 有什么想法吗? 【参考方案1】:

当然,我们可以:

从任务列表创建通量(如果可以反应生成任务列表,那么您可以直接用通量替换该数组列表,否则保持原样); flatMap() 每个任务到您的 task.run() 方法(根据问题现在返回 Mono; 确保我们只在 evaluateResult() 为真时使用元素; ...最后还是像以前一样返回SUCCESS 状态。

因此,将所有这些放在一起,只需将循环和返回语句替换为:

Flux.fromIterable(tasks)
        .flatMap(task -> task.run(ctx))
        .takeWhile(stepContext -> evaluateResult(stepContext))
        .then(Mono.just(Status.SUCCESS));

(由于我们已将其设为响应式,因此您的方法显然需要返回 Mono&lt;Status&gt; 而不仅仅是 Status。)

根据评论更新 - 如果您只是希望它“一次执行一个”而不是同时执行多个,您可以使用 concatMap() 而不是 flatMap()

【讨论】:

任务列表的生成只是收集一个类类型的bean。我相信应该没问题吧? @PavanKumar 是的,除非它让他们通过一些奇怪的阻塞操作。 此外,evaluateResult 方法在返回 false 时应终止通量并返回编排上下文直到该点。有没有一种优雅的方法可以做到这一点? iterable 中的任务相互依赖。现在,这些任务是并行触发的,如果有 IO 调用(例如 Webclient),则在返回前一个任务的响应之前执行下一个任务。关于我们如何确保每个任务一个接一个地执行的任何想法?得到响应后。 @PavanKumar 我今天过得很慢 - 你是对的,concatMap() 正是用于此用例的正确运算符。

以上是关于在 reactor Java 中顺序执行 Reactive 任​​务的主要内容,如果未能解决你的问题,请参考以下文章

关于maven多个模块的build顺序 [INFO] Reactor Build Order

两种高性能 I/O 设计模式 Reactor 和 Proactor

Java中NIO及基础实现

Spring WebFlux学习记录

Spring WebFlux学习记录

Spring WebFlux学习记录