如何使用 Python 在 Airflow 中成功触发另一个 DAG 时触发 DAG?

Posted

技术标签:

【中文标题】如何使用 Python 在 Airflow 中成功触发另一个 DAG 时触发 DAG?【英文标题】:How to Trigger a DAG on the success of a another DAG in Airflow using Python? 【发布时间】:2020-08-14 07:32:12 【问题描述】:

我有一个 python DAG Parent Job 和 DAG Child JobChild Job 中的任务应在成功完成每天运行的Parent Job 任务时触发。如何添加外部作业触发器?

我的代码

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())


default_args = 
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)


dag = DAG('Child Job', default_args=default_args, schedule_interval='@daily')

execute_notebook = PostgresOperator(
  task_id='data_sql',
  postgres_conn_id='REDSHIFT_CONN',
  sql="SELECT * FROM athena_rs.shipments limit 5",
  dag=dag
)

【问题讨论】:

这能回答你的问题吗? How to set dependencies between DAGs in Airflow? @LuckyGuess 该示例显示一个任务另一个 dag 触发另一个任务中的另一个任务。在这里我认为他在看什么,完成一个 DAG 完全触发下一个 DAG。如果你能举个例子就好了。 我强烈建议使用TriggerDagRunOperator 来执行响应式触发,而不是ExternalTaskSensor 来执行基于轮询的触发 @y2k-shubham,如果你能像下面写的那样写一个例子,它也会为其他人学习。我也面临同样的问题。 @pankaj 我添加了一个描述TriggerDagRunOperator使用的答案 【参考方案1】:

作为@pankaj 的requested,我在此添加一个描述反应性触发的sn-p,使用TriggerDagRunOperator(而不是基于投票的触发ExternalTaskSensor)

from typing import List

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule

# DAG object
my_dag: DAG = DAG(dag_id='my_dag',
                  start_date=..)
..
# a list of 'tail' tasks: tasks that have no downstream tasks
tail_tasks_of_first_dag: List[BaseOperator] = my_magic_function_that_determines_all_tail_tasks(..)
..

# our trigger task
my_trigger_task: TriggerDagRunOperator = TriggerDagRunOperator(dag=my_dag,
                                                               task_id='my_trigger_task',
                                                               trigger_rule=TriggerRule.ALL_SUCCESS,
                                                               external_dag_id='id_of_dag_to_be_triggered')
# our trigger task should run when all 'tail' tasks have completed / succeeded
tail_tasks_of_first_dag >> my_trigger_task

请注意,sn-p 仅供参考;它尚未经过测试


注意事项/参考文献

Get all Airflow Leaf Nodes/Tasks Wiring top-level DAGs together What is the difference between airflow trigger rule “all_done” and “all_success”?

【讨论】:

成功了。我的孩子在父母身上取得了成功。我仍然有疑问。我孩子的 dag 是 dag = DAG('Child', default_args=default_args, catchup=False, schedule_interval='@daily') 。我的父 DAG 计划在上午 8:30 运行。父 DAG 在上午 8:30 运行后完成后运行子作业,并且它在上午 12:00 再次运行。我在 DAG 中遗漏了一些东西。 @aeapen 您可能希望将子 DAG 的 schedule_interval 设置为 None。这样它就不会由 Airflow 自动运行;并且只会在父 DAG 完成时触发【参考方案2】:

答案已在this thread 中。下面是演示代码:

父代:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = 
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),


dag = DAG('Parent_dag', default_args=default_args, schedule_interval='@daily')

leave_work = DummyOperator(
    task_id='leave_work',
    dag=dag,
)
cook_dinner = DummyOperator(
    task_id='cook_dinner',
    dag=dag,
)

leave_work >> cook_dinner

孩子:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor

default_args = 
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),


dag = DAG('Child_dag', default_args=default_args, schedule_interval='@daily')

# Use ExternalTaskSensor to listen to the Parent_dag and cook_dinner task
# when cook_dinner is finished, Child_dag will be triggered
wait_for_dinner = ExternalTaskSensor(
    task_id='wait_for_dinner',
    external_dag_id='Parent_dag',
    external_task_id='cook_dinner',
    start_date=datetime(2020, 4, 29),
    execution_delta=timedelta(hours=1),
    timeout=3600,
)

have_dinner = DummyOperator(
    task_id='have_dinner',
    dag=dag,
)
play_with_food = DummyOperator(
    task_id='play_with_food',
    dag=dag,
)

wait_for_dinner >> have_dinner
wait_for_dinner >> play_with_food

图片:

Parent_dag

Child_dag

【讨论】:

我试过这个我得到一个超时错误。[MainThread] INFO airflow.task.operators - [2020-05-01 09:51:14,444] external_task_sensor.py:115 Poking for RS_Input_Cleansing.events_input_sql on 2020-04-29T23:00:00+00:00 ... [MainThread] ERROR airflow.task - [2020-05-01 09:51:14,508] taskinstance.py:1088 Snap. Time is OUT. Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 955, in _run_raw_task result = task_copy.execute(context=context) 如果Parent Dag 已经处于成功状态。如果手动触发Child Dag 会运行吗? 您可能需要打开另一个问题并发布完整日志。但是要回答你的问题,是的,如果RS_Input_Cleansing.events_input_sql 准时完成,child dag 会自动运行 父 dag 的最后一个任务需要 2 小时才能完成。那是超时的原因吗,因为我们已经给了 1 小时的超时时间 是的,增加timeout。现在设置为 3600 秒,即 1 小时。【参考方案3】:

我相信您正在寻找 SubDags operator,在更大的 dag 中运行 Dag。 请注意,像下面的示例中那样创建许多 subdag 会很快变得混乱,因此我建议将每个 subdag 拆分到一个文件中,然后将其导入到一个主文件中。

SubDagOperator 使用简单,你需要提供一个 Id、一个 subdag(子)和一个 dag(父)

subdag_2 = SubDagOperator(
        task_id="just_some_id", 
        subdag=child_subdag, <---- this must be a DAG
        dag=parent_dag, <----- this must be a DAG
        )

它看起来像这样:

来自their examples repo

from airflow import DAG
from airflow.example_dags.subdags.subdag import subdag
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
def subdag(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(
            dag_id='%s.%s' % (parent_dag_name, child_dag_name),
            default_args=args,
            schedule_interval="@daily",
            )

    for i in range(5):
        DummyOperator(
                task_id='%s-task-%s' % (child_dag_name, i + 1),
                default_args=args,
                dag=dag_subdag,
                )

    return dag_subdag

DAG_NAME = 'example_subdag_operator'

args = 
        'owner': 'airflow',
        'start_date': days_ago(2),
        

dag = DAG(
        dag_id=DAG_NAME,
        default_args=args,
        schedule_interval="@once",
        tags=['example']
        )

start = DummyOperator(
        task_id='start-of-main-job',
        dag=dag,
        )

some_other_task = DummyOperator(
        task_id='some-other-task',
        dag=dag,
        )


end = DummyOperator(
        task_id='end-of-main-job',
        dag=dag,
        )

subdag = SubDagOperator(
        task_id='run-this-dag-after-previous-steps',
        subdag=subdag(DAG_NAME, 'run-this-dag-after-previous-steps', args),
        dag=dag,
        )

start >> some_other_task >> end >> subdag

【讨论】:

谢谢。我正在寻找另一个案例。可能我可能已经混淆了。 Parent Dag 基本上是另一份工作,而 Child Dag 是另一份工作。我想在第一份工作成功完成后触发 Child'。如果你能帮助我,那就太好了 @aeapen job 是什么意思,它是一个任务,一个 DAG 吗?这段代码完全符合您的描述......您唯一能匹配您的情况的就是在此管道的末尾添加 Parent Dag。我将更新 dag 以演示它...但是心态不是独立拥有两个不同的 DAG,而是嵌套了 DAGS,Parent -> Child...这对我来说更有意义 另一种方法是使用 ExternalTask​​Sensor 运算符从另一个 DAG 触发一个 DAG,我认为这更令人困惑 @aeapen 我更新了我的解决方案,这更适合你想要的吗? 我的意思是 Job 是一个 DAG,它包含多个任务。

以上是关于如何使用 Python 在 Airflow 中成功触发另一个 DAG 时触发 DAG?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Airflow 中设置 DAG 之间的依赖关系?

如何在 Airflow 中创建条件任务

DolphinDB +Python Airflow 高效实现数据清洗

Airflow DAG - 如何首先检查BQ(必要时删除)然后运行数据流作业?

无法在 Airflow Python 3 中发布 Pubsub 消息

使用来自 Airflow 的 Python 在 Pub/Sub 中发布消息时遇到问题