How to enforce logger format during Celery task execution?

Posted: 2019-06-15 21:32:26

Question:

I have a service that logs debug messages using Python's standard logging module.

my_service.py:

import logging

logger = logging.getLogger(__name__)

class SomeService:
    def synchronize(self):
        logger.debug('synchronizing stuff')
        external_library.call('do it')  # a third-party call that does its own logging
        logger.debug('found x results')

I then use this service from a Celery task.

tasks.py:

from celery import shared_task

@shared_task
def synchronize_stuff():
    stuff = some_service.synchronize()  # some_service is a SomeService instance

The worker then outputs logs like this:

worker_1     | [2019-01-22 11:39:19,232: DEBUG/MainProcess] Task accepted: my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] pid:12
worker_1     | [2019-01-22 11:39:19,237: DEBUG/ForkPoolWorker-1] Starting new HTTPS connection (1): example.com:443
worker_1     | [2019-01-22 11:39:19,839: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/stuff HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:19,860: DEBUG/ForkPoolWorker-1] Processing 35
worker_1     | [2019-01-22 11:39:19,862: DEBUG/ForkPoolWorker-1] Item 35 already closed, ignoring.
worker_1     | [2019-01-22 11:39:19,863: DEBUG/ForkPoolWorker-1] Processing 36
worker_1     | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] Item 36 already closed, ignoring.
worker_1     | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] Processing 49
worker_1     | [2019-01-22 11:39:20,380: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/49 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:20,429: DEBUG/ForkPoolWorker-1] Processing 50
worker_1     | [2019-01-22 11:39:20,680: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/50 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:20,693: DEBUG/ForkPoolWorker-1] Processing 51
worker_1     | [2019-01-22 11:39:21,138: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/51 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:21,197: INFO/ForkPoolWorker-1] Task my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] succeeded in 1.9656380449960125s: None

This is good enough for debugging, but I'd like to include the task name and UUID in these logs. That can be achieved by using the Celery task logger, like so:

my_service.py:

from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)

class SomeService:
    def synchronize(self):
        logger.debug('synchronizing stuff')
        external_library.call('do it')
        logger.debug('found x results')

This logs exactly the way I want:

worker_1     | [2019-01-22 11:39:19,232: DEBUG/MainProcess] Task accepted: my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] pid:12
worker_1     | [2019-01-22 11:39:19,237: DEBUG/ForkPoolWorker-1] Starting new HTTPS connection (1): example.com:443
worker_1     | [2019-01-22 11:39:19,839: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/stuff HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:19,860: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 35
worker_1     | [2019-01-22 11:39:19,862: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Item 35 already closed, ignoring.
worker_1     | [2019-01-22 11:39:19,863: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 36
worker_1     | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Item 36 already closed, ignoring.
worker_1     | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 49
worker_1     | [2019-01-22 11:39:20,380: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/49 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:20,429: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 50
worker_1     | [2019-01-22 11:39:20,680: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/50 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:20,693: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 51
worker_1     | [2019-01-22 11:39:21,138: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/51 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:21,197: INFO/ForkPoolWorker-1] Task my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] succeeded in 1.9656380449960125s: None

But I have two problems:

    1. I don't want to use the Celery logger inside the service. The service should be usable even in an environment where Celery isn't installed at all (in which case it's fine for the logs not to include the task name and UUID).

    2. Logs from external libraries executed during the same task don't use that logger, so they don't include the task name and UUID either.

Which brings me to the question: is it possible to specify (enforce) a logger at the task level (in tasks.py), regardless of how I log in my service or how external libraries log? Something like this would do:

tasks.py:

@shared_task
def synchronize_stuff():
    logging.enforce_logger(get_task_logger(__name__))
    stuff = some_service.synchronize()
    logging.restore_logger()
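
(There is no logging.enforce_logger in the standard library; the calls above are just pseudocode for the behaviour I'm after. As a rough sketch of what I have in mind, a context manager could approximate it with a logging.Filter and Celery's internal celery._state.get_current_task; every other name below is made up for illustration.)

import logging
from contextlib import contextmanager

from celery._state import get_current_task

class TaskContextFilter(logging.Filter):
    # Prefix each record with the current task's name and id when a task
    # is active; records logged outside a task pass through unchanged.
    def filter(self, record):
        task = get_current_task()
        if task is not None and task.request.id is not None:
            record.msg = '%s[%s]: %s' % (task.name, task.request.id, record.msg)
        return True

@contextmanager
def enforce_task_context():
    # Temporarily attach the filter to every handler on the root logger, so
    # records from any logger that propagates to root (my service as well as
    # external libraries) pick up the task prefix; detach it on the way out.
    task_filter = TaskContextFilter()
    root_handlers = logging.getLogger().handlers
    for handler in root_handlers:
        handler.addFilter(task_filter)
    try:
        yield
    finally:
        for handler in root_handlers:
            handler.removeFilter(task_filter)

The task would then wrap the service call:

@shared_task
def synchronize_stuff():
    with enforce_task_context():
        stuff = some_service.synchronize()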

It's also worth noting that I use Django in this project.

Thanks!

Comments:

If you can, I'd suggest using django-background-tasks instead, which is much simpler than Celery. Have a look at my answer here: ***.com/questions/54225303/…

@Ahtisham We actually use other Celery features such as the scheduler, periodic tasks, and Flower for monitoring. This is only a minor issue that would make debugging easier once solved, but it's not a deal-breaker.

Answer 1:

This isn't exactly what you're looking for. But I had a similar problem and solved it with a log filter applied to the handler that logs to the service where I don't want Celery log messages. I described my problem and my solution in this question: How can I log from my python application to splunk, if I use celery as my task scheduler?
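
For illustration, such an excluding filter might look roughly like this; DropCeleryRecords is a made-up name, and the StreamHandler is only a stand-in for whatever handler actually forwards logs to the external service:

import logging

class DropCeleryRecords(logging.Filter):
    # Reject any record emitted by Celery's own loggers so that it never
    # reaches the handler this filter is attached to.
    def filter(self, record):
        return not record.name.startswith('celery')

# Attach the filter to the one handler that ships logs to the external
# service; all other handlers keep receiving Celery's messages untouched.
service_handler = logging.StreamHandler()
service_handler.addFilter(DropCeleryRecords())
logging.getLogger().addHandler(service_handler)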

Let me know whether this points you in the right direction...

Also, I've had very good results using Python's logging.dictConfig!
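
In case it helps, this is roughly how such a filter could be wired up with dictConfig; the dotted path myapp.logging_filters.DropCeleryRecords is hypothetical and has to match wherever the filter class really lives:

import logging.config

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'filters': {
        # The '()' key tells dictConfig which callable to instantiate.
        'drop_celery': {'()': 'myapp.logging_filters.DropCeleryRecords'},
    },
    'formatters': {
        'verbose': {
            'format': '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'verbose',
            'filters': ['drop_celery'],
        },
    },
    'root': {'handlers': ['console'], 'level': 'DEBUG'},
}

logging.config.dictConfig(LOGGING)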

Comments:

Hmm, this kind of helps. I came up with gist.github.com/wodCZ/c6ea066b3b9b50010ae5e569e48d3c9b, which seems to work the way I want. But it might break Django's default logging. I'll play around with this and eventually post what I come up with. Thanks anyway :)
