如何将 Celery 与 asyncio 结合使用?
Posted
技术标签:
【中文标题】如何将 Celery 与 asyncio 结合使用?【英文标题】:How to combine Celery with asyncio? 【发布时间】:2017-02-10 11:07:20 【问题描述】:如何创建一个使 celery 任务看起来像 asyncio.Task
的包装器?或者有没有更好的方法将 Celery 与 asyncio
集成?
@asksol,Celery 的创造者,said this::
将 Celery 用作异步 I/O 框架之上的分布式层是很常见的(重要提示:将 CPU 绑定任务路由到 prefork worker 意味着它们不会阻塞您的事件循环)。
但我找不到任何专门针对 asyncio
框架的代码示例。
【问题讨论】:
你能澄清一下“看起来”是什么意思吗?我想您可能误解了 Asksol 的评论——您将 celery 放在了作为异步管理器的 Rabbit 或 SQS 等框架的前面。因此,您可以为使用 asyncio 的 celery 创建一个代理/插件,但任务不会“看起来像”(即具有 asyncio 的接口)? celery 的重点是抽象使用的异步方法? 【参考方案1】:这是我在必要时处理异步协程的 Celery 实现:
包装 Celery 类以扩展其功能:
from celery import Celery
from inspect import isawaitable
import asyncio
class AsyncCelery(Celery):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.patch_task()
if 'app' in kwargs:
self.init_app(kwargs['app'])
def patch_task(self):
TaskBase = self.Task
class ContextTask(TaskBase):
abstract = True
async def _run(self, *args, **kwargs):
result = TaskBase.__call__(self, *args, **kwargs)
if isawaitable(result):
await result
def __call__(self, *args, **kwargs):
asyncio.run(self._run(*args, **kwargs))
self.Task = ContextTask
def init_app(self, app):
self.app = app
conf =
for key in app.config.keys():
if key[0:7] == 'CELERY_':
conf[key[7:].lower()] = app.config[key]
if 'broker_transport_options' not in conf and conf.get('broker_url', '')[0:4] == 'sqs:':
conf['broker_transport_options'] = 'region': 'eu-west-1'
self.config_from_object(conf)
celery = AsyncCelery()
【讨论】:
【参考方案2】:这是一个简单的助手,您可以使用它来让 Celery 任务等待:
import asyncio
from asgiref.sync import sync_to_async
# Converts a Celery tasks to an async function
def task_to_async(task):
async def wrapper(*args, **kwargs):
delay = 0.1
async_result = await sync_to_async(task.delay)(*args, **kwargs)
while not async_result.ready():
await asyncio.sleep(delay)
delay = min(delay * 1.5, 2) # exponential backoff, max 2 seconds
return async_result.get()
return wrapper
和sync_to_async
一样,可以直接作为包装器使用:
@shared_task
def get_answer():
sleep(10) # simulate long computation
return 42
result = await task_to_async(get_answer)()
...作为装饰者:
@task_to_async
@shared_task
def get_answer():
sleep(10) # simulate long computation
return 42
result = await get_answer()
当然,这不是一个完美的解决方案,因为它依赖于polling。 但是,在 Celery officially provides a better solution 之前从 Django 异步视图调用 Celery 任务应该是一个很好的解决方法。
编辑 2021/03/02:添加对 sync_to_async
的调用以支持 eager mode。
【讨论】:
这是一个可靠的解决方法,我们已经在我的回答中提到的 FastAPI 应用程序中使用了它(虽然不是作为装饰器):) 请记住,您需要注意错误处理并有一个计划如何处理任何潜在的异常!task_to_async
调用 AsyncResult.get()
,这会重新引发任务引发的任何异常。当然,如果你想自定义这个行为,你可以在task_to_async
添加参数,然后转发给async_result.get()
。
将任务包装在异步助手中的意义何在?你不能只用睡眠来实现循环,没有它吗? Afaik task.delay 是非阻塞的。只有类似 task.get 的东西会阻塞。【参考方案3】:
编辑:2021 年 1 月 12 日以前的答案(在底部找到)没有很好地老化,因此我添加了可能的解决方案组合,这些解决方案可能会满足那些仍然在寻找如何共同使用 asyncio 和芹菜
让我们先快速分解用例(更深入的分析在这里:asyncio and coroutines vs task queues):
如果任务受 I/O 限制,则使用协程和异步会更好。 如果任务受 CPU 限制,则最好使用 Celery 或其他类似的任务管理系统。因此,在 Python 的“做一件事并把它做好”的上下文中,不要尝试将 asyncio 和 celery 混合在一起是有意义的。
但是,如果我们希望能够以异步方式和异步任务的形式运行方法,会发生什么情况?那么我们有一些选择可以考虑:
我能找到的最好的例子如下:https://johnfraney.ca/posts/2018/12/20/writing-unit-tests-celery-tasks-async-functions/(我刚刚发现它是@Franey's response):
定义你的异步方法。
使用asgiref
的sync.async_to_sync
模块封装异步方法并在一个celery任务中同步运行:
# tasks.py
import asyncio
from asgiref.sync import async_to_sync
from celery import Celery
app = Celery('async_test', broker='a_broker_url_goes_here')
async def return_hello():
await asyncio.sleep(1)
return 'hello'
@app.task(name="sync_task")
def sync_task():
async_to_sync(return_hello)()
我在FastAPI 应用程序中遇到的一个用例与上一个示例相反:
CPU 密集型进程占用了异步端点。
解决方案是将异步 CPU 绑定进程重构为 celery 任务,并从 Celery 队列中传递一个任务实例以执行。
可视化该案例的最小示例:
import asyncio
import uvicorn
from celery import Celery
from fastapi import FastAPI
app = FastAPI(title='Example')
worker = Celery('worker', broker='a_broker_url_goes_here')
@worker.task(name='cpu_boun')
def cpu_bound_task():
# Does stuff but let's simplify it
print([n for n in range(1000)])
@app.get('/calculate')
async def calculate():
cpu_bound_task.delay()
if __name__ == "__main__":
uvicorn.run('main:app', host='0.0.0.0', port=8000)
另一个解决方案似乎是 @juanra 和 @danius 在他们的答案中提出的,但我们必须记住,当我们混合同步和异步执行时,性能往往会受到影响,因此这些答案需要在之前进行监控我们可以决定在 prod 环境中使用它们。
最后,有一些现成的解决方案,我不能推荐(因为我自己没有使用过),但我会在这里列出它们:
Celery Pool AsyncIO 这似乎完全解决了 Celery 5.0 没有解决的问题,但请记住,它似乎有点实验性(今天 0.2.0 版 01/12/2021) aiotasks 声称是“一个类似于 Celery 的任务管理器,它分发 Asyncio 协程”,但似乎有点陈旧(大约 2 年前的最新提交)嗯,它的年龄没有那么好,是吗? Celery 5.0 版没有实现异步兼容性,因此我们无法知道何时以及是否会实现......出于响应遗留原因(因为它是当时的答案)和评论继续,将其留在这里。
如官方网站所述,这将从 Celery 5.0 版中实现:
http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface
-
Celery 的下一个主要版本将仅支持 Python 3.5,我们计划在其中利用新的 asyncio 库。
放弃对 Python 2 的支持将使我们能够删除大量的兼容性代码,而使用 Python 3.5 使我们能够利用打字、异步/等待、异步和类似概念,在旧版本中没有其他选择。
以上内容来自上一个链接。
所以最好的办法就是等待5.0版发布!
与此同时,祝你编码愉快:)
【讨论】:
这没有发生,而且 celery 5 与 asyncio 不兼容。 @piro 我还没用过 celery 5,我会进一步调查!感谢更新 @piro 好吧,我做了我的研究并重构了这个答案,希望你能在那里找到有用的东西! 我打开了a feature request,他们回答“这是我们为 celery 6.0 计划的更大设计决策的一部分”。 在我们得到 Celery 的官方支持之前,我发现 polling the status of the AyncResult 提供了一个很好的解决方法。【参考方案4】:我通过在 celery-pool-asyncio 库中结合 Celery 和 asyncio 解决了问题。
【讨论】:
这实际上似乎是一个很好的解决方案,唯一的问题是它不支持 celery 5。有任何时间表吗?【参考方案5】:这种简单的方法对我来说效果很好:
import asyncio
from celery import Celery
app = Celery('tasks')
async def async_function(param1, param2):
# more async stuff...
pass
@app.task(name='tasks.task_name', queue='queue_name')
def task_name(param1, param2):
asyncio.run(async_function(param1, param2))
【讨论】:
【参考方案6】:我发现的最简洁的方法是将async
函数包装在asgiref.sync.async_to_sync
中(来自asgiref
):
from asgiref.sync import async_to_sync
from celery.task import periodic_task
async def return_hello():
await sleep(1)
return 'hello'
@periodic_task(
run_every=2,
name='return_hello',
)
def task_return_hello():
async_to_sync(return_hello)()
我从我写的blog post 中提取了这个例子。
【讨论】:
非常好,我在研究这个问题的过程中发现了你的文章,并将它包含在我的答案的编辑中(我现在当然要提到你,因为我发现了它)!感谢您的知识提升:) 谢谢!看到对我文章的引用总是很酷,即使它在同一个线程中。【参考方案7】:您可以使用run_in_executor
将任何阻塞调用包装到任务中,如documentation 中所述,我还在示例中添加了自定义timeout:
def run_async_task(
target,
*args,
timeout = 60,
**keywords
) -> Future:
loop = asyncio.get_event_loop()
return asyncio.wait_for(
loop.run_in_executor(
executor,
functools.partial(target, *args, **keywords)
),
timeout=timeout,
loop=loop
)
loop = asyncio.get_event_loop()
async_result = loop.run_until_complete(
run_async_task, your_task.delay, some_arg, some_karg=""
)
result = loop.run_until_complete(
run_async_task, async_result.result
)
【讨论】:
以上是关于如何将 Celery 与 asyncio 结合使用?的主要内容,如果未能解决你的问题,请参考以下文章
Django Channels 从 Celery 任务发送组消息。 Asyncio 事件循环在所有异步任务完成之前停止
如何将 asyncio 与 boost.python 一起使用?