让 Airflow 表现得像 Luigi:如果任务的输出只需要获得一次,如何防止任务在 DAG 的未来运行中重新运行?
Posted
技术标签:
【中文标题】让 Airflow 表现得像 Luigi:如果任务的输出只需要获得一次,如何防止任务在 DAG 的未来运行中重新运行?【英文标题】:making Airflow behave like Luigi: how to prevent tasks to be re-run in future runs of a DAG if their output was necessary to be obtained only once? 【发布时间】:2021-11-08 16:30:22 【问题描述】:我来自 Luigi 的经验,如果一个文件由一个任务成功生成并且该任务也未修改,那么重新运行 DAG 不会重新运行该任务,而是会重用其先前获得的输出.
有什么方法可以通过 AirFlow 获得相同的行为?
目前,如果我重新运行 dag,它会重新执行所有任务,无论它们过去是否产生了成功(且未更改)的输出。所以,基本上我需要一个任务被标记为成功,如果它的代码没有改变。
【问题讨论】:
【参考方案1】:这是 Airflow 的关键和重要特性,让所有任务都具有幂等性。这意味着在相同输入上重新运行任务通常应该用该数据的新处理版本覆盖输出 - 以便可以自动重新运行依赖于它的任务。但重新处理后的数据可能与原来的不同。
这就是为什么在 Airflow 中你有一个回填命令,这基本上意味着。
请为选定的过去运行重新运行此 DAG(例如上周的运行) - 但您应该从任务 X 开始重新处理(这将重新运行任务 X 和依赖于其输出的所有任务)。
这也意味着,当您想要重新运行过去 DAG 的部分内容,但您知道想要在某些任务的现有输出上进行中继时 - 您只需回填依赖于该任务输出的任务(但不是任务本身)。
通过定义应重新运行过去 DAG 运行中的哪些任务(您基本上通过将某些任务的输出设为回填目标来使它们无效),从而提供了更大的灵活性。
这比你提到的情况还多:
a) 如果你不想改变某个任务的输出——你不会回填那个任务——而是它后面的任务
b) 更重要的是 - 如果您想重新处理任务,即使在任务输入和任务本身被修改的情况下,您仍然可以这样做 - 通过回填该任务。
情况 b) 通常很重要,因为某些任务可能具有改变的隐含依赖关系 - 即使输入和任务没有改变,再次处理它可能会产生不同(通常更好)的结果。
我听说的一个很好的例子是电信运营商重新处理通话记录,您必须根据手机的 IMEI 确定手机型号。在这种情况下,您可能只有一个服务来进行映射,但是当制造商更新他们的模型数据库时,它可能会更新到更新的版本 - 当引入新手机时,更新会发生一些延迟,所以上周的定期重新处理即使从 DAG 的 Python 角度来看,输入(“调用列表”)和任务(“将 IMEIS 映射到手机模型”)没有改变,数据也可能会给出不同的结果。
Airflow 几乎总是调用外部服务来运行某些任务,而这些服务本身可能会随着时间的推移而改进 - 这意味着将重新处理限制在“无输入 + 无任务代码”已更改的情况下是非常有限的(但您仍然可以通过选择回填范围来故意决定它 - 即要重新处理哪些任务。
【讨论】:
以上是关于让 Airflow 表现得像 Luigi:如果任务的输出只需要获得一次,如何防止任务在 DAG 的未来运行中重新运行?的主要内容,如果未能解决你的问题,请参考以下文章