如何使用 Uvicorn 和 asyncio.create_task() 将任务置于后台?

Posted

技术标签:

【中文标题】如何使用 Uvicorn 和 asyncio.create_task() 将任务置于后台?【英文标题】:How to use Uvicorn with asyncio.create_task() to put task in background? 【发布时间】:2021-05-31 06:24:16 【问题描述】:

假设我有一个由 Uvicorn server 驱动的 Web 应用程序,该应用程序实现了 GraphQL API,其中包含在服务器端启动长计算的突变和检查服务器状态的查询端点。假设我们想知道有多少任务在后台运行。 我有一个简化的代码,它不起作用:

import asyncio
import logging
import time
import ariadne
import ariadne.asgi
import uvicorn
import starlette as sl
import starlette.applications

query_t = ariadne.QueryType()
mutation_t = ariadne.MutationType()

FIFO = []

async def long_task():
    print('Starting long task...')
    global FIFO
    FIFO = [1, *FIFO]
    # mock long calc with 5sec sleep
    time.sleep(5)
    FIFO.pop()
    print('Long task finished!')


@mutation_t.field('longTask')
async def resolve_long_task(_parent, info):
    print('Start to resolve long task...')
    asyncio.create_task(long_task())
    print('Resolve finished!')
    return 

@query_t.field('ping')
async def resolve_ping(_parent, info):
    return f'FIFO has len(FIFO) elements'


def main():
    schema_str = ariadne.gql('''
    type Mutation
        longTask: longTaskResponse
    
    type longTaskResponse 
        message: String
    
    type Query 
        ping: String
    
    ''')
    schema = ariadne.make_executable_schema(schema_str, query_t, mutation_t)
    gql_app = ariadne.asgi.GraphQL(schema)
    app = sl.applications.Starlette(routes=[sl.routing.Mount('/graphql', gql_app)])
    uvicorn.run(app,
                host='0.0.0.0',
                port=9002,
                log_level='error')


if __name__ == '__main__':
    main()

运行后

$ python main.py

我在第一个标签的GraphQL GUI 中发送了一个突变:

mutation longTaskQueue
  longTask 
    message
  

在第二个选项卡中,我尝试检索 FIFO 的长度:

query ping 
  ping

似乎可以运行 2 个long_task,但ping 正在等待所有long_task 完成。我的一般问题是如何在后台运行繁重的代码并且不阻塞 GQL API?

【问题讨论】:

使用异步时,您需要使用asyncio.sleep 而不是time.sleep,否则会阻塞整个过程 我如何才能真正将long_task() 放入另一个进程或不时使其进入睡眠状态,以便 API 可供客户端使用(尽管他开始了一项很长的任务)? 【参考方案1】:

经过多次尝试,现在我可以将许多任务放在后台并跟踪它们的数量(API 不会在一项长任务上冻结)。正在发生的事情是 阻塞计算在池中运行:

import asyncio
import logging
import time
import ariadne
import ariadne.asgi
import uvicorn
import starlette as sl
import starlette.applications
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

query_t = ariadne.QueryType()
mutation_t = ariadne.MutationType()

FIFO = []

async def long_task():
    print('Starting long task...')
    global FIFO
    FIFO = [1, *FIFO]
    # mock long calc with 5sec sleep
    time.sleep(5)
    FIFO.pop()
    print('Long task finished!')


def run(corofn, *args):
    loop = asyncio.new_event_loop()
    try:
        coro = corofn(*args)
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(coro)
    finally:
        loop.close()


@mutation_t.field('longTask')
async def resolve_long_task(_parent, info):
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(max_workers=5)
    loop.set_default_executor(ProcessPoolExecutor())
    print('Start to resolve long task...')
    loop.run_in_executor(executor, run, long_task)
    print('Resolve finished!')
    return 

@query_t.field('ping')
async def resolve_ping(_parent, info):
    return f'FIFO has len(FIFO) elements'


def main():
    schema_str = ariadne.gql('''
    type Mutation
        longTask: longTaskResponse
    
    type longTaskResponse 
        message: String
    
    type Query 
        ping: String
    
    ''')
    schema = ariadne.make_executable_schema(schema_str, query_t, mutation_t)
    gql_app = ariadne.asgi.GraphQL(schema)
    app = sl.applications.Starlette(routes=[sl.routing.Mount('/graphql', gql_app)])
    uvicorn.run(app,
                host='0.0.0.0',
                port=9002,
                log_level='error')


if __name__ == '__main__':
    main()

受this answer启发的解决方案。

【讨论】:

以上是关于如何使用 Uvicorn 和 asyncio.create_task() 将任务置于后台?的主要内容,如果未能解决你的问题,请参考以下文章

FastAPI环境部署

指定主机时 FastAPI/uvicorn 不起作用

Uvicorn 服务器意外关闭

uvicorn 抑制了 gelf 驱动程序的 python 系统日志

有没有办法干净地杀死uvicorn?

Fastapi python代码执行速度受uvicorn vs gunicorn部署的影响