如何在 Airflow 上重新启动失败的任务

Posted

技术标签:

【中文标题】如何在 Airflow 上重新启动失败的任务【英文标题】:How to restart a failed task on Airflow 【发布时间】:2017-09-02 09:46:30 【问题描述】:

我正在使用 LocalExecutor,而我的 dag 有 3 个任务,其中 task(C) 依赖于 task(A)。 Task(B) 和 task(A) 可以像下面这样并行运行

A-->C

B

所以 task(A) 失败了,但 task(B) 运行良好。任务(C) 尚未运行,因为任务(A) 已失败。

我的问题是我如何单独重新运行 Task(A),以便 Task(C) 在 Task(A) 完成并且 Airflow UI 将它们标记为成功后运行

【问题讨论】:

【参考方案1】:

在用户界面中:

    转到要更改的运行的 dag 和 dag run 点击GraphView 点击任务A 点击“清除”

这会让任务 A 再次运行,如果成功,任务 C 应该运行。 这很有效,因为当您清除任务的状态时,调度程序会将其视为之前没有运行过此 dag 运行。

【讨论】:

也可以使用命令行:airflow clear -s <start_date> -e <end_date> -t task_a <dag_name> 非常感谢! UI 和命令行都对我有用! 代码中可以有这个吗?如果任务失败,哪个会在固定时间后检查并尝试清除它? @TomasJansson 它将使用与原始执行时间相同的执行时间。但是您的 start_date 将具有新值(它将具有当前时间戳)。您可以在“任务实例详细信息”屏幕中看到所有内容。 @TomasJansson 执行日期(GUI 中的“运行”)将保持不变,但“开始”和“结束”时间戳将参考实际时间。因此,在使用原始运行日期重新运行整个 DAG(失败)时,此方法很有用(如果您在 SQL 语句中使用 ds 变量,则为 fe;同时仅手动触发任务通过 UI 将实际时间戳分配给 ds 变量并更改任务/DAG 运行的参数)。【参考方案2】:

这是另一种解决方案,您可以在其中明确并自动重试某些任务。如果您只想清除某个任务,则不会使用 -d(下游)标志:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


def clear_upstream_task(context):
    execution_date = context.get("execution_date")
    clear_tasks = BashOperator(
        task_id='clear_tasks',
        bash_command=f'airflow tasks clear -s execution_date  -t t1 -d -y clear_upstream_task'
    )
    return clear_tasks.execute(context=context)


# Default settings applied to all tasks
default_args = 
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)



with DAG('clear_upstream_task',
         start_date=datetime(2021, 1, 1),
         max_active_runs=3,
         schedule_interval=timedelta(minutes=5),
         default_args=default_args,
         catchup=False
         ) as dag:
    t0 = DummyOperator(
        task_id='t0'
    )

    t1 = DummyOperator(
        task_id='t1'
    )

    t2 = DummyOperator(
        task_id='t2'
    )
    t3 = BashOperator(
        task_id='t3',
        bash_command='exit 123',
        on_failure_callback=clear_upstream_task
    )

    t0 >> t1 >> t2 >> t3

【讨论】:

以上是关于如何在 Airflow 上重新启动失败的任务的主要内容,如果未能解决你的问题,请参考以下文章

让 Airflow 表现得像 Luigi:如果任务的输出只需要获得一次,如何防止任务在 DAG 的未来运行中重新运行?

Airflow 中文文档:使用systemd运行Airflow

启动 Airflow 网络服务器失败并出现 sqlalchemy.exc.NoInspectionAvailable:没有可用的检查系统

Flink 任务失败重启与恢复策略

Flink 任务失败重启与恢复策略

如何从 UI 停止/终止 Airflow 任务