获取有关 Airflow on_failure_callback 上下文的异常详细信息

Posted

技术标签:

【中文标题】获取有关 Airflow on_failure_callback 上下文的异常详细信息【英文标题】:Get Exception details on Airflow on_failure_callback context 【发布时间】:2019-01-20 03:59:16 【问题描述】:

有什么方法可以获取气流on_failure_callback上的异常详细信息?

我注意到它不是context 的一部分。我想创建一个通用的异常处理机制,将有关错误的信息发布到 Slack,包括有关异常的详细信息。我现在已经成功触发/执行回调并发布到 Slack,但无法发布异常详细信息。

谢谢。

【问题讨论】:

【参考方案1】:

on_failure_callback 可以提供给 DAG 和/或单个任务。

在第一种情况下(提供给 DAG),context 中没有 'exception'(参数 Airflow 调用您的 on_failure_callback)。

在第二种情况下(提供给任务),有。

包含的对象应该是 python Exception。 从中获取堆栈跟踪之类的东西令人惊讶地不直观,但是从this answer 我使用以下内容来获取相当可读的堆栈跟踪:

import traceback

...

exception = context.get('exception')
formatted_exception = ''.join(
   traceback.format_exception(etype=type(exception), 
     value=exception, tb=exception.__traceback__
   )
).strip()

【讨论】:

【参考方案2】:

错误被添加到上下文here。因此,您实际上可以通过以下方式获得:

context.get("exception")

不幸的是,您似乎无法从上下文中获取堆栈跟踪或类似的东西。

【讨论】:

我刚收到None【参考方案3】:

我认为可能无法在回调中获取异常详细信息。 Look at the source code

# Handling callbacks pessimistically
try:
    if self.state == State.UP_FOR_RETRY and task.on_retry_callback:
        task.on_retry_callback(context)
    if self.state == State.FAILED and task.on_failure_callback:
        task.on_failure_callback(context)
except Exception as e3:
    logging.error("Failed at executing callback")
    logging.exception(e3)

【讨论】:

以上是关于获取有关 Airflow on_failure_callback 上下文的异常详细信息的主要内容,如果未能解决你的问题,请参考以下文章

Airflow 中文文档:用Dask扩展

Airflow 中文文档:使用操作器

Airflow:使用 LivyBatchOperator 在 yarn 中提交 pyspark 应用程序

airflow系列教程 airflow的报警功能设置

Google Cloud DataFlow 作业尚不可用.. 在 Airflow

Apache Manged Airflow EMR 操作员 DAG 失败