将异步协程作为 celery 任务运行

Posted

技术标签:

【中文标题】将异步协程作为 celery 任务运行【英文标题】:Run asynchronous coroutines as celery tasks 【发布时间】:2019-09-03 13:50:54 【问题描述】:

我希望能够在我的 celery 任务中使用异步代码。它适用于异步或龙卷风。我发现,我可以做这样的事情


from tornado.ioloop import IOLoop

from celery._state import _task_stack

from . import celery


class AsyncTask(celery.Task):

    def __call__(self, *args, **kwargs):
        _task_stack.push(self)
        self.push_request(args=args, kwargs=kwargs)
        try:
            IOLoop.current().run_sync(lambda: self.run(*args, **kwargs))
        finally:
            self.pop_request()
            _task_stack.pop()

然后像这样使用它

from .celery import celery
from tornado.httpclient import AsyncHTTPClient

@celery.task(base=AsyncTask)
async def test_async_celery_task(x, y):
    result = await AsyncHTTPClient().fetch(request='https://google.com.ua')
    print('Async IS OKAY: '.format(result))

或者我可以直接在任务中使用run_sync,这不是可取的

我想知道是否可以执行上述操作,或者我应该在 worker 中启动事件循环并通过 add_future 启动我的任务。有没有其他人做过类似的事情?我可以期待一些性能提升吗?

我需要这个,因为我需要能够使用来自项目其他部分的异步代码,例如数据库调用 ets

【问题讨论】:

【参考方案1】:

不是正确使用 ioloop 使事情变得复杂,而是让 Celery 知道它可以并且应该使用异步任务。 Celery 与 Pools 一起运行,它可以在那里安排你的工作。池可以是线程的,可以是多进程的,Celery 知道(通过配置)它有 X 进程、Y 线程、Z 工人,其中有多少有工作或空闲。但是 ATM,Celery 在技术上能够接收和运行协程,但开箱即用无法计算它们以跟踪其中有多少有工作或空闲。如果您想看看它的外观 - 有a development code of future Celery's Asyncpool。

【讨论】:

以上是关于将异步协程作为 celery 任务运行的主要内容,如果未能解决你的问题,请参考以下文章

为啥 Celery 守护进程看不到任务?

celery(异步处理)+redis

CELERY 定时任务

Django中使用celery来异步处理和定时任务

celery异步,延时任务, 周期任务

python3.7 中使用django-celery 完成异步任务