芹菜任务与偏移量相同的时间表

Posted

技术标签:

【中文标题】芹菜任务与偏移量相同的时间表【英文标题】:Celery tasks same schedule with offset 【发布时间】:2017-05-28 13:59:38 【问题描述】:

我有几个任务,如下所示:

CELERYBEAT_SCHEDULE = 
    'task1': 
        'task': 'api.tasks.task1',
        'schedule': timedelta(seconds=10),
    ,
    'task2': 
        'task': 'api.tasks.task2',
        'schedule': timedelta(seconds=30),
    ,
    'task3': 
        'task': 'api.tasks.task3',
        'schedule': timedelta(seconds=15),
    ,
    ...

因此,task1 将在 *:*:10、*:*:20、*:*:30、*:*:40、*:*:50 和 *:*:00 中运行

task2 将在 *:*:30 和 *:*:00 中运行

task3 将在 *:*:15、*:*:30、*:*:45 和 *:*:00 中运行

然后任务总是在 *:*:30 和 *:*:00 中发生。有什么方法可以添加偏移量。我想得到这样的东西:

task1 (offset=2) 在 *:*:12, *:*:22, *:*:32, *:*:42, *:*:52 和 *:*:02 中运行

task2 (offset=7) 在 *:*:37 和 *:*:07 中运行

task3 (offset=0) 在 *:*:15、*:*:30、*:*:45 和 *:*:00 中运行

我已经阅读了文档,我认为我必须使用 crontab,但是没有其他更好的方法吗?而且 crontab 没有秒配置:-(

【问题讨论】:

我实现了一次schedulesince(timedelta(seconds=10) 通常会转换为schedule(timedelta(seconds=10)))。这并不完全符合您的要求,但也许会有所帮助。 谢谢@glowka 我可以像例子一样使用它......但是它不能解决我的问题:-( 你有什么问题? @Goin:这似乎是一个非常不寻常的要求。也许有更简单的方法来构建底层架构/需求? @sdolan 这不是一个解决方案:-P 这是非常基本的。现在我每分钟都有几次运行任务,而其他几次没有执行任何操作。我想分配任务,但我不想改变间隔,只改变初始点 【参考方案1】:

您可以通过以下步骤解决此问题:

1.CELERYBEAT_SCHEDULE文件不需要添加settings.py

2.在__init__.py文件api应用程序中添加以下代码:

import tasks

3.然后在tasks.py文件上:

from datetime import datetime

from celery import Celery


app = Celery()
run_id = None

@app.task
def task1():
    print('every 10 seconds:', datetime.now().second)

@app.task
def task2():
    print('every 30 seconds:', datetime.now().second)

@app.task
def task3():
    print('every 15 seconds:', datetime.now().second)

@app.task
def run(sender):
    global app, run_id
    # Schedule other tasks
    sender.add_periodic_task(10.0, task1.s())
    sender.add_periodic_task(30.0, task2.s())
    sender.add_periodic_task(15.0, task3.s())
    # Stop self running later times
    app.control.revoke(run_id)

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    global run_id
    now = datetime.now()
    run_id = sender.add_periodic_task((30 if now.second < 30 else 60) - now.second, run.s(sender))

【讨论】:

【参考方案2】:

根据celery documentation:

您还可以通过扩展 schedule的接口。

所以这是我的解决方案:

from datetime import timedelta

from celery import Celery
from celery.schedules import schedule


class MySchedule(schedule):
    def __init__(self, run_every=None, offset=None):
        self._run_every = run_every
        self._offset = offset if offset is not None else timedelta(seconds=0)
        self._do_offset = True if self._offset else False
        super(MySchedule, self).__init__(
            run_every=self._run_every + self._offset)

    def is_due(self, last_run_at):
        ret = super(MySchedule, self).is_due(last_run_at)
        if self._do_offset and ret.is_due:
            self._do_offset = False
            **self._offset = datetime.timedelta(seconds=0)** #bug fix
            self.run_every = self._run_every
            ret = super(MySchedule, self).is_due(last_run_at)
        return ret

    def __reduce__(self):
        return self.__class__, (self._run_every, self._offset)


app = Celery('tasks', broker='pyamqp://guest@localhost//')

app.conf.beat_schedule = 
    'task1': 
        'task': 'tasks.task1',
        'schedule': MySchedule(
            run_every=timedelta(seconds=10), offset=timedelta(seconds=2)),
    ,
    'task2': 
        'task': 'tasks.task2',
        'schedule': MySchedule(
            run_every=timedelta(seconds=30), offset=timedelta(seconds=7)),
    ,
    'task3': 
        'task': 'tasks.task3',
        'schedule': MySchedule(
            run_every=timedelta(seconds=15), offset=timedelta(seconds=0)),
    ,



@app.task
def task1():
    print('task1')


@app.task
def task2():
    print('task2')


@app.task
def task3():
    print('task3')

您可以编写自己的MySchedule 并从BaseSchedule 扩展它以获得更多控制权。

【讨论】:

【参考方案3】:

我试图用一个有点不同的解决方案来解决这个问题,比如 vasi1y 解决方案。但是这个解决方案和以前的解决方案都不起作用......

class schedule_offset(schedule):

    def __init__(self, run_every=None, offset=None,
                relative=False, nowfun=None, app=None):
        self._run_every = run_every
        if offset is None:
            offset = 0
        self._offset = maybe_timedelta(offset)
        self._executing = 0
        super(schedule_offset, self).__init__(
            run_every=self._run_every, relative=relative, nowfun=nowfun, app=app)

    def is_due(self, last_run_at):
        last_run_at = last_run_at + self._offset
        last_run_at = self.maybe_make_aware(last_run_at)
        rem_delta = self.remaining_estimate(last_run_at)
        remaining_s = timedelta_seconds(rem_delta)
        if remaining_s == 0:
            ret = schedstate(is_due=True, next=self.seconds + self._offset.seconds)
            if self._executing < 2:
                self._executing += 1
                if self._executing == 2:
                    self._offset = maybe_timedelta(0)
            return ret
        return schedstate(is_due=False, next=remaining_s)

    def __reduce__(self):
        return self.__class__, (self._run_every, self._offset, self.relative, self.nowfun)


CELERYBEAT_SCHEDULE = 
    'task1': 
        'task': 'api.tasks.task1',
        'schedule': schedule_offset(timedelta(seconds=10), offset=2),
    ,
    'task2': 
        'task': 'api.tasks.task2',
        'schedule': schedule_offset(timedelta(seconds=30), offset=7),
    ,
    'task3': 
        'task': 'api.tasks.task3',
        'schedule': timedelta(seconds=15),
    ,
    ...

【讨论】:

以上是关于芹菜任务与偏移量相同的时间表的主要内容,如果未能解决你的问题,请参考以下文章

如何防止芹菜执行相同的任务?

芹菜节拍:crontab 两次执行相同的任务(间隔 10 分钟)

如何禁用芹菜任务结果记录?

什么可能会延迟我的芹菜任务?

如何在显示网格中使用偏移量(边距)

从芹菜任务中获取芹菜工人的名字?