让 Airflow 表现得像 Luigi:如果任务的输出只需要获得一次,如何防止任务在 DAG 的未来运行中重新运行?

Posted

技术标签:

【中文标题】让 Airflow 表现得像 Luigi:如果任务的输出只需要获得一次,如何防止任务在 DAG 的未来运行中重新运行?【英文标题】:making Airflow behave like Luigi: how to prevent tasks to be re-run in future runs of a DAG if their output was necessary to be obtained only once? 【发布时间】:2021-11-08 16:30:22 【问题描述】:

我来自 Luigi 的经验,如果一个文件由一个任务成功生成并且该任务也未修改,那么重新运行 DAG 不会重新运行该任务,而是会重用其先前获得的输出.

有什么方法可以通过 AirFlow 获得相同的行为?

目前,如果我重新运行 dag,它会重新执行所有任务,无论它们过去是否产生了成功(且未更改)的输出。所以,基本上我需要一个任务被标记为成功,如果它的代码没有改变。

【问题讨论】:

【参考方案1】:

这是 Airflow 的关键和重要特性,让所有任务都具有幂等性。这意味着在相同输入上重新运​​行任务通常应该用该数据的新处理版本覆盖输出 - 以便可以自动重新运行依赖于它的任务。但重新处理后的数据可能与原来的不同。

这就是为什么在 Airflow 中你有一个回填命令,这基本上意味着。

请为选定的过去运行重新运行此 DAG(例如上周的运行) - 但您应该从任务 X 开始重新处理(这将重新运行任务 X 和依赖于其输出的所有任务)。

这也意味着,当您想要重新运行过去 DAG 的部分内容,但您知道想要在某些任务的现有输出上进行中继时 - 您只需回填依赖于该任务输出的任务(但不是任务本身)。

通过定义应重新运行过去 DAG 运行中的哪些任务(您基本上通过将某些任务的输出设为回填目标来使它们无效),从而提供了更大的灵活性。

这比你提到的情况还多:

a) 如果你不想改变某个任务的输出——你不会回填那个任务——而是它后面的任务

b) 更重要的是 - 如果您想重新处理任务,即使在任务输入和任务本身被修改的情况下,您仍然可以这样做 - 通过回填该任务。

情况 b) 通常很重要,因为某些任务可能具有改变的隐含依赖关系 - 即使输入和任务没有改变,再次处理它可能会产生不同(通常更好)的结果。

我听说的一个很好的例子是电信运营商重新处理通话记录,您必须根据手机的 IMEI 确定手机型号。在这种情况下,您可能只有一个服务来进行映射,但是当制造商更新他们的模型数据库时,它可能会更新到更新的版本 - 当引入新手机时,更新会发生一些延迟,所以上周的定期重新处理即使从 DAG 的 Python 角度来看,输入(“调用列表”)和任务(“将 IMEIS 映射到手机模型”)没有改变,数据也可能会给出不同的结果。

Airflow 几乎总是调用外部服务来运行某些任务,而这些服务本身可能会随着时间的推移而改进 - 这意味着将重新处理限制在“无输入 + 无任务代码”已更改的情况下是非常有限的(但您仍然可以通过选择回填范围来故意决定它 - 即要重新处理哪些任务。

【讨论】:

以上是关于让 Airflow 表现得像 Luigi:如果任务的输出只需要获得一次,如何防止任务在 DAG 的未来运行中重新运行?的主要内容,如果未能解决你的问题,请参考以下文章

如何让 list_blob 表现得像 gsutil

Python Enum:如何让枚举成员表现得像零一样?

如何使 ProcessPoolExecutor 中的任务表现得像守护进程?

让 div 表现得像 iframe

让IE 8表现得像IE 7

如何让数据网格表现得像 ctrl 键处于活动状态?