如何使用上游任务的输出来驱动 requires() 的结果?

Posted

技术标签:

【中文标题】如何使用上游任务的输出来驱动 requires() 的结果?【英文标题】:How to use the output of an upstream task to drive the result of requires()? 【发布时间】:2020-02-26 06:41:01 【问题描述】:

在 Luigi 中,我有一个任务,我想根据另一个上游任务的输出动态生成依赖项列表。例如:

class TaskA:
  param = IntParameter()

class TaskB:
  def main(self):
    pass
  def output(self):
    return [1,2,3,4]

class TaskC:
  def requires(self):
    return [TaskB()] + [TaskA(param=p) for p in TaskB().output()]

总之,我在 TaskC 中创建了一组 TaskA 依赖项,基于 TaskB 的输出。

我已经尝试了一些方法,但 Luigi 似乎感到困惑,因为 TaskB 确实需要在 TaskC 返回其依赖项列表之前运行。但显然 Luigi 不能运行任何东西,直到它调用 TaskC.requires()

有什么方法可以让这项工作完成我在这里尝试做的事情吗?

在我的现实生活场景中,这些任务的实现要复杂得多,但这是任务如何连接的要点。

【问题讨论】:

【参考方案1】:

这是一个很好的问题! Luigi 实际上为它提供了完美的解决方案,并在文档中进行了介绍:https://luigi.readthedocs.io/en/stable/tasks.html#dynamic-dependencies

基本上,您将需要TaskB 然后yield 来根据输出在run 函数中执行新任务。让我举个例子:

class TaskC:
  def requires(self):
    return TaskB()

  def run(self):
    yield [TaskA(param=p) for p in self.input()]

【讨论】:

【参考方案2】:

您可以利用将任务 B 的输出存储在临时位置(如 Redis)并在 TaskC 中调用它。然后在TaskC中,可以根据需要使用“yield”创建任务。 Yield 既可以在 requires() 方法中使用,也可以在 run() 方法中使用。如果使用 in run() 方法,则必须处理依赖关系及其失败。

【讨论】:

以上是关于如何使用上游任务的输出来驱动 requires() 的结果?的主要内容,如果未能解决你的问题,请参考以下文章

Linux Kernel上游驱动的源码结构要求

如何设置特定的本地分支来跟踪特定的上游分支[重复]

GCM如何使用Jaxl实现上游的服务器端

如何向 Linux 内核上游提交 Patch

驱动上下游高效协同,跨境B2B电商平台如何释放LED产业供应链核心价值

FlinkFlink UpsertStreamTableSink requires that has a full primary keys if update