如何将基于类的任务传递给 CELERY_BEAT_SCHEDULE

Posted

技术标签:

【中文标题】如何将基于类的任务传递给 CELERY_BEAT_SCHEDULE【英文标题】:How to pass a class based task into CELERY_BEAT_SCHEDULE 【发布时间】:2017-06-15 11:40:59 【问题描述】:

正如docs 中所见,基于类的任务是表达复杂逻辑的一种公平方式。

但是,文档没有指定如何将闪亮的新创建的基于类的任务添加到您CELERY_BEAT_SCHEDULE(使用 django)中

我尝试过的事情: celery.py

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, 'task_summary')
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    from payments.tasks.generic.payeer import PayeerPaymentChecker
    from payments.tasks.generic.ok_pay import OkPayPaymentChecker

    okpay_import = OkPayPaymentChecker()
    payeer_imprt = PayeerPaymentChecker()

    sender.add_periodic_task(60.0, okpay_import.s(),
                             name='OkPay import',
                             expires=30)

    sender.add_periodic_task(60.0, payeer_imprt.s(),
                             name='Payeer import',
                             expires=30)

-- 或者--

payments/task_summary.py

from tasks.generic.import import OkPayPaymentChecker, PayeerPaymentChecker
 run_okpay = OkPayPaymentChecker()
 run_payeer = PayeerPaymentChecker()

CELERY_BEAT_SCHEDULE = 
# yes, i did try referring to the class here
'check_okpay_payments': 

    'task': 'payments.tasks.task_summary.run_okpay',
    'schedule': timedelta(seconds=60),
,
'check_payeer_payments': 
    'task': 'payments.task_summary.run_payeer',
    'schedule': timedelta(seconds=60),
,

真的不知道该怎么办,恢复到类似: payments/task_summary.py/

from payments.tasks.generic.ok_pay import OkPayPaymentChecker
from payments.tasks.generic.payeer import PayeerPaymentChecker
from celery import shared_task


@shared_task
def run_payer():
    instance = PayeerPaymentChecker()
    return instance.run()


@shared_task
def run_okpay():
    instance = OkPayPaymentChecker()
    return instance.run()

我检查过但没有帮助我/解决问题的在线资源:

https://denibertovic.com/posts/celery-best-practices/

https://blog.balthazar-rouberol.com/celery-best-practices

http://shulhi.com/class-based-celery-task/

http://jsatt.com/blog/class-based-celery-tasks/

【问题讨论】:

【参考方案1】:

我也花了一段时间才找到答案,而且由于这个问题在谷歌搜索结果中的排名很高,我想我会把它放在这里给那些正在努力寻找答案的人:

您可以像添加普通任务一样添加它,但使用类名。

CELERY_BEAT_SCHEDULE = 
    'my_task_name': 
        'task': 'mymodule.tasks.MyTaskClass',
        'schedule': timedelta(seconds=60),
,

(这是假设你有mymodule/tasks.py

from celery import Task

class MyTaskClass(Task):

    def run(self, *args, **kwargs):
       ... stuff ...

【讨论】:

虽然我还没有实现你的想法,但为你的意图投票

以上是关于如何将基于类的任务传递给 CELERY_BEAT_SCHEDULE的主要内容,如果未能解决你的问题,请参考以下文章

如何最好地将方法传递给同一类的方法

气流 - 如何将 xcom 变量传递给 Python 函数

使用 python socketserver 如何将变量传递给处理程序类的构造函数

在 Django 1.11 中将 QuerySet 传递给 Celery 任务

如何将任务结果传递给不使用延续的其他任务

如何将异步任务结果传递给另一个活动?