芹菜任务与偏移量相同的时间表
Posted
技术标签:
【中文标题】芹菜任务与偏移量相同的时间表【英文标题】:Celery tasks same schedule with offset 【发布时间】:2017-05-28 13:59:38 【问题描述】:我有几个任务,如下所示:
CELERYBEAT_SCHEDULE =
'task1':
'task': 'api.tasks.task1',
'schedule': timedelta(seconds=10),
,
'task2':
'task': 'api.tasks.task2',
'schedule': timedelta(seconds=30),
,
'task3':
'task': 'api.tasks.task3',
'schedule': timedelta(seconds=15),
,
...
因此,task1 将在 *:*:10、*:*:20、*:*:30、*:*:40、*:*:50 和 *:*:00 中运行
task2 将在 *:*:30 和 *:*:00 中运行
task3 将在 *:*:15、*:*:30、*:*:45 和 *:*:00 中运行
然后任务总是在 *:*:30 和 *:*:00 中发生。有什么方法可以添加偏移量。我想得到这样的东西:
task1 (offset=2) 在 *:*:12, *:*:22, *:*:32, *:*:42, *:*:52 和 *:*:02 中运行
task2 (offset=7) 在 *:*:37 和 *:*:07 中运行
task3 (offset=0) 在 *:*:15、*:*:30、*:*:45 和 *:*:00 中运行
我已经阅读了文档,我认为我必须使用 crontab,但是没有其他更好的方法吗?而且 crontab 没有秒配置:-(
【问题讨论】:
我实现了一次schedulesince(timedelta(seconds=10)
通常会转换为schedule(timedelta(seconds=10))
)。这并不完全符合您的要求,但也许会有所帮助。
谢谢@glowka 我可以像例子一样使用它......但是它不能解决我的问题:-(
你有什么问题?
@Goin:这似乎是一个非常不寻常的要求。也许有更简单的方法来构建底层架构/需求?
@sdolan 这不是一个解决方案:-P 这是非常基本的。现在我每分钟都有几次运行任务,而其他几次没有执行任何操作。我想分配任务,但我不想改变间隔,只改变初始点
【参考方案1】:
您可以通过以下步骤解决此问题:
1.CELERYBEAT_SCHEDULE
文件不需要添加settings.py
2.在__init__.py
文件api
应用程序中添加以下代码:
import tasks
3.然后在tasks.py
文件上:
from datetime import datetime
from celery import Celery
app = Celery()
run_id = None
@app.task
def task1():
print('every 10 seconds:', datetime.now().second)
@app.task
def task2():
print('every 30 seconds:', datetime.now().second)
@app.task
def task3():
print('every 15 seconds:', datetime.now().second)
@app.task
def run(sender):
global app, run_id
# Schedule other tasks
sender.add_periodic_task(10.0, task1.s())
sender.add_periodic_task(30.0, task2.s())
sender.add_periodic_task(15.0, task3.s())
# Stop self running later times
app.control.revoke(run_id)
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
global run_id
now = datetime.now()
run_id = sender.add_periodic_task((30 if now.second < 30 else 60) - now.second, run.s(sender))
【讨论】:
【参考方案2】:根据celery documentation:
您还可以通过扩展 schedule的接口。
所以这是我的解决方案:
from datetime import timedelta
from celery import Celery
from celery.schedules import schedule
class MySchedule(schedule):
def __init__(self, run_every=None, offset=None):
self._run_every = run_every
self._offset = offset if offset is not None else timedelta(seconds=0)
self._do_offset = True if self._offset else False
super(MySchedule, self).__init__(
run_every=self._run_every + self._offset)
def is_due(self, last_run_at):
ret = super(MySchedule, self).is_due(last_run_at)
if self._do_offset and ret.is_due:
self._do_offset = False
**self._offset = datetime.timedelta(seconds=0)** #bug fix
self.run_every = self._run_every
ret = super(MySchedule, self).is_due(last_run_at)
return ret
def __reduce__(self):
return self.__class__, (self._run_every, self._offset)
app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.beat_schedule =
'task1':
'task': 'tasks.task1',
'schedule': MySchedule(
run_every=timedelta(seconds=10), offset=timedelta(seconds=2)),
,
'task2':
'task': 'tasks.task2',
'schedule': MySchedule(
run_every=timedelta(seconds=30), offset=timedelta(seconds=7)),
,
'task3':
'task': 'tasks.task3',
'schedule': MySchedule(
run_every=timedelta(seconds=15), offset=timedelta(seconds=0)),
,
@app.task
def task1():
print('task1')
@app.task
def task2():
print('task2')
@app.task
def task3():
print('task3')
您可以编写自己的MySchedule
并从BaseSchedule
扩展它以获得更多控制权。
【讨论】:
【参考方案3】:我试图用一个有点不同的解决方案来解决这个问题,比如 vasi1y 解决方案。但是这个解决方案和以前的解决方案都不起作用......
class schedule_offset(schedule):
def __init__(self, run_every=None, offset=None,
relative=False, nowfun=None, app=None):
self._run_every = run_every
if offset is None:
offset = 0
self._offset = maybe_timedelta(offset)
self._executing = 0
super(schedule_offset, self).__init__(
run_every=self._run_every, relative=relative, nowfun=nowfun, app=app)
def is_due(self, last_run_at):
last_run_at = last_run_at + self._offset
last_run_at = self.maybe_make_aware(last_run_at)
rem_delta = self.remaining_estimate(last_run_at)
remaining_s = timedelta_seconds(rem_delta)
if remaining_s == 0:
ret = schedstate(is_due=True, next=self.seconds + self._offset.seconds)
if self._executing < 2:
self._executing += 1
if self._executing == 2:
self._offset = maybe_timedelta(0)
return ret
return schedstate(is_due=False, next=remaining_s)
def __reduce__(self):
return self.__class__, (self._run_every, self._offset, self.relative, self.nowfun)
CELERYBEAT_SCHEDULE =
'task1':
'task': 'api.tasks.task1',
'schedule': schedule_offset(timedelta(seconds=10), offset=2),
,
'task2':
'task': 'api.tasks.task2',
'schedule': schedule_offset(timedelta(seconds=30), offset=7),
,
'task3':
'task': 'api.tasks.task3',
'schedule': timedelta(seconds=15),
,
...
【讨论】:
以上是关于芹菜任务与偏移量相同的时间表的主要内容,如果未能解决你的问题,请参考以下文章