如何在 FastAPI 中进行多处理

Posted

技术标签:

【中文标题】如何在 FastAPI 中进行多处理【英文标题】:How to do multiprocessing in FastAPI 【发布时间】:2020-11-20 00:05:19 【问题描述】:

在处理 FastAPI 请求时,我需要对列表的每个元素执行 CPU 密集型任务。我想在多个 CPU 内核上进行此处理。

在 FastAPI 中执行此操作的正确方法是什么?我可以使用标准的multiprocessing 模块吗?到目前为止,我发现的所有教程/问题都只涉及 I/O 绑定任务,例如 Web 请求。

【问题讨论】:

【参考方案1】:

async def端点

您可以使用 loop.run_in_executor 和 ProcessPoolExecutor 在单独的进程中启动函数。

@app.post("/async-endpoint")
async def test_endpoint():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_func)  # wait result

def端点

由于def 端点是run implicitly 在一个单独的线程中,您可以使用模块multiprocessing 和concurrent.futures 的全部功能。请注意,在def 函数内部,await 可能无法使用。样品:

@app.post("/def-endpoint")
def test_endpoint():
    ...
    with multiprocessing.Pool(3) as p:
        result = p.map(f, [1, 2, 3])
@app.post("/def-endpoint/")
def test_endpoint():
    ...
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
      results = executor.map(f, [1, 2, 3])

注意应该记住,在端点中创建进程池以及创建大量线程会导致响应速度变慢,因为请求增加。


即时执行

在单独的进程中执行函数并立即等待结果的最简单和最原生的方法是使用loop.run_in_executor 和ProcessPoolExecutor。

一个池,如下例所示,可以在应用程序启动时创建,并且不要忘记在应用程序退出时关闭。可以使用 max_workers ProcessPoolExecutor 构造函数参数设置池中使用的进程数。如果max_workersNone 或未给出,则默认为机器上的处理器数。

这种方法的缺点是请求处理程序(路径操作)在单独的进程中等待计算完成,而客户端连接保持打开状态。而如果由于某种原因失去了连接,那么结果将无处返回。

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from fastapi import FastAPI

from calc import cpu_bound_func

app = FastAPI()


async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


@app.get("/param")
async def handler(param: int):
    res = await run_in_process(cpu_bound_func, param)
    return "result": res


@app.on_event("startup")
async def on_startup():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()

移至背景

通常,CPU 密集型任务在后台执行。 FastAPI 提供运行background tasks 的功能,以便在返回响应后运行,您可以在其中启动并异步等待 CPU 绑定任务的结果。

在这种情况下,例如,您可以立即返回"Accepted"(HTTP代码202)的响应和唯一的任务ID,在后台继续计算,客户端可以稍后请求任务的状态使用这个ID

BackgroundTasks 提供了一些特性,特别是,您可以运行其中的几个(包括在依赖项中)。在它们中,您可以使用在依赖项中获得的资源,这些资源只有在所有任务完成后才会被清理,而在出现异常时可以正确处理它们。这在diagram 中可以看得更清楚。

以下是执行最小任务跟踪的示例。假定应用程序的一个实例正在运行。

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from http import HTTPStatus

from fastapi import BackgroundTasks
from typing import Dict
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field

from calc import cpu_bound_func


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    result: int = None


app = FastAPI()
jobs: Dict[UUID, Job] = 


async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


async def start_cpu_bound_task(uid: UUID, param: int) -> None:
    jobs[uid].result = await run_in_process(cpu_bound_func, param)
    jobs[uid].status = "complete"


@app.post("/new_cpu_bound_task/param", status_code=HTTPStatus.ACCEPTED)
async def task_handler(param: int, background_tasks: BackgroundTasks):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_cpu_bound_task, new_task.uid, param)
    return new_task


@app.get("/status/uid")
async def status_handler(uid: UUID):
    return jobs[uid]


@app.on_event("startup")
async def startup_event():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()

更强大的解决方案

上面所有的例子都很简单,但是如果你需要一些更强大的系统来进行繁重的分布式计算,那么你可以看看消息代理RabbitMQKafkaNATS 等等。以及使用它们的库像芹菜。

【讨论】:

但是这样我就无法访问 cpu_bound_func 的返回结果了,对吧? 在后台执行的情况下是,但我修改了返回示例的答案。 就我而言,我想在cpu_bound_func 内更新一个全局dict,但使用上面的代码无法正常工作。因此,我直接在start_cpu_bound_task 内部运行了该函数(没有awaitasync)并且它可以工作。我的解决方案有什么缺点吗? 在异步协程的上下文中启动 cpu 绑定函数不是一个好主意。最可取的是使用一些进程间通信(或缓存、数据库)从工作进程向 Web 服务器提供状态更新。上面的例子只是一个非常简单的例子。

以上是关于如何在 FastAPI 中进行多处理的主要内容,如果未能解决你的问题,请参考以下文章

如何/我可以在条带 webhook 处理程序中调用 FASTAPI 异步函数,还是我的方法完全错误?

FastAPI 的应用挂载是如何工作的?

如何在 FastAPI 中使用 Pydantic 模型和表单数据?

使用 FastAPI 在基于 Python 的 GraphQL 服务器中进行身份验证

如何在 FastAPI 中启用 CORS?

19.FastAPI中间件