如何在 Airflow 中设置 DAG 之间的依赖关系?

Posted

技术标签:

【中文标题】如何在 Airflow 中设置 DAG 之间的依赖关系?【英文标题】:How to set dependencies between DAGs in Airflow? 【发布时间】:2016-10-27 14:24:15 【问题描述】:

我正在使用Airflow 来安排批处理作业。我有一个每天晚上运行的 DAG (A) 和另一个每月运行一次的 DAG (B)。 B 依赖于 A 已成功完成。但是 B 需要很长时间才能运行,因此我想将其保存在单独的 DAG 中,以实现更好的 SLA 报告。

如何让 DAG B 的运行依赖于 DAG A 在同一天的成功运行?

【问题讨论】:

另见Wiring top-level DAGs together 【参考方案1】:

您可以使用名为 ExternalTask​​Sensor 的运算符来实现此行为。 您在 DAG(B) 中的任务 (B1) 将被安排并等待 DAG(A) 中的任务 (A2) 成功

External Task Sensor documentation

【讨论】:

但是我们无法可视化依赖关系,对吧? @nono 是的。你不会的。 这是推荐的方法吗?我有一个日常任务需要等待 DagA(由 5 个任务组成)和 DagB(5 个单独的任务)。我的 DagC 应该等待这两个都成功,然后从数据库中查询两个表,聚合并加入它们,然后发送一些电子邮件/文件。 @nono 我猜你可以写一些东西来解析你所有的 dag 定义文件并找到 ExternalTask​​Sensor dag 引用并生成一个网络图。当使用上游/下游函数(和位移快捷方式)在调度程序中加载 dag 时,一定会发生类似的事情。它会稍微复杂一些,因为您需要查看所有 dag 定义。一个好主意,但绝对可行。我想反过来 - 使用更少的 dag 和许多 subdags - 是这种功能的重点。【参考方案2】:

看起来TriggerDagRunOperator 也可以使用,您可以使用python callable 来添加一些逻辑。如此处所述:https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand

【讨论】:

是的,triggerDagRunOp 可用于处理 DAG 间的依赖关系,但是,当您有更多相互依赖的 DAG 时,该过程会很棘手。 Airflow 中似乎缺少此功能。不知道开发商有没有打算朝这个方向做? @ozw1z5rd 我建议你去:cwiki.apache.org/confluence/display/AIRFLOW/Roadmap 或在 gitter 或 thea airflow 邮件列表上询问。【参考方案3】:

当需要跨DAG依赖时,往往有两个要求:

    DAG B 上的任务 B1 需要在 DAG A 上的任务 A1 完成后运行。正如其他人所提到的,这可以使用ExternalTaskSensor 来实现:

    B1 = ExternalTaskSensor(task_id="B1",
                            external_dag_id='A',
                            external_task_id='A1',
                            mode="reschedule")
    

    当用户清除 DAG A 上的任务 A1 时,我们希望 Airflow 清除 DAG B 上的任务 B1 以使其重新运行。这可以使用ExternalTaskMarker 来实现(自 Airflow v1.10.8 起)。

    A1 = ExternalTaskMarker(task_id="A1", 
                            external_dag_id="B",
                            external_task_id="B1")
    

请参阅有关跨 DAG 依赖关系的文档以获取更多详细信息:https://airflow.apache.org/docs/stable/howto/operator/external.html

【讨论】:

很遗憾,由于github.com/apache/airflow/issues/14260,ExternalTaskMarker 目前在 Airflow 2.0.1 中没有用处

以上是关于如何在 Airflow 中设置 DAG 之间的依赖关系?的主要内容,如果未能解决你的问题,请参考以下文章

Apache Airflow - 如何在目标 DAG 中使用 TriggerDagRunOperator 设置 execution_date 以使用当前 execution_date

Apache Atlas 和 Airflow 集成

大数据调度平台Airflow:Airflow使用

Airflow DAG 从哪里获得它的依赖关系?

Airflow 中文文档:使用测试模式配置

Airflow 中文文档:使用测试模式配置