在任务中获取芹菜节拍触发时间
Posted
技术标签:
【中文标题】在任务中获取芹菜节拍触发时间【英文标题】:Get celery beat trigger time on task 【发布时间】:2017-02-21 09:03:09 【问题描述】:我正在尝试找到一种方法来获取触发 celery beat 以触发任务的时间条件。
获取datetime.now()
由于所有 celery 工作人员都在忙碌,因此通常会偏离任务按 celery beat 排队的时间。
例如:我将任务设置为每天 12:30 执行,但由于任务在 12:31 开始运行时所有工作人员都很忙。
无论任务执行的时间如何,我都需要知道哪个时间条件触发了任务。
编辑:
这就是我定义周期性任务的方式:
CELERYBEAT_SCHEDULE =
'periodic_clear_task':
'task': 'app.tasks.periodic_clear_task',
'schedule': crontab(hour=2),
'args': ()
,
'periodic_update_task':
'task': 'app.tasks.periodic_update_task',
'schedule': crontab(minute='00,30'),
'args': ()
,
【问题讨论】:
您可以在调用任务时将 datetime.now() 作为参数传递吗? @JensAstrup 我该怎么做?编辑:我刚刚编辑了问题以包括我的定期任务的定义 刚刚记住,一旦 celery worker 启动,调度程序就会被处理——而不是每次运行任务时。换句话说,将 datetime 作为参数传递只会给你启动 worker 的时间,这没有帮助。您正在寻找的内容可能在bound methods 嗨莱昂内尔,你能得到这个答案吗?我也想这样做。 @MohammadMustaqeem 不幸的是没有。我最终将那些需要在他们自己的队列中更短的触发时间的任务排队,这样他们就不必等到其他任务完成并解决我的问题。这可能无法解决您的方案中的问题。它对我有用,因为这些任务并不多,可以移动到另一个队列,而不必担心顺序执行会丢失。如果您有任何其他问题,请随时提问。 【参考方案1】:我遇到了同样的问题,我想在周期性 celery 任务中使用任务触发时间而不是执行时间。我在具有任务触发时间的 celery 任务中找不到确切的字段。作为一种解决方法,我使用了 expires time of task。
task_expires_day =30
@app.on_after_configure.connect
def df_beat_setup(sender, **kwargs):
pipeline_config =
sender.add_periodic_task(
crontab(
minute=0,
hour=8,
),
df_scheduler_task.s(),
args=(pipeline_config),
name="df_trigger",
queue="df_scheduler_queue",
expires=60*60*24*task_expires_day,
options="time": datetime.now()
)
@app.task(name="df_scheduler_queue", bind=True, acks_late=True)
def df_scheduler_task(task: "celery.Task", pipeline_config: Dict, time: str) -> dict:
task_trigger_time = parser.parse(task.request.expires)-timedelta(days=task_expires_day))
...
【讨论】:
【参考方案2】:与此问题中的另一个 answer 类似,我已使用 expires(秒)来解决此问题。使用 Celery Beat 时间表时,可以指定如下:
limit = 60
CELERYBEAT_SCHEDULE =
...,
'periodic_update_task':
'task': 'app.tasks.periodic_update_task',
'schedule': crontab(minute='00,30'),
'args': (),
'options': 'expires': 60 * 60 * 24 * limit
,
这里的限制很重要,因为现在我们分配了一个到期值,如果达到这个限制,任务就不会被执行。因此,它需要选择得足够大,以便根据需要处理任务。请记住,到期值以秒为单位指定。更多信息here。
当指定 expires 参数时,Celery 会自动将此秒值转换为要在内部使用的日期时间值。要访问函数中的过期时间,您必须绑定方法,然后从任务请求对象中检索。此任务对象应如下所示。
<Context: 'lang': 'py', 'task': 'app.tasks.periodic_update_task', 'expires': '2021-10-23T21:11:26.031443+01:00', 'id': 'ef0e301a-b9b3-4930-8fa3-ca3fdf6b36cb', ... >
一旦有时间,您可以进行简单的计算以重新获得触发时间。
@app.task(bind=True)
def periodic_update_task(self):
trigger_time = parser.parse(self.request.expires)-timedelta(days=limit))
【讨论】:
以上是关于在任务中获取芹菜节拍触发时间的主要内容,如果未能解决你的问题,请参考以下文章