在数据库中为每个 celery 任务存储一个任务 ID

Posted

技术标签:

【中文标题】在数据库中为每个 celery 任务存储一个任务 ID【英文标题】:Storing a task id for each celery task in database 【发布时间】:2013-02-18 10:39:55 【问题描述】:

我在 mysql 中使用过 celery。我想将任务 ID 作为纯整数存储在数据库中或 celery task 的变量中。我该怎么做?

【问题讨论】:

【参考方案1】:

为什么不创建一个 celery 任务模型,并将一个 celery 任务 ID 保存到该模型中?

class CeleryModel(models.Model):
    celery_task_id = models.CharField(max_length = 50, unique=True)

然后:

def some_celery_task():
    result = celery_task.delay()
    celery_task = CeleryModel(celery_task_id = result.id)
    celery_task.save() # added save line

您的整数值将是:celery_task.id 对应于实际的唯一 celery_task_id。

更新:另一种方式...

首先python manage.py inspectdb > inspectdb.py。在该文件中,您会发现:

class CeleryTaskmeta(models.Model):
    id = models.IntegerField(primary_key=True)
    task_id = models.CharField(max_length=765, unique=True)
    status = models.CharField(max_length=150)
    result = models.TextField(blank=True)
    date_done = models.DateTimeField()
    traceback = models.TextField(blank=True)
    hidden = models.IntegerField()
    meta = models.TextField(blank=True)
    class Meta:
        db_table = u'celery_taskmeta'

接下来,python manage.py startapp celery_model。将此文件放入 models.py 文件中。我使用南,所以我的最后一步是python manage.py convert_app celery_model。然而,这是不必要的。现在你可以访问这个 celery 数据表了 django 级别,并且可以读取每个任务的主键作为你的整数值。例如

>>> ct = CeleryTaskmeta.objects.get(id=1)
>>> for k,v in ct.__dict__.items(): print k,v
... 
status SUCCESS
task_id 2fa95f24-7640-434c-9fef-0732ac1d23c7
date_done 2013-02-17 19:22:56+00:00
traceback None
_state <django.db.models.base.ModelState object at 0x10263fa90>
meta eJxrYKotZAzlSM7IzEkpSs0rZIotZC7WAwBREgb9
result gAJLBC4=
hidden 0
id 1

聪明的人会知道如何使您的CeleryTaskmeta 成为只读模型,因为我认为您不会想要篡改数据表。

更新:到你问题的最后一部分:

>>> from celerytest.tasks import add
>>> result = add.delay()
>>> result.int_id = 1
>>> for k,v in result.__dict__.items(): print k,v
...
parent None
app <Celery default:0x10264df10>
task_name celerytest.tasks.add
int_id 1
id 01503afd-d196-47af-8e10-e7dc06603cfc
backend <djcelery.backends.database.DatabaseBackend object at 0x1026842d0>

【讨论】:

感谢您的彻底和干净的解释! 兄弟问题是我想在进程开始时检索task_id。事实证明,只有在作业成功完成后,数据才会写入 CeleryTaskMeta 表中。 @Cole 你如何将异步抓取的结果存储在数据库中?【参考方案2】:

要在任务启动时获取 celery 任务 ID,我执行以下操作:

process_task = my_task.apply_async(args=[args])
task_id = process_task.task_id

my_task 是 tasks.py 中的一个方法

【讨论】:

以上是关于在数据库中为每个 celery 任务存储一个任务 ID的主要内容,如果未能解决你的问题,请参考以下文章

Celery-定时任务

Celery 为每个任务创建一个新连接

Celery---一个懂得异步任务,延时任务,周期任务的芹菜

在每个时区每天凌晨 3:00 运行 celery 任务?姜戈

celery 任务的实时进度跟踪

Celery - 一个懂得 异步任务 , 定时任务 , 周期任务 的芹菜