如何使用上游任务的输出来驱动 requires() 的结果?
Posted
技术标签:
【中文标题】如何使用上游任务的输出来驱动 requires() 的结果?【英文标题】:How to use the output of an upstream task to drive the result of requires()? 【发布时间】:2020-02-26 06:41:01 【问题描述】:在 Luigi 中,我有一个任务,我想根据另一个上游任务的输出动态生成依赖项列表。例如:
class TaskA:
param = IntParameter()
class TaskB:
def main(self):
pass
def output(self):
return [1,2,3,4]
class TaskC:
def requires(self):
return [TaskB()] + [TaskA(param=p) for p in TaskB().output()]
总之,我在 TaskC 中创建了一组 TaskA 依赖项,基于 TaskB 的输出。
我已经尝试了一些方法,但 Luigi 似乎感到困惑,因为 TaskB 确实需要在 TaskC 返回其依赖项列表之前运行。但显然 Luigi 不能运行任何东西,直到它调用 TaskC.requires()
有什么方法可以让这项工作完成我在这里尝试做的事情吗?
在我的现实生活场景中,这些任务的实现要复杂得多,但这是任务如何连接的要点。
【问题讨论】:
【参考方案1】:这是一个很好的问题! Luigi 实际上为它提供了完美的解决方案,并在文档中进行了介绍:https://luigi.readthedocs.io/en/stable/tasks.html#dynamic-dependencies
基本上,您将需要TaskB
然后yield
来根据输出在run
函数中执行新任务。让我举个例子:
class TaskC:
def requires(self):
return TaskB()
def run(self):
yield [TaskA(param=p) for p in self.input()]
【讨论】:
【参考方案2】:您可以利用将任务 B 的输出存储在临时位置(如 Redis)并在 TaskC 中调用它。然后在TaskC中,可以根据需要使用“yield”创建任务。 Yield 既可以在 requires() 方法中使用,也可以在 run() 方法中使用。如果使用 in run() 方法,则必须处理依赖关系及其失败。
【讨论】:
以上是关于如何使用上游任务的输出来驱动 requires() 的结果?的主要内容,如果未能解决你的问题,请参考以下文章
驱动上下游高效协同,跨境B2B电商平台如何释放LED产业供应链核心价值
FlinkFlink UpsertStreamTableSink requires that has a full primary keys if update