使用 add_periodic_task 在 Celery (celerybeat) 中动态设置周期性任务
Posted
技术标签:
【中文标题】使用 add_periodic_task 在 Celery (celerybeat) 中动态设置周期性任务【英文标题】:Setting up periodic tasks in Celery (celerybeat) dynamically using add_periodic_task 【发布时间】:2017-04-28 17:17:02 【问题描述】:我正在使用Celery 4.0.1
和Django 1.10
,但我在安排任务时遇到了麻烦(运行任务工作正常)。这是芹菜的配置:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.conf.BROKER_URL = 'amqp://:@'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST)
app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery'
app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default'
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_IGNORE_RESULT = True
app.conf.CELERY_DISABLE_RATE_LIMITS = True
app.conf.BROKER_POOL_LIMIT = 2
app.conf.CELERY_QUEUES = (
Queue('myapp.celery_default'),
Queue('myapp.queue1'),
Queue('myapp.queue2'),
Queue('myapp.queue3'),
)
然后在 tasks.py 我有:
@app.task(queue='myapp.queue1')
def my_task(some_id):
print("Doing something with", some_id)
在views.py中我想安排这个任务:
def my_view(request, id):
app.add_periodic_task(10, my_task.s(id))
然后我执行命令:
sudo systemctl start rabbitmq.service
celery -A myapp.celery_app beat -l debug
celery worker -A myapp.celery_app
但该任务从未被安排。我在日志中看不到任何内容。这项任务正在发挥作用,因为如果在我看来我这样做了:
def my_view(request, id):
my_task.delay(id)
任务被执行。
如果在我的配置文件中,如果我手动安排任务,这样它就可以工作:
app.conf.CELERYBEAT_SCHEDULE =
'add-every-30-seconds':
'task': 'tasks.my_task',
'schedule': 10.0,
'args': (66,)
,
我只是无法动态安排任务。有什么想法吗?
【问题讨论】:
【参考方案1】:编辑:(2018 年 13 月 1 日)
最新的release 4.1.0 已经解决了这个ticket #3958 中的主题并已合并
实际上不能在视图级别不定义周期性任务,因为节拍调度设置会先加载,在运行时无法重新调度:
add_periodic_task()
函数会在后台添加beat_schedule设置的入口,同样的设置也可以用来手动设置周期性任务:app.conf.CELERYBEAT_SCHEDULE = 'add-every-30-seconds': 'task': 'tasks.my_task', 'schedule': 10.0, 'args': (66,) ,
这意味着如果你想使用add_periodic_task()
,它应该被包装在 celery 应用级别的 on_after_configure
处理程序中,并且对运行时的任何修改都不会生效:
app = Celery()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(10, my_task.s(66))
正如doc 中提到的,常规的celerybeat 只是简单地跟踪任务执行:
默认调度程序是
celery.beat.PersistentScheduler
,它只是在本地搁置数据库文件中跟踪上次运行时间。
为了能够动态管理周期性任务并在运行时重新调度 celerybeat:
还有django-celery-beat 扩展将日程存储在 Django 数据库中,并提供方便的管理界面在运行时管理周期性任务。
任务将被持久化在 django 数据库中,并且调度程序可以在 db 级别的任务模型中更新。每当您更新定期任务时,此任务表中的计数器将递增,并告诉 celery beat 服务从数据库重新加载计划。
可能的解决方案如下:
from django_celery_beat.models import PeriodicTask, IntervalSchedule
schedule= IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)
task = PeriodicTask.objects.create(interval=schedule, name='any name', task='tasks.my_task', args=json.dumps([66]))
views.py
def update_task_view(request, id)
task = PeriodicTask.objects.get(name="task name") # if we suppose names are unique
task.args=json.dumps([id])
task.save()
【讨论】:
拿起你的评论“实际上你不能在视图级别定义周期性任务”:是否可以在应用级别使用add_periodic_task()
,即intask.py
?在应用程序中声明这些周期性任务似乎更好的封装。
实际上根本不需要使用它,因为如果您只使用app.conf.CELERYBEAT_SCHEDULE
设置语法,它将为您调用,但如果您想明确使用它,您可以在@987654339中使用它@文件。
我相信最新版本(4.1.0 之后)应该解决这个问题。这是正在进行的开发#3958
请注意,on_after_configure
不适用于在另一个应用程序的 tasks.py
文件中定义的定期任务。请改用on_after_finalize
。这是因为一旦celery.py
文件被导入,on_after_configure
信号就会被发送,但是autodiscover_tasks
调用直到很久以后,当 django 完成设置所有应用程序时才解决。 (autodiscover_tasks
安排调用但不会立即执行它,除非您使用 force=True
保证在 Django 中失败)。以上是关于使用 add_periodic_task 在 Celery (celerybeat) 中动态设置周期性任务的主要内容,如果未能解决你的问题,请参考以下文章
windows ce 5.0 的应用程序是不是适合在 windows ce 6.5 上使用?
通过 celery 向 django-channels 发送消息
无法让 SQL Server CE 与 Visual Studio 2005 和 Windows CE 5.0 一起使用
Windows 逆向CheatEngine 工具 ( 汉化版 CE 工具推荐 | 编写简单 C++ 程序 | C++ 程序执行分析 | 使用 CE 修改上述 C++ 程序 )