如何强制气流任务失败?

Posted

技术标签:

【中文标题】如何强制气流任务失败?【英文标题】:How do I force a task on airflow to fail? 【发布时间】:2017-08-24 00:04:48 【问题描述】:

我有一个可处理 csv 文件条目的 python 可调用 process_csv_entries。我希望我的任务只有在所有条目都成功处理后才能成功完成。否则任务应该失败

def process_csv_entries(csv_file):
    # Boolean 
    file_completely_parsed = <call_to_module_to_parse_csv>
    return not file_completely_parsed

CSV_FILE=<Sets path to csv file>
t1 = PythonOperator(dag=dag,
                      task_id='parse_csv_completely',
                      python_operator=process_csv_entries,
                      op_args=[CSV_FILE])

无论返回值如何,t1 似乎都成功完成。 如何强制 PythonOperator 任务失败?

【问题讨论】:

【参考方案1】:

遇到错误条件时引发异常(在您的情况下:文件未成功解析时)

raise ValueError('File not parsed completely/correctly')

用合适的消息提出相关的错误类型

【讨论】:

这行得通..谢谢!我希望有更好的方法来处理这个问题。 这不会强制任务实例状态为“失败”...有没有办法绕过重试配置?【参考方案2】:

是的,提出AirflowException,这将导致任务立即进入失败状态。

from airflow import AirflowException

ValueError 可用于失败和重试。

【讨论】:

任何异常都会使任务失败并将其移至失败状态。 如果设置了重试,有没有办法防止气流重试任务?例如,有一些您不想/不需要重试的错误,例如与无效输入相关的错误。 @JoeJ,有一个公开的 PR:github.com/apache/airflow/pull/7133【参考方案3】:

AirflowFailException 现在可以让任务不重试就失败

【讨论】:

对于那些想知道“现在”是什么版本的人,它是introduced in 1.10.11【参考方案4】:

如果您想在不重试的情况下使任务失败,请使用 AirflowFailException :-

示例:-

from airflow.exceptions import AirflowFailException
def task_to_fail():
    raise AirflowFailException("Our api key is bad!")

如果您正在寻找重试,请使用 AirflowException :-

例子:-

from airflow import AirflowException
def task_to_fail():
    raise AirflowException("Error msj")

【讨论】:

以上是关于如何强制气流任务失败?的主要内容,如果未能解决你的问题,请参考以下文章

气流将长时间运行的任务标记为失败

如何使用气流检查长时间运行的 http 任务的状态?

气流操作员从外部Rest API提取数据

如何将数据帧传递到气流任务的临时表中

如何在另一个任务气流中使用查询结果(bigquery 运算符)

该怎么强制自己完成计划任务?