通过 Django-celery 调度数千个一次性(非重复性)任务以实现近乎同时执行

Posted

技术标签:

【中文标题】通过 Django-celery 调度数千个一次性(非重复性)任务以实现近乎同时执行【英文标题】:Scheduling thousands of one-off (non-reoccuring) tasks for near-simultaneous execution via Django-celery 【发布时间】:2012-08-22 06:54:59 【问题描述】:

一些上下文:我正在构建一个 Django 应用程序,它允许用户预先保存一个操作,并安排他们希望执行所述操作的确切日期/时间。例如,安排下周早上 5:30 以编程方式将帖子推送到 Facebook 墙上。

我正在寻找一个任务调度系统,它可以处理一千个一次性任务实例,所有实例都设置为几乎同时执行(误差范围正负一分钟)。

我正在考虑为此使用 Django-celery/Rabbitmq,但我注意到 Celery docs 不解决一次性使用的任务。在这里,Django-celery 是正确的选择(也许是通过子类化 CrontabSchedule)还是我的精力更好地用于研究其他方法?也许与 Sched Module 和 Cron 一起破解一些东西。

【问题讨论】:

我不知道您是否会收到我最新编辑的通知,所以我只是想在这里发表评论并确定一下。 【参考方案1】:

编辑 2:

出于某种原因,我的头脑最初陷入了重复任务的领域。这是一个更简单的解决方案。

您真正需要的是为每个用户操作定义一个任务。您可以跳过将要在数据库中执行的存储任务——这就是 celery 的用途!

再次重用您的 facebook 帖子示例,再次假设您在某处有一个函数 post_to_facebook,它接收用户和一些文本,执行一些魔法,并将文本发布到该用户的 facebook,您可以将其定义为像这样的任务:

# Task to send one update.
@celery.task(ignore_result=True)
def post_to_facebook(user, text):
    # perform magic
    return whatever_you_want

当用户准备好将此类帖子排入队列时,您只需告诉 celery 何时运行该任务:

post_to_facebook.apply_async(
    (user, text),   # args
    eta=datetime.datetime(2012, 9, 15, 11, 45, 4, 126440)  # pass execution options as kwargs
)

这里有详细的信息,还有一大堆可用的看涨期权:http://docs.celeryproject.org/en/latest/userguide/calling.html#eta-and-countdown

如果需要调用结果,可以跳过任务定义中的ignore_result参数,取回一个AsyncResult对象,然后查看调用结果。更多内容:http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#keeping-results

下面的一些答案仍然相关。您仍然希望每个用户操作都有一个任务,您仍然想考虑任务设计等,但这是完成您所要求的事情的更简单的方法。

使用重复任务的原始答案如下:

Dannyroa 的想法是正确的。我将在此基础上再做一点。

编辑/TLDR: 答案是是的,芹菜适合您的需求。您可能只需要重新考虑您的任务定义。

我假设您不允许您的用户编写任意 Python 代码来定义他们的任务。除此之外,您将必须预定义用户可以安排的一些操作,然后允许他们根据需要安排这些操作。然后,您只需为每个用户操作运行一个计划任务,检查条目并为每个条目执行操作。

一个用户操作:

使用您的 Facebook 示例,您可以将用户的更新存储在一个表中:

class ScheduledPost(Model):
    user = ForeignKey('auth.User')
    text = TextField()
    time = DateTimeField()
    sent = BooleanField(default=False)

然后您将每分钟运行一次任务,检查该表中计划在最后一分钟发布的条目(基于您提到的误差范围)。如果达到一分钟窗口非常重要,则可以更频繁地安排任务,例如每 30 秒一次。任务可能如下所示(在 myapp/tasks.py 中):

@celery.task
def post_scheduled_updates():
    from celery import current_task
    scheduled_posts = ScheduledPost.objects.filter(
        sent=False,
        time__gt=current_task.last_run_at, #with the 'sent' flag, you may or may not want this
        time__lte=timezone.now()
    )
    for post in scheduled_posts:
        if post_to_facebook(post.text):
            post.sent = True
            post.save()

配置可能如下所示:

CELERYBEAT_SCHEDULE = 
    'fb-every-30-seconds': 
        'task': 'tasks.post_scheduled_updates',
        'schedule': timedelta(seconds=30),
    ,

其他用户操作:

对于每个用户操作,除了发帖到 Facebook 之外,您还可以定义一个新表和一个新任务:

class EmailToMom(Model):
    user = ForeignKey('auth.User')
    text = TextField()
    subject = CharField(max_length=255)
    sent = BooleanField(default=False)
    time = DateTimeField()

@celery.task
def send_emails_to_mom():
    scheduled_emails = EmailToMom.objects.filter(
        sent=False,
        time__lt=timezone.now()
    )
    for email in scheduled_emails:
        sent = send_mail(
            email.subject,
            email.text,
            email.user.email,
            [email.user.mom.email],
        )
        if sent:
            email.sent = True
            email.save()

    CELERYBEAT_SCHEDULE = 
        'fb-every-30-seconds': 
            'task': 'tasks.post_scheduled_updates',
            'schedule': timedelta(seconds=30),
        ,
        'mom-every-30-seconds': 
            'task': 'tasks.send_emails_to_mom',
            'schedule': timedelta(seconds=30),
        ,
    

速度和优化:

为了获得更高的吞吐量,您可以在post_scheduled_updates 调用期间生成一组子任务并并行执行它们,而不是迭代更新以发布并在post_scheduled_updates 调用期间串行发送它们(给定足够的workers)。然后对post_scheduled_updates 的调用运行得非常快,并安排了一大堆任务——每个fb 更新一个——以便尽快运行。看起来像这样:

# Task to send one update. This will be called by post_scheduled_updates.
@celery.task
def post_one_update(update_id):
    try:
        update = ScheduledPost.objects.get(id=update_id)
    except ScheduledPost.DoesNotExist:
        raise
    else:
        sent = post_to_facebook(update.text)
        if sent:
            update.sent = True
            update.save()
        return sent

@celery.task
def post_scheduled_updates():
    from celery import current_task
    scheduled_posts = ScheduledPost.objects.filter(
        sent=False,
        time__gt=current_task.last_run_at, #with the 'sent' flag, you may or may not want this
        time__lte=timezone.now()
    )
    for post in scheduled_posts:
        post_one_update.delay(post.id)

我发布的代码没有经过测试,当然也没有优化,但它应该能让你走上正轨。在您的问题中,您暗示了对吞吐量的一些担忧,因此您需要仔细查看要优化的地方。一个明显的问题是批量更新,而不是反复调用post.sent=True;post.save()

更多信息:

关于周期性任务的更多信息:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html。

关于任务设计策略的部分:http://docs.celeryproject.org/en/latest/userguide/tasks.html#performance-and-strategies

这里有一整页关于优化 celery:http://docs.celeryproject.org/en/latest/userguide/optimizing.html。

这个关于子任务的页面也可能很有趣:http://docs.celeryproject.org/en/latest/userguide/canvas.html。

事实上,我建议阅读所有 celery 文档。

【讨论】:

感激不尽,您的回答对我非常有帮助。 你的一些任务不符合 celery 任务的重要幂等规则。例如: ``` @celery.task def send_emails_to_mom(): scheduled_emails = EmailToMom.objects.filter( sent=False, time__lt=timezone.now() ) 用于 schedule_emails 中的电子邮件:``` 如果此任务在同时,两者都将获得相同的预定电子邮件列表,然后开始发送到相同的列表。【参考方案2】:

我要做的是创建一个名为 ScheduledPost 的模型。

我将有一个每 5 分钟左右运行一次的 PeriodicTask。

该任务将检查 ScheduledPost 表中是否有任何需要推送到 Facebook 的帖子。

【讨论】:

以上是关于通过 Django-celery 调度数千个一次性(非重复性)任务以实现近乎同时执行的主要内容,如果未能解决你的问题,请参考以下文章

如何修改django-celery web界面进行周期性调度

django-Celery分布式队列简单使用

如何在heroku服务器中配置django-celery

以编程方式批量添加数千个 Android 联系人

Django-Celery 进度条

通过 mod-rewrite 创建数千个 url