如何在 Django 模型中使用 celery beat 为每个对象创建单独的任务

Posted

技术标签:

【中文标题】如何在 Django 模型中使用 celery beat 为每个对象创建单独的任务【英文标题】:How do i create seperate tasks for every objects with celery beat in Django models 【发布时间】:2019-05-15 06:04:32 【问题描述】:

假设我们有以下模型字段:

class Project(models.Model):
    project_name = models.CharField(max_length=200,unique=True)
    project_scan = models.IntegerField()  ### Scan interval
    project_status = models.BooleanField() ### To Enable "Scan" or Disable "Scan" Tasks

假设我们有 2 个项目对象:

1. Project(project_name='test1',project_scan=5) ### Scan `test1` every `5` hour
2. Project(project_name='test2',project_scan=10) ### Scan `test2` every `10` hour

Tasks.py

​​>
@task(name='project_tasks')
def Project_Tasks():
    get_all_projects = Project.objects.all()
    for each_project in get_all_project:
        if each_project.project_status == True: ### Checking if it "Scan" is allowed.
            get_interval = each_project.project_scan
            get_name = each_project.project_name
            print(get_name)

我的问题: 如何根据给定的project_scan Interval 在每个对象上运行任务? , 由于 Celery beat 将任务名称作为参数来执行扫描,例如: PeriodicTask.objects.create(interval=given_interval, name='I dont know', task='project_tasks', )

如何为每个项目任务创建单独的实例?

我尝试在 models.py 中创建 intervalSchedule 字段,但没有成功:

class Project(models.Model):
    project_name = models.CharField(max_length=200,unique=True)
    project_scan = models.IntegerField()  ### Scan interval
    project_status = models.BooleanField() ### To Enable "Scan" or Disable "Scan" Tasks
    schedule = IntervalSchedule()

【问题讨论】:

您希望在创建新的Project 时创建PeriodTask 吗?哦,你在 for 循环中运行什么? 是的,创建新项目时要创建周期性任务,并根据给定的间隔时间运行一个周期,即project_scan (hour) 在下面查看我的答案。如果您有任何问题,请在下方留言。 【参考方案1】:

您可以添加使用信号:

from django.db.models.signals import post_save
from django.dispatch import receiver

from django_celery_beat.models import PeriodicTask, IntervalSchedule


class Project(models.Model):
    project_name = models.CharField(max_length=200,unique=True)
    project_scan = models.IntegerField()  
    project_status = models.BooleanField()


    def set_periodic_task(self, task_name):
        schedule = self.get_or_create_interval()
        PeriodicTask.objects.create(
            interval=schedule, 
            name=f'self.project_name-self.id', 
            task=task_name,
        )

    def get_or_create_interval(self):
        schedule, created = IntervalSchedule.objects.get_or_create(
            every=self.project_scan,
            period=IntervalSchedule.HOURS,
        )
        return schedule

    def get_periodic_task(self, task_name):
        interval = self.get_or_create_interval()
        periodic_task = PeriodicTask.objects.get(
            interval=interval, 
            name=f'self.project_name-self.id', 
            task=task_name,
        )
        return periodic_task

    def sync_disable_enable_task(self, task_name):
        periodic_task = self.get_periodic_task(task_name)
        periodic_task.enabled = self.project_status
        periodic_task.save()


@receiver(post_save, sender=Project)
def set_or_sync_periodic_task(sender, instance=None, created=False, **kwargs):
    if created:
        instance.set_periodic_task(task_name='project_tasks')
    else:
        instance.sync_disable_enable_task(task_name='project_tasks')

你有什么: 当您创建一个新的Project 实例时,一个新的定期任务将使用方法set_periodic_task 保存。如果你想要disableenable 实例的周期性任务,你只需更改project_status 状态并保存它。它将触发sync_disable_enable_task 方法来启用或禁用。

如果你想传递参数,你可以这样做:

 PeriodicTask.objects.create(
     interval=schedule,
     name=f'self.project_name-self.id', 
     task='proj.tasks.import_contacts',
     args=json.dumps(['arg1', 'arg2']),
     kwargs=json.dumps(
        'some_kwarg': '123,
     ),
)

【讨论】:

我猜这可能是因为我正在从 tasks.py 迭代对象并将其打印出来。 你想在Project_Tasks任务中做什么?你得到所有Projects 并打印它们, 有什么方法可以将参数传递给tasks.py中的Project_Tasks函数吗?仅运行该特定对象,基于当前任务,例如:pastebin.com/raw/ss7a8VWB 你好 Sergey,这是在 Periodic Tasks 中创建一个 celery.backend_cleanup 任务,默认设置为 4 hour each interval,线程位于:***.com/questions/53958965/…

以上是关于如何在 Django 模型中使用 celery beat 为每个对象创建单独的任务的主要内容,如果未能解决你的问题,请参考以下文章

如何将自定义模型添加到 django celery

如何在 Django 和 Celery 中配置多个代理?

为啥当我尝试在 celery 任务中使用模型时,django 会引发“应用程序尚未加载”错误?

将 Django 模型中的保存方法覆盖为使用 celery 异步的最佳实践

Django ElasticSearch Celery 任务模型调用返回“str”对象不可调用

使用 Celery 的单个 Django 模型的每个对象的不同 crontab