使用异步服务器的长时间运行任务
Posted
技术标签:
【中文标题】使用异步服务器的长时间运行任务【英文标题】:Long-running tasks with async server 【发布时间】:2016-05-23 04:53:58 【问题描述】:我想每个人都知道如何处理 django 中长时间运行的任务:使用 celery 并放松。但是如果我想通过 aiohttp(或 tornado)获得 websockets 的好处呢?
假设我有一个 CPU 密集型任务,可能需要几秒钟到多 (5-10) 分钟。在 websocket 循环中处理此任务并通知用户进度似乎是个好主意。无 ajax 请求,短任务响应速度非常快。
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
if msg.tp == aiohttp.MsgType.text:
answer_to_the_ultimate_question_of_life_the_universe_and_everything =\
long_running_task(msg.data, NotificationHelper(ws))
ws.send_str(json.dumps(
'action': 'got-answer',
'data': answer_to_the_ultimate_question_of_life_the_universe_and_everything,
))
return ws
但另一方面,据我所知,CPU 密集型任务以这种方式服务会阻塞整个线程。如果我有 10 个工人和 11 个客户想要使用应用程序,那么在第一个客户的任务完成之前,不会为第 11 个客户提供服务。
也许,我应该在 celery 中运行看起来很大的任务,在主循环中运行看起来很小的任务?
那么,我的问题是:有没有什么好的设计模式可以用异步服务器来处理长时间运行的任务?
谢谢!
【问题讨论】:
asyncio
不会帮助您处理 CPU 密集型任务。
@dirn 仅适用于 IO 绑定任务和与服务器的不同类型交互吗?
好吧,asyncio 可以提供帮助,如果任务托管在单独的线程中(或带有收益的绿色线程或子进程,则真正变态)会更加生动。 Obv.,有一些问题 - 什么是 ws 连接死了?会不会有数据竞赛? “真正的请求多于资源”——503/queue/block/error 呢?
如果函数是picklable,使用asyncio.run_in_executor
怎么样?
@shongololo 我想将 ws 对象放入函数中,如示例所示。所以,看起来它是不可挑选的,是吗?
【参考方案1】:
只需通过loop.run_in_executor()
运行您长时间运行的CPU 密集型任务,并通过loop.call_soon_threadsafe()
发送进度通知。
如果您的工作不是 CPU 而是 IO 绑定(例如发送电子邮件),您可以通过loop.create_task()
调用创建一个新任务。它看起来像产生新线程。
如果您不能使用即发即弃的方法,则需要使用像 RabbitMQ 这样的持久消息代理(有 https://github.com/benjamin-hodgson/asynqp 库用于以异步方式与 Rabbit 通信)。
【讨论】:
以上是关于使用异步服务器的长时间运行任务的主要内容,如果未能解决你的问题,请参考以下文章
对同步适配器、服务、加载程序、提供程序和异步任务感到困惑? [关闭]