如何在 Airflow 中设置 DAG 之间的依赖关系?
Posted
技术标签:
【中文标题】如何在 Airflow 中设置 DAG 之间的依赖关系?【英文标题】:How to set dependencies between DAGs in Airflow? 【发布时间】:2016-10-27 14:24:15 【问题描述】:我正在使用Airflow 来安排批处理作业。我有一个每天晚上运行的 DAG (A) 和另一个每月运行一次的 DAG (B)。 B 依赖于 A 已成功完成。但是 B 需要很长时间才能运行,因此我想将其保存在单独的 DAG 中,以实现更好的 SLA 报告。
如何让 DAG B 的运行依赖于 DAG A 在同一天的成功运行?
【问题讨论】:
另见Wiring top-level DAGs together 【参考方案1】:您可以使用名为 ExternalTaskSensor 的运算符来实现此行为。 您在 DAG(B) 中的任务 (B1) 将被安排并等待 DAG(A) 中的任务 (A2) 成功
External Task Sensor documentation
【讨论】:
但是我们无法可视化依赖关系,对吧? @nono 是的。你不会的。 这是推荐的方法吗?我有一个日常任务需要等待 DagA(由 5 个任务组成)和 DagB(5 个单独的任务)。我的 DagC 应该等待这两个都成功,然后从数据库中查询两个表,聚合并加入它们,然后发送一些电子邮件/文件。 @nono 我猜你可以写一些东西来解析你所有的 dag 定义文件并找到 ExternalTaskSensor dag 引用并生成一个网络图。当使用上游/下游函数(和位移快捷方式)在调度程序中加载 dag 时,一定会发生类似的事情。它会稍微复杂一些,因为您需要查看所有 dag 定义。一个好主意,但绝对可行。我想反过来 - 使用更少的 dag 和许多 subdags - 是这种功能的重点。【参考方案2】:看起来TriggerDagRunOperator 也可以使用,您可以使用python callable 来添加一些逻辑。如此处所述:https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand
【讨论】:
是的,triggerDagRunOp 可用于处理 DAG 间的依赖关系,但是,当您有更多相互依赖的 DAG 时,该过程会很棘手。 Airflow 中似乎缺少此功能。不知道开发商有没有打算朝这个方向做? @ozw1z5rd 我建议你去:cwiki.apache.org/confluence/display/AIRFLOW/Roadmap 或在 gitter 或 thea airflow 邮件列表上询问。【参考方案3】:当需要跨DAG依赖时,往往有两个要求:
DAG B
上的任务 B1
需要在 DAG A
上的任务 A1
完成后运行。正如其他人所提到的,这可以使用ExternalTaskSensor
来实现:
B1 = ExternalTaskSensor(task_id="B1",
external_dag_id='A',
external_task_id='A1',
mode="reschedule")
当用户清除 DAG A
上的任务 A1
时,我们希望 Airflow 清除 DAG B
上的任务 B1
以使其重新运行。这可以使用ExternalTaskMarker
来实现(自 Airflow v1.10.8 起)。
A1 = ExternalTaskMarker(task_id="A1",
external_dag_id="B",
external_task_id="B1")
请参阅有关跨 DAG 依赖关系的文档以获取更多详细信息:https://airflow.apache.org/docs/stable/howto/operator/external.html
【讨论】:
很遗憾,由于github.com/apache/airflow/issues/14260,ExternalTaskMarker
目前在 Airflow 2.0.1 中没有用处以上是关于如何在 Airflow 中设置 DAG 之间的依赖关系?的主要内容,如果未能解决你的问题,请参考以下文章
Apache Airflow - 如何在目标 DAG 中使用 TriggerDagRunOperator 设置 execution_date 以使用当前 execution_date