如何将 Celery 与 asyncio 结合使用?

Posted

技术标签:

【中文标题】如何将 Celery 与 asyncio 结合使用?【英文标题】:How to combine Celery with asyncio? 【发布时间】:2017-02-10 11:07:20 【问题描述】:

如何创建一个使 celery 任务看起来像 asyncio.Task 的包装器?或者有没有更好的方法将 Celery 与 asyncio 集成?

@asksol,Celery 的创造者,said this::

将 Celery 用作异步 I/O 框架之上的分布式层是很常见的(重要提示:将 CPU 绑定任务路由到 prefork worker 意味着它们不会阻塞您的事件循环)。

但我找不到任何专门针对 asyncio 框架的代码示例。

【问题讨论】:

你能澄清一下“看起来”是什么意思吗?我想您可能误解了 Asksol 的评论——您将 celery 放在了作为异步管理器的 Rabbit 或 SQS 等框架的前面。因此,您可以为使用 asyncio 的 celery 创建一个代理/插件,但任务不会“看起来像”(即具有 asyncio 的接口)? celery 的重点是抽象使用的异步方法? 【参考方案1】:

这是我在必要时处理异步协程的 Celery 实现:

包装 Celery 类以扩展其功能:

from celery import Celery
from inspect import isawaitable
import asyncio


class AsyncCelery(Celery):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.patch_task()

        if 'app' in kwargs:
            self.init_app(kwargs['app'])

    def patch_task(self):
        TaskBase = self.Task

        class ContextTask(TaskBase):
            abstract = True

            async def _run(self, *args, **kwargs):
                result = TaskBase.__call__(self, *args, **kwargs)
                if isawaitable(result):
                    await result

            def __call__(self, *args, **kwargs):
                asyncio.run(self._run(*args, **kwargs))

        self.Task = ContextTask

    def init_app(self, app):
        self.app = app

        conf = 
        for key in app.config.keys():
            if key[0:7] == 'CELERY_':
                conf[key[7:].lower()] = app.config[key]

        if 'broker_transport_options' not in conf and conf.get('broker_url', '')[0:4] == 'sqs:':
            conf['broker_transport_options'] = 'region': 'eu-west-1'

        self.config_from_object(conf)


celery = AsyncCelery()

【讨论】:

【参考方案2】:

这是一个简单的助手,您可以使用它来让 Celery 任务等待:

import asyncio
from asgiref.sync import sync_to_async

# Converts a Celery tasks to an async function
def task_to_async(task):
    async def wrapper(*args, **kwargs):
        delay = 0.1
        async_result = await sync_to_async(task.delay)(*args, **kwargs)
        while not async_result.ready():
            await asyncio.sleep(delay)
            delay = min(delay * 1.5, 2)  # exponential backoff, max 2 seconds
        return async_result.get()
    return wrapper

sync_to_async一样,可以直接作为包装器使用:

@shared_task
def get_answer():
    sleep(10) # simulate long computation
    return 42    

result = await task_to_async(get_answer)()

...作为装饰者:

@task_to_async
@shared_task
def get_answer():
    sleep(10) # simulate long computation
    return 42    

result = await get_answer()

当然,这不是一个完美的解决方案,因为它依赖于polling。 但是,在 Celery officially provides a better solution 之前从 Django 异步视图调用 Celery 任务应该是一个很好的解决方法。

编辑 2021/03/02:添加对 sync_to_async 的调用以支持 eager mode。

【讨论】:

这是一个可靠的解决方法,我们已经在我的回答中提到的 FastAPI 应用程序中使用了它(虽然不是作为装饰器):) 请记住,您需要注意错误处理并有一个计划如何处理任何潜在的异常! task_to_async 调用 AsyncResult.get(),这会重新引发任务引发的任何异常。当然,如果你想自定义这个行为,你可以在task_to_async添加参数,然后转发给async_result.get() 将任务包装在异步助手中的意义何在?你不能只用睡眠来实现循环,没有它吗? Afaik task.delay 是非阻塞的。只有类似 task.get 的东西会阻塞。【参考方案3】:

编辑:2021 年 1 月 12 日以前的答案(在底部找到)没有很好地老化,因此我添加了可能的解决方案组合,这些解决方案可能会满足那些仍然在寻找如何共同使用 asyncio 和芹菜

让我们先快速分解用例(更深入的分析在这里:asyncio and coroutines vs task queues):

如果任务受 I/O 限制,则使用协程和异步会更好。 如果任务受 CPU 限制,则最好使用 Celery 或其他类似的任务管理系统。

因此,在 Python 的“做一件事并把它做好”的上下文中,不要尝试将 asyncio 和 celery 混合在一起是有意义的。

但是,如果我们希望能够以异步方式和异步任务的形式运行方法,会发生什么情况?那么我们有一些选择可以考虑:

我能找到的最好的例子如下:https://johnfraney.ca/posts/2018/12/20/writing-unit-tests-celery-tasks-async-functions/(我刚刚发现它是@Franey's response):

    定义你的异步方法。

    使用asgirefsync.async_to_sync模块封装异步方法并在一个celery任务中同步运行:

    # tasks.py
    import asyncio
    from asgiref.sync import async_to_sync
    from celery import Celery
    
    app = Celery('async_test', broker='a_broker_url_goes_here')
    
    async def return_hello():
        await asyncio.sleep(1)
        return 'hello'
    
    
    @app.task(name="sync_task")
    def sync_task():
        async_to_sync(return_hello)()
    

我在FastAPI 应用程序中遇到的一个用例与上一个示例相反:

    CPU 密集型进程占用了异步端点。

    解决方案是将异步 CPU 绑定进程重构为 celery 任务,并从 Celery 队列中传递一个任务实例以执行。

    可视化该案例的最小示例:

    import asyncio
    import uvicorn
    
    from celery import Celery
    from fastapi import FastAPI
    
    app = FastAPI(title='Example')
    worker = Celery('worker', broker='a_broker_url_goes_here')
    
    @worker.task(name='cpu_boun')
    def cpu_bound_task():
        # Does stuff but let's simplify it
        print([n for n in range(1000)])
    
    @app.get('/calculate')
    async def calculate():
        cpu_bound_task.delay()
    
    if __name__ == "__main__":
        uvicorn.run('main:app', host='0.0.0.0', port=8000)
    

另一个解决方案似乎是 @juanra 和 @danius 在他们的答案中提出的,但我们必须记住,当我们混合同步和异步执行时,性能往往会受到影响,因此这些答案需要在之前进行监控我们可以决定在 prod 环境中使用它们。

最后,有一些现成的解决方案,我不能推荐(因为我自己没有使用过),但我会在这里列出它们:

Celery Pool AsyncIO 这似乎完全解决了 Celery 5.0 没有解决的问题,但请记住,它似乎有点实验性(今天 0.2.0 版 01/12/2021) aiotasks 声称是“一个类似于 Celery 的任务管理器,它分发 Asyncio 协程”,但似乎有点陈旧(大约 2 年前的最新提交)

嗯,它的年龄没有那么好,是吗? Celery 5.0 版没有实现异步兼容性,因此我们无法知道何时以及是否会实现......出于响应遗留原因(因为它是当时的答案)和评论继续,将其留在这里。

如官方网站所述,这将从 Celery 5.0 版中实现:

http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface

    Celery 的下一个主要版本将仅支持 Python 3.5,我们计划在其中利用新的 asyncio 库。 放弃对 Python 2 的支持将使我们能够删除大量的兼容性代码,而使用 Python 3.5 使我们能够利用打字、异步/等待、异步和类似概念,在旧版本中没有其他选择。

以上内容来自上一个链接。

所以最好的办法就是等待5.0版发布!

与此同时,祝你编码愉快:)

【讨论】:

这没有发生,而且 celery 5 与 asyncio 不兼容。 @piro 我还没用过 celery 5,我会进一步调查!感谢更新 @piro 好吧,我做了我的研究并重构了这个答案,希望你能在那里找到有用的东西! 我打开了a feature request,他们回答“这是我们为 celery 6.0 计划的更大设计决策的一部分”。 在我们得到 Celery 的官方支持之前,我发现 polling the status of the AyncResult 提供了一个很好的解决方法。【参考方案4】:

我通过在 celery-pool-asyncio 库中结合 Celery 和 asyncio 解决了问题。

【讨论】:

这实际上似乎是一个很好的解决方案,唯一的问题是它不支持 celery 5。有任何时间表吗?【参考方案5】:

这种简单的方法对我来说效果很好:

import asyncio
from celery import Celery

app = Celery('tasks')

async def async_function(param1, param2):
    # more async stuff...
    pass

@app.task(name='tasks.task_name', queue='queue_name')
def task_name(param1, param2):
    asyncio.run(async_function(param1, param2))

【讨论】:

【参考方案6】:

我发现的最简洁的方法是将async 函数包装在asgiref.sync.async_to_sync 中(来自asgiref):

from asgiref.sync import async_to_sync
from celery.task import periodic_task


async def return_hello():
    await sleep(1)
    return 'hello'


@periodic_task(
    run_every=2,
    name='return_hello',
)
def task_return_hello():
    async_to_sync(return_hello)()

我从我写的blog post 中提取了这个例子。

【讨论】:

非常好,我在研究这个问题的过程中发现了你的文章,并将它包含在我的答案的编辑中(我现在当然要提到你,因为我发现了它)!感谢您的知识提升:) 谢谢!看到对我文章的引用总是很酷,即使它在同一个线程中。【参考方案7】:

您可以使用run_in_executor 将任何阻塞调用包装到任务中,如documentation 中所述,我还在示例中添加了自定义timeout:

def run_async_task(
    target,
    *args,
    timeout = 60,
    **keywords
) -> Future:
    loop = asyncio.get_event_loop()
    return asyncio.wait_for(
        loop.run_in_executor(
            executor,
            functools.partial(target, *args, **keywords)
        ),
        timeout=timeout,
        loop=loop
    )
loop = asyncio.get_event_loop()
async_result = loop.run_until_complete(
    run_async_task, your_task.delay, some_arg, some_karg="" 
)
result = loop.run_until_complete(
    run_async_task, async_result.result 
)

【讨论】:

以上是关于如何将 Celery 与 asyncio 结合使用?的主要内容,如果未能解决你的问题,请参考以下文章

Django Channels 从 Celery 任务发送组消息。 Asyncio 事件循环在所有异步任务完成之前停止

flask 工厂模式与celery结合

flask 工厂模式与celery结合

如何将 asyncio 与 boost.python 一起使用?

Celery学习---Celery 与django结合实现计划任务功能

celery