如何检查处理 Celery 任务的队列
Posted
技术标签:
【中文标题】如何检查处理 Celery 任务的队列【英文标题】:How to Inspect the Queue Processing a Celery Task 【发布时间】:2020-09-15 06:19:48 【问题描述】:我目前正在利用 celery 执行定期任务。我是芹菜新手。我有两个工人运行两个不同的队列。一种用于慢速后台作业,另一种用于用户在应用程序中排队的作业。
我在 datadog 上监控我的任务,因为这是确认我的工人正常运行的简单方法。
我要做的是在每个任务完成后,记录任务在哪个队列上完成。
@after_task_publish.connect()
def on_task_publish(sender=None, headers=None, body=None, **kwargs):
statsd.increment("celery.on_task_publish.start.increment")
task = celery.tasks.get(sender)
queue_name = task.queue
statsd.increment("celery.on_task_publish.increment", tags=[f"queue_name:task"])
以下功能是我在研究 celery 文档和一些 *** 帖子后实现的,但它没有按预期工作。我得到了第一个 statsd 增量,但剩余的代码没有执行。
我想知道是否有一种更简单的方法可以在每个任务完成后/内部检查哪个队列处理了该任务。
【问题讨论】:
【参考方案1】:既然您的问题是 是否有办法在每个任务完成后/内部进行检查 - 我假设您还没有尝试过这种 celery-result-backend 的东西。因此,您可以查看 Celery 本身提供的此功能:Celery-Result-Backend / Task-result-Backend
。
它对于存储 celery 任务的结果非常有用。
通读此 => https://docs.celeryproject.org/en/stable/userguide/configuration.html#task-result-backend-settings
一旦您了解如何设置此结果后端,请搜索 result_extended
键(在同一链接中)以便能够在您的任务返回值中添加 queue-names
。
可用的选项数量 - 就像您可以设置这些结果以转到其中任何一个:
Sql-DB / NoSql-DB / S3 / Azure / Elasticsearch / etc
我已将 Result-Backend
功能与 Elasticsearch
一起使用,这也是我的任务结果的存储方式:
只需根据您的要求在settings.py
文件中添加一些配置即可。非常适合我的应用程序。而且我有一个每周 cron 只清除 successful results
的任务 - 因为我们不再需要结果 - 我只能看到 failed results
(就像图片中的那个)。 p>
这些是满足我要求的主要键:task_track_started
和 task_acks_late
以及 result_backend
【讨论】:
好的,我离这里更近了。我有一个 redis 后端并添加了 result_extended。我在结果值中看到了队列。如何找到使用 task_id 调用任务的函数名称?我有点困惑。将函数与我后端的确切任务联系起来。 我想得到你。请添加一些屏幕截图或数据,以便我理解。有一个疑问,redis
适合 result-backend
吗?以上是关于如何检查处理 Celery 任务的队列的主要内容,如果未能解决你的问题,请参考以下文章