如何在 Airflow 中创建条件任务

Posted

技术标签:

【中文标题】如何在 Airflow 中创建条件任务【英文标题】:How to create a conditional task in Airflow 【发布时间】:2017-09-26 11:11:29 【问题描述】:

我想在 Airflow 中创建一个条件任务,如下面的架构中所述。预期的情况如下:

任务 1 执行 如果Task 1成功,则执行Task 2a Else 如果任务 1 失败,则执行任务 2b 最终执行任务 3

以上所有任务都是 SSHExecuteOperator。 我猜我应该使用 ShortCircuitOperator 和/或 XCom 来管理条件,但我不清楚如何实现它。你能描述一下解决方案吗?

【问题讨论】:

【参考方案1】:

Airflow 有一个BranchPythonOperator,可以用来更直接地表达分支依赖。

docs 描述了它的用途:

BranchPythonOperator 与 PythonOperator 非常相似,只是它需要一个返回 task_id 的 python_callable。跟随返回的task_id,跳过所有其他路径。 Python 函数返回的 task_id 必须直接引用 BranchPythonOperator 任务下游的任务。

...

如果您想跳过某些任务,请记住您不能有空路径,如果是这样,请创建一个虚拟任务。

代码示例

def dummy_test():
    return 'branch_a'

A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_false', dag=dag)

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=dummy_test,
    dag=dag,
)

branch_task >> A_task 
branch_task >> B_task

编辑

如果您安装的是 Airflow 版本 >=1.10.3,您还可以return a list of task ids,允许您在单个 Operator 和don't use a dummy task before joining 中跳过多个下游路径。

【讨论】:

您是否有更多关于“返回任务ID列表,允许您在单个操作员中跳过多个下游路径:”的详细信息: @mr4kino 哎呀看起来它被推迟到 1.10.3,我太早发表评论了 ;-) 将更新答案,谢谢。 @alltej 不确定您的意思,但 A_task 和 B_task 可以是您想要的任何运算符(在多分支示例中也是 branch_x)。 DummyOperator 只是一个愚蠢的例子。之所以称为BranchPythonOperator,是因为它使用 Python 函数来决定要遵循的分支,仅此而已。 如果分支使用KubernetesPodOperator 提取一些文件,让我们说没有要提取的文件,我需要将该任务和下游任务标记为“已跳过”。 如果跳过条件来自 Operator 内部,我建议使用 XCOM 并根据该 XCOM 值确定 BranchPythonOperator。特别是对于KubernetesPodOperator,您可能希望使用xcom_push=True 发送该状态。【参考方案2】:

你必须使用airflow trigger rules

所有操作符都有一个 trigger_rule 参数,它定义了触发生成任务的规则。

触发规则的可能性:

ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'

这是解决您问题的想法:

from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook

sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)

task_1 = SSHExecuteOperator(
        task_id='task_1',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2 = SSHExecuteOperator(
        task_id='conditional_task',
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)

task_2a = SSHExecuteOperator(
        task_id='task_2a',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)

task_2b = SSHExecuteOperator(
        task_id='task_2b',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ALL_FAILED,
        ssh_hook=sshHook,
        dag=dag)

task_3 = SSHExecuteOperator(
        task_id='task_3',
        bash_command=<YOUR COMMAND>,
        trigger_rule=TriggerRule.ONE_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)


task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)

【讨论】:

谢谢@Jean S,您的解决方案就像一个魅力。我还有一个问题。在执行Task2a并跳过Task2b的场景中,我注意到Task3与Task2a同时执行,而我想在之后执行它。除了在 2 个分支(如 Task3a 和 Task3b)中复制 Task3 之外,您还有什么技巧吗?再次感谢。 嗨!您是否尝试在任务 3 中通过 trigger_rule=TriggerRule.ALL_DONE 更改:trigger_rule=TriggerRule.ONE_SUCCESS?你确定你的任务是同时执行的吗? (尝试在 T2A 中添加睡眠功能以进行完整性检查) 来自 Airflow 的文档link 我确认“one_success:一旦至少一个父母成功就会触发,它不会等待所有父母都完成"...我会尝试 ALL_DONE!谢谢 失败似乎有点太宽泛了。任务可能由于各种原因(例如网络或 DNS 问题)而失败,然后触发错误的下游任务。有没有办法用两个不同的下游选项来定义两种或更多不同类型的成功?例如文件存在做a,文件不存在做b?文件传感器似乎不是正确的答案,因为在所有重试之后,失败可能是由于其他原因。 对于寻找新的触发规则文档(Airflow 2.1+)的其他人,您可以在这里找到它:Trigger Rules

以上是关于如何在 Airflow 中创建条件任务的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Symfony2 中创建 cron 任务

Pyspark:如何在不同条件的数据框中创建列

如何在 IE HTML 条件中创建“else”?

如何在 R Markdown 中创建条件 selectInput 小部件?

如何在 Oracle 12c 中创建带条件的索引?

如何在grunt中创建许多单独的uglify任务?