如何使用 TriggerDagRunOperator 触发 Airflow -dag

Posted

技术标签:

【中文标题】如何使用 TriggerDagRunOperator 触发 Airflow -dag【英文标题】:How do I trigger Airflow -dag using TriggerDagRunOperator 【发布时间】:2018-01-16 00:44:07 【问题描述】:

我找到了以下链接:

https://www.linkedin.com/pulse/airflow-lesson-1-triggerdagrunoperator-siddharth-anand

这确实解释了如何使用TriggerDagRunOperator 来执行单独的 Airflow dag。该文档使用 Airflow 自己的示例 dag,但我很难理解这些,因为它们没有使用任何传感器。

谁能解释我如何使用TriggerDagRunOperatorSqlSensor 开始单独的dag?当我的 SQL Server 作业任务完成时,我正在尝试启动单独的 DAG。我知道如何使用SqlSensor 检查 SQL Server 作业的状态,但我不知道如何将结果附加到TriggerDagRunOperator 以启动单独的 DAG。

我不想使用 Airflow CLI 或在一个 DAG 中执行这两项任务。基本上,我希望这只是触发 dag。

以下是我当前的代码,其中缺少关键的conditionally_trigger

# File Name: check-when-db1-sql-task-is-done

from airflow import DAG
from airflow.operators import TriggerDagRunOperator
from airflow.operators import SqlSensor
from datetime import datetime


default_args = 
        'owner': 'airflow',
        'retry_delay': timedelta(minutes=5),


dag = DAG('check-when-db1-sql-task-is-done',
        description='Check-when-DB1-SQL-task-is-done',
        default_args=default_args,
        schedule_interval='@once',
        start_date=datetime.now(),
        )

# returns-0-or-1-based-on-job-task-status
sqlsensor = SqlSensor (
        task_id='sql-sensor',
        poke_interval=30,
        timeout=3200,
        sql="""select last_run_outcome from msdb.dbo.sysjobsteps where job_id = '249A5A5D-6AFC-4D6B-8CB1-27C16724A450' and step_id = '1' and last_run_date = (select convert(varchar(24),getdate(),112)); """,    
        mssql_conn_id='db1',
        dag=dag,
        )

# dag-to-start
trigger = TriggerDagRunOperator (
        task_id='start-ssh-job',
        trigger_dag_id="qa-knime-ssh-task",
        python_callable=conditionally_trigger,
        params='condition_param': True,
                'message': 'Hello World',
        dag=dag)

【问题讨论】:

【参考方案1】:

我的理解是TriggerDagRunOperator是当你想用python函数来判断是否触发SubDag的时候。该函数在您的代码和示例中称为conditionally_trigger

在您的情况下,您正在使用传感器来控制流量,并且不需要传递函数。您可以使用SubDagOperator 而不是TriggerDagRunOperator 或传递一个简单的始终为真函数作为python_callable

...
python_callable=lambda(context, dag_run_obj):dag_run_obj,
...

【讨论】:

这正是我在被example 和TriggerDagRunOperator docs 抓住后一直在寻找的确认。您是否也支持(或反对)如果不是将参数(run_id & payload)传递给新触发 DAG,在@987654335 中返回True @ 就足够了(显然它不是这样设计的,但为了便于理解..)? 一点语法查询:在lambda参数中真的需要括号吗(我是python的新手)? 括号是可选的。对于您的另一个问题,我认为的答案是:是的,重要的是 python_callable 返回 true 以表示 subdag 应该运行;如果 python_callable 返回 false,则 subdag 不会运行。供您使用,看起来SubDagOperator 会更合适。

以上是关于如何使用 TriggerDagRunOperator 触发 Airflow -dag的主要内容,如果未能解决你的问题,请参考以下文章

如何使用本机反应创建登录以及如何验证会话

如何在自动布局中使用约束标识符以及如何使用标识符更改约束? [迅速]

如何使用 AngularJS 的 ng-model 创建一个数组以及如何使用 jquery 提交?

如何使用laravel保存所有行数据每个行名或相等

如何使用 Math.Net 连接矩阵。如何使用 Math.Net 调用特定的行或列?

WSARecv 如何使用 lpOverlapped?如何手动发出事件信号?