如何记录 Django 芹菜任务中发生的异常

Posted

技术标签:

【中文标题】如何记录 Django 芹菜任务中发生的异常【英文标题】:How to log exceptions occurring in a django celery task 【发布时间】:2013-05-15 12:48:20 【问题描述】:

我已经设置了 celery 来使用我的 django 应用程序使用他们的守护进程指令 (http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing)

这是我的测试任务

@periodic_task(run_every=timedelta(seconds=10))
def debugger():
    logger.info("Running debugger")
    raise Exception('Failed')

我需要一种方法来知道此任务(调试器)由于异常而失败。 Celery 的日志文件打印 logger.info("running debugger") 日志,但它不记录异常。我是否遗漏了什么,或者我应该以其他方式找到失败的任务?

【问题讨论】:

想从 Celery 得到什么?我不能像桌面应用程序那样崩溃。您可以使用两种简单的方法。 1.使用结果后端并将任务标记为下降。 2. 将所有代码包装在 celery 中以尝试除外。 @Rustem 我希望 Celery 能够捕获异常并将它们写入日志文件,而不是显然吞下它们...... 我遇到了同样的问题。 【参考方案1】:

问题:

我希望 Celery 捕获异常并将它们写入日志文件,而不是显然吞下它们...

出于专业解决方案的目的,这里当前的最佳答案是马马虎虎。许多 python 开发人员会考虑在个案基础上捕获全面错误是一个危险信号。对此的合理厌恶在评论中得到了很好的表达:

等一下,我希望至少对于每个失败的任务都会在工作日志中记录一些内容...

Celery 确实捕获了异常,它只是没有做 OP 希望它用它做的事情(它将它存储在结果后端中)。以下要点是互联网在这个问题上所能提供的最好的。这有点过时了,但请注意分叉和星号的数量。

https://gist.github.com/darklow/c70a8d1147f05be877c3

要点是处理失败案例并对其进行自定义处理。这是 OP 问题的超集。以下是如何调整 gist 中的解决方案以记录异常。

import logging

logger = logging.getLogger('your.desired.logger')


class LogErrorsTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.exception('Celery task failure!!!1', exc_info=exc)
        super(LogErrorsTask, self).on_failure(exc, task_id, args, kwargs, einfo)

你仍然需要确保你的所有任务都继承自这个任务类,如果你使用 @task 装饰器(带有 base=LogErrorsTask kwarg),要点说明如何做到这一点。

此解决方案的好处是不会将您的代码嵌套在任何额外的 try-except 上下文中。这是在 celery 已经在使用的故障代码路径上的捎带。

【讨论】:

【参考方案2】:

你可以看Celery User Guide:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def div():
    try:
        1 / 0
    except ZeroDivisionError:
        logger.exception("Task error")

来自python logging module 的文档:

Logger.exception(msg, *args)

在此记录器上记录一条级别为 ERROR 的消息。参数被解释为 debug()。异常信息被添加到日志消息中。此方法只能从异常处理程序中调用。

【讨论】:

等一下,我希望工作日志中至少会记录一些失败的任务......【参考方案3】:

为了从 Celery 任务中接收所有未处理的异常,我注册了一个信号处理程序。我正在格式化 logging.error 消息,然后可以通过默认的 Python 日志记录配置来处理。

这里是相关部分

from celery import signals

@signals.task_retry.connect
@signals.task_failure.connect
@signals.task_revoked.connect
def on_task_failure(**kwargs):
    """Abort transaction on task errors.
    """
    # celery exceptions will not be published to `sys.excepthook`. therefore we have to create another handler here.
    from traceback import format_tb

    log.error('[task:%s:%s]' % (kwargs.get('task_id'), kwargs['sender'].request.correlation_id, )
              + '\n'
              + ''.join(format_tb(kwargs.get('traceback', [])))
              + '\n'
              + str(kwargs.get('exception', '')))

请注意,此信号处理程序自动适用于所有任务;即它不需要更改您的 task 装饰器。

【讨论】:

这很好用,但它似乎在日志中记录了重复的回溯,从两个到四个不等。其他人有这个问题吗?【参考方案4】:

使用 traceback 模块将跟踪捕获为字符串并将其发送到记录器。

try:
    ...
except:
    import traceback
    logger.info(traceback.format_exc())

【讨论】:

fwif,python 记录器可以包含任何级别的回溯。您所要做的就是将 exc_info=1 添加到通话中。例如logger.info('某事失败 b/c 其他事', exc_info=1)【参考方案5】:

您也可以覆盖 celery 应用,以避免将 base kwarg 添加到每个 @app.task 装饰器:

import logging
from celery import Celery, Task

logger = logging.getLogger(__name__)

class LoggingTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.exception('Task failed: %s' % exc, exc_info=exc)
        super(LoggingTask, self).on_failure(exc, task_id, args, kwargs, einfo)

class LoggingCelery(Celery):
    def task(self, *args, **kwargs):
        kwargs.setdefault('base', LoggingTask)
        return super(LoggingCelery, self).task(*args, **kwargs)

app = LoggingCelery(__name__)

【讨论】:

以上是关于如何记录 Django 芹菜任务中发生的异常的主要内容,如果未能解决你的问题,请参考以下文章

如何禁用芹菜任务结果记录?

芹菜从 Django 模块内部记录到文件

Django 学习之Celery(芹菜)

如何在 Django 中等待芹菜任务的结果

如何在任务中获取芹菜结果模型(使用 django-celery-results)

什么可能会延迟我的芹菜任务?