如何使用 Python 在 Airflow 中成功触发另一个 DAG 时触发 DAG?
Posted
技术标签:
【中文标题】如何使用 Python 在 Airflow 中成功触发另一个 DAG 时触发 DAG?【英文标题】:How to Trigger a DAG on the success of a another DAG in Airflow using Python? 【发布时间】:2020-08-14 07:32:12 【问题描述】:我有一个 python DAG Parent Job
和 DAG Child Job
。 Child Job
中的任务应在成功完成每天运行的Parent Job
任务时触发。如何添加外部作业触发器?
我的代码
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args =
'owner': 'airflow',
'depends_on_past': False,
'start_date': yesterday,
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
dag = DAG('Child Job', default_args=default_args, schedule_interval='@daily')
execute_notebook = PostgresOperator(
task_id='data_sql',
postgres_conn_id='REDSHIFT_CONN',
sql="SELECT * FROM athena_rs.shipments limit 5",
dag=dag
)
【问题讨论】:
这能回答你的问题吗? How to set dependencies between DAGs in Airflow? @LuckyGuess 该示例显示一个任务另一个 dag 触发另一个任务中的另一个任务。在这里我认为他在看什么,完成一个 DAG 完全触发下一个 DAG。如果你能举个例子就好了。 我强烈建议使用TriggerDagRunOperator
来执行响应式触发,而不是ExternalTaskSensor
来执行基于轮询的触发
@y2k-shubham,如果你能像下面写的那样写一个例子,它也会为其他人学习。我也面临同样的问题。
@pankaj 我添加了一个描述TriggerDagRunOperator
使用的答案
【参考方案1】:
作为@pankaj 的requested,我在此添加一个描述反应性触发的sn-p,使用TriggerDagRunOperator
(而不是基于投票的触发ExternalTaskSensor
)
from typing import List
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
# DAG object
my_dag: DAG = DAG(dag_id='my_dag',
start_date=..)
..
# a list of 'tail' tasks: tasks that have no downstream tasks
tail_tasks_of_first_dag: List[BaseOperator] = my_magic_function_that_determines_all_tail_tasks(..)
..
# our trigger task
my_trigger_task: TriggerDagRunOperator = TriggerDagRunOperator(dag=my_dag,
task_id='my_trigger_task',
trigger_rule=TriggerRule.ALL_SUCCESS,
external_dag_id='id_of_dag_to_be_triggered')
# our trigger task should run when all 'tail' tasks have completed / succeeded
tail_tasks_of_first_dag >> my_trigger_task
请注意,sn-p 仅供参考;它尚未经过测试
注意事项/参考文献
Get all Airflow Leaf Nodes/Tasks Wiring top-level DAGs together What is the difference between airflow trigger rule “all_done” and “all_success”?【讨论】:
成功了。我的孩子在父母身上取得了成功。我仍然有疑问。我孩子的 dag 是dag = DAG('Child', default_args=default_args, catchup=False, schedule_interval='@daily')
。我的父 DAG 计划在上午 8:30 运行。父 DAG 在上午 8:30 运行后完成后运行子作业,并且它在上午 12:00 再次运行。我在 DAG 中遗漏了一些东西。
@aeapen 您可能希望将子 DAG 的 schedule_interval
设置为 None
。这样它就不会由 Airflow 自动运行;并且只会在父 DAG 完成时触发【参考方案2】:
答案已在this thread 中。下面是演示代码:
父代:
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
default_args =
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 4, 29),
dag = DAG('Parent_dag', default_args=default_args, schedule_interval='@daily')
leave_work = DummyOperator(
task_id='leave_work',
dag=dag,
)
cook_dinner = DummyOperator(
task_id='cook_dinner',
dag=dag,
)
leave_work >> cook_dinner
孩子:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sensors import ExternalTaskSensor
default_args =
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 4, 29),
dag = DAG('Child_dag', default_args=default_args, schedule_interval='@daily')
# Use ExternalTaskSensor to listen to the Parent_dag and cook_dinner task
# when cook_dinner is finished, Child_dag will be triggered
wait_for_dinner = ExternalTaskSensor(
task_id='wait_for_dinner',
external_dag_id='Parent_dag',
external_task_id='cook_dinner',
start_date=datetime(2020, 4, 29),
execution_delta=timedelta(hours=1),
timeout=3600,
)
have_dinner = DummyOperator(
task_id='have_dinner',
dag=dag,
)
play_with_food = DummyOperator(
task_id='play_with_food',
dag=dag,
)
wait_for_dinner >> have_dinner
wait_for_dinner >> play_with_food
图片:
狗
Parent_dag
Child_dag
【讨论】:
我试过这个我得到一个超时错误。[MainThread] INFO airflow.task.operators - [2020-05-01 09:51:14,444] external_task_sensor.py:115 Poking for RS_Input_Cleansing.events_input_sql on 2020-04-29T23:00:00+00:00 ... [MainThread] ERROR airflow.task - [2020-05-01 09:51:14,508] taskinstance.py:1088 Snap. Time is OUT. Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 955, in _run_raw_task result = task_copy.execute(context=context)
如果Parent Dag
已经处于成功状态。如果手动触发Child Dag
会运行吗?
您可能需要打开另一个问题并发布完整日志。但是要回答你的问题,是的,如果RS_Input_Cleansing.events_input_sql
准时完成,child dag 会自动运行
父 dag 的最后一个任务需要 2 小时才能完成。那是超时的原因吗,因为我们已经给了 1 小时的超时时间
是的,增加timeout
。现在设置为 3600 秒,即 1 小时。【参考方案3】:
我相信您正在寻找 SubDags operator,在更大的 dag 中运行 Dag。 请注意,像下面的示例中那样创建许多 subdag 会很快变得混乱,因此我建议将每个 subdag 拆分到一个文件中,然后将其导入到一个主文件中。
SubDagOperator 使用简单,你需要提供一个 Id、一个 subdag(子)和一个 dag(父)
subdag_2 = SubDagOperator(
task_id="just_some_id",
subdag=child_subdag, <---- this must be a DAG
dag=parent_dag, <----- this must be a DAG
)
它看起来像这样:
来自their examples repo
from airflow import DAG
from airflow.example_dags.subdags.subdag import subdag
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
def subdag(parent_dag_name, child_dag_name, args):
dag_subdag = DAG(
dag_id='%s.%s' % (parent_dag_name, child_dag_name),
default_args=args,
schedule_interval="@daily",
)
for i in range(5):
DummyOperator(
task_id='%s-task-%s' % (child_dag_name, i + 1),
default_args=args,
dag=dag_subdag,
)
return dag_subdag
DAG_NAME = 'example_subdag_operator'
args =
'owner': 'airflow',
'start_date': days_ago(2),
dag = DAG(
dag_id=DAG_NAME,
default_args=args,
schedule_interval="@once",
tags=['example']
)
start = DummyOperator(
task_id='start-of-main-job',
dag=dag,
)
some_other_task = DummyOperator(
task_id='some-other-task',
dag=dag,
)
end = DummyOperator(
task_id='end-of-main-job',
dag=dag,
)
subdag = SubDagOperator(
task_id='run-this-dag-after-previous-steps',
subdag=subdag(DAG_NAME, 'run-this-dag-after-previous-steps', args),
dag=dag,
)
start >> some_other_task >> end >> subdag
【讨论】:
谢谢。我正在寻找另一个案例。可能我可能已经混淆了。 Parent Dag 基本上是另一份工作,而 Child Dag 是另一份工作。我想在第一份工作成功完成后触发 Child'。如果你能帮助我,那就太好了 @aeapenjob
是什么意思,它是一个任务,一个 DAG 吗?这段代码完全符合您的描述......您唯一能匹配您的情况的就是在此管道的末尾添加 Parent Dag。我将更新 dag 以演示它...但是心态不是独立拥有两个不同的 DAG,而是嵌套了 DAGS,Parent -> Child...这对我来说更有意义
另一种方法是使用 ExternalTaskSensor 运算符从另一个 DAG 触发一个 DAG,我认为这更令人困惑
@aeapen 我更新了我的解决方案,这更适合你想要的吗?
我的意思是 Job 是一个 DAG,它包含多个任务。以上是关于如何使用 Python 在 Airflow 中成功触发另一个 DAG 时触发 DAG?的主要内容,如果未能解决你的问题,请参考以下文章
DolphinDB +Python Airflow 高效实现数据清洗
Airflow DAG - 如何首先检查BQ(必要时删除)然后运行数据流作业?