芹菜任务没有注册?
Posted
技术标签:
【中文标题】芹菜任务没有注册?【英文标题】:Celery task is not getting registered? 【发布时间】:2021-04-25 17:35:37 【问题描述】:我正在使用 Django + Celery 来动态安排任务。调度任务的时间从用户数据库中查询或由用户动态提供。 我正在尝试将参数传递给 start_task 函数,但它破坏了代码,因为调度程序无法将其添加到其条目中。如果没有附加参数 (schedule_start_time),它可以正常工作。 我认为它与信号有关!
def clean_time(time):
time = time.get("created_at")
hh = time.strftime("%H")
mm = time.strftime("%M")
ss = time.strftime("%S")
dd = time.strftime("%d")
mnth = time.strftime("%m")
yy = time.strftime("%Y")
return hh, mm, ss, dd, mnth, yy
@app.on_after_configure.connect
def start_time(schedule_start_date, **kwargs):
hh, mm, ss, dd, mnth, yy = clean_time(schedule_start_date)
app.add_periodic_task(crontab((hour=''.format(hh), minute=''.format(mm), day_of_month=''.format(dd), month_of_year=''.format(mnth)), test.s("hello"))
@app.task
def test(arg):
print(arg)
【问题讨论】:
【参考方案1】:H3ll0.
您应该使用 django-celery-beat 来管理 django 中的周期性 celery 任务。
这是我为测试 django-celery-beat 而创建的一个 repo。
Periodicall Repo
请记住,您应该运行工作程序,并且文档中描述了节拍。
django-celery-beat-documentation
【讨论】:
这里是芹菜维护者,这是正确答案。以上是关于芹菜任务没有注册?的主要内容,如果未能解决你的问题,请参考以下文章