使用 add_periodic_task 在 Celery (celerybeat) 中动态设置周期性任务

Posted

技术标签:

【中文标题】使用 add_periodic_task 在 Celery (celerybeat) 中动态设置周期性任务【英文标题】:Setting up periodic tasks in Celery (celerybeat) dynamically using add_periodic_task 【发布时间】:2017-04-28 17:17:02 【问题描述】:

我正在使用Celery 4.0.1Django 1.10,但我在安排任务时遇到了麻烦(运行任务工作正常)。这是芹菜的配置:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

app.conf.BROKER_URL = 'amqp://:@'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST)
app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery'
app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default'
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_IGNORE_RESULT = True
app.conf.CELERY_DISABLE_RATE_LIMITS = True
app.conf.BROKER_POOL_LIMIT = 2

app.conf.CELERY_QUEUES = (
    Queue('myapp.celery_default'),
    Queue('myapp.queue1'),
    Queue('myapp.queue2'),
    Queue('myapp.queue3'),
)

然后在 tasks.py 我有:

@app.task(queue='myapp.queue1')
def my_task(some_id):
    print("Doing something with", some_id)

在views.py中我想安排这个任务:

def my_view(request, id):
    app.add_periodic_task(10, my_task.s(id))

然后我执行命令:

sudo systemctl start rabbitmq.service
celery -A myapp.celery_app beat -l debug
celery worker -A myapp.celery_app

但该任务从未被安排。我在日志中看不到任何内容。这项任务正在发挥作用,因为如果在我看来我这样做了:

def my_view(request, id):
    my_task.delay(id)

任务被执行。

如果在我的配置文件中,如果我手动安排任务,这样它就可以工作:

app.conf.CELERYBEAT_SCHEDULE = 
    'add-every-30-seconds': 
        'task': 'tasks.my_task',
        'schedule': 10.0,
        'args': (66,)
    ,

我只是无法动态安排任务。有什么想法吗?

【问题讨论】:

【参考方案1】:

编辑:(2018 年 13 月 1 日)

最新的release 4.1.0 已经解决了这个ticket #3958 中的主题并已合并


实际上不能在视图级别不定义周期性任务,因为节拍调度设置会先加载,在运行时无法重新调度:

add_periodic_task()函数会在后台添加beat_schedule设置的入口,同样的设置也可以用来手动设置周期性任务:

app.conf.CELERYBEAT_SCHEDULE = 
    'add-every-30-seconds': 
        'task': 'tasks.my_task',
        'schedule': 10.0,
        'args': (66,)
    ,

这意味着如果你想使用add_periodic_task(),它应该被包装在 celery 应用级别的 on_after_configure 处理程序中,并且对运行时的任何修改都不会生效:

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10, my_task.s(66))

正如doc 中提到的,常规的celerybeat 只是简单地跟踪任务执行:

默认调度程序是celery.beat.PersistentScheduler,它只是在本地搁置数据库文件中跟踪上次运行时间。

为了能够动态管理周期性任务并在运行时重新调度 celerybeat:

还有django-celery-beat 扩展将日程存储在 Django 数据库中,并提供方便的管理界面在运行时管理周期性任务

任务将被持久化在 django 数据库中,并且调度程序可以在 db 级别的任务模型中更新。每当您更新定期任务时,此任务表中的计数器将递增,并告诉 celery beat 服务从数据库重新加载计划。

可能的解决方案如下:

from django_celery_beat.models import PeriodicTask, IntervalSchedule

schedule= IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)
task = PeriodicTask.objects.create(interval=schedule, name='any name', task='tasks.my_task', args=json.dumps([66]))

views.py

def update_task_view(request, id)
    task = PeriodicTask.objects.get(name="task name") # if we suppose names are unique
    task.args=json.dumps([id])
    task.save()

【讨论】:

拿起你的评论“实际上你不能在视图级别定义周期性任务”:是否可以在应用级别使用add_periodic_task(),即intask.py?在应用程序中声明这些周期性任务似乎更好的封装。 实际上根本不需要使用它,因为如果您只使用app.conf.CELERYBEAT_SCHEDULE设置语法,它将为您调用,但如果您想明确使用它,您可以在@987654339中使用它@文件。 我相信最新版本(4.1.0 之后)应该解决这个问题。这是正在进行的开发#3958 请注意,on_after_configure 不适用于在另一个应用程序的 tasks.py 文件中定义的定期任务。请改用on_after_finalize。这是因为一旦celery.py 文件被导入,on_after_configure 信号就会被发送,但是autodiscover_tasks 调用直到很久以后,当 django 完成设置所有应用程序时才解决。 (autodiscover_tasks 安排调用但不会立即执行它,除非您使用 force=True 保证在 Django 中失败)。

以上是关于使用 add_periodic_task 在 Celery (celerybeat) 中动态设置周期性任务的主要内容,如果未能解决你的问题,请参考以下文章

造梦西游3 CE怎么使用

windows ce 5.0 的应用程序是不是适合在 windows ce 6.5 上使用?

通过 celery 向 django-channels 发送消息

无法让 SQL Server CE 与 Visual Studio 2005 和 Windows CE 5.0 一起使用

CE的使用方法有哪些?

Windows 逆向CheatEngine 工具 ( 汉化版 CE 工具推荐 | 编写简单 C++ 程序 | C++ 程序执行分析 | 使用 CE 修改上述 C++ 程序 )