如何在 FastAPI 应用程序中发送操作进度?
Posted
技术标签:
【中文标题】如何在 FastAPI 应用程序中发送操作进度?【英文标题】:How to send a progress of operation in a FastAPI app? 【发布时间】:2021-03-02 05:52:07 【问题描述】:我已经部署了一个 fastapi 端点,
from fastapi import FastAPI, UploadFile
from typing import List
app = FastAPI()
@app.post('/work/test')
async def testing(files: List(UploadFile)):
for i in files:
.......
# do a lot of operations on each file
# after than I am just writing that processed data into mysql database
# cur.execute(...)
# cur.commit()
.......
# just returning "OK" to confirm data is written into mysql
return "response" : "OK"
我可以从 API 端点请求输出,它对我来说工作正常。
现在,对我来说最大的挑战是要知道每次迭代需要多少时间。因为在 UI 部分(那些正在访问我的 API 端点的人)我想帮助他们为每个正在处理的迭代/文件显示一个进度条 (TIME TAKEN)。
我有什么可能的方法来实现它吗?如果是这样,请帮助我进一步了解如何进行?
谢谢。
【问题讨论】:
你可以访问 UI 代码吗? 实际上,我无权访问 UI 代码。我只是想提供一些他们可以访问的在幕后处理的每个文件的指示。只是一些基本的指示。没什么好复杂的。目前,他们只能在处理完所有文件后才能获得最终响应。所以没有办法提供一些指示。 可以提供单独的url来获取处理状态吗? 是的,完全没问题。我尝试使用 web-socket,但我无法弄清楚。 不需要网络套接字,你可以用其他方法解决这个问题,下面的答案是否回答了你的问题(如果没有我可以看看)? 【参考方案1】:方法
轮询
跟踪任务进度的首选方法是轮询:
-
在收到
request
以在后端启动任务后:
-
在存储中创建一个
task object
(例如在内存中,redis
等)。 task object
必须包含以下数据:task ID
、status
(待定、已完成)、result
等。
在后台运行任务(协程、线程、多处理、任务队列,如Celery
、arq
、aio-pika
、dramatiq
等)
通过返回之前收到的task ID
,立即回复202 (Accepted)
。
-
这可以来自任务本身,如果它知道任务存储并且可以访问它。任务本身会定期更新有关自身的信息。
或使用任务监视器(
Observer
、producer-consumer
模式),它将监视任务的状态及其结果。它还会更新存储中的信息。
client side
(front-end
) 上启动一个轮询周期,将任务状态发送到端点/task/ID/status
,从任务存储中获取信息。
流式响应
Streaming 是一种不太方便的定期获取请求处理状态的方法。当我们在不关闭连接的情况下逐渐推送响应时。它有许多明显的缺点,例如,如果连接断开,您可能会丢失信息。 Streaming Api 是 REST Api 之外的另一种方法。
网络套接字
您还可以使用websockets 进行实时通知和双向通信。
链接:
可在以下链接中找到进度条的轮询方法示例和django + celery
的更详细说明:
https://www.dangtrinh.com/2013/07/django-celery-display-progress-bar-of.html
https://buildwithdjango.com/blog/post/celery-progress-bars/
我在这里提供了使用多处理在 FastAPI 中运行后台任务的简化示例:https://***.com/a/63171013/13782669
旧答案:
您可以在后台运行任务,返回其id
并提供前端将定期调用的/status
端点。在状态响应中,您可以返回您的任务现在的状态(例如,与当前处理的文件的编号挂起)。我提供了几个简单的例子here。
演示
轮询
使用 asyncio 任务的方法演示(单工作者解决方案):
import asyncio
from http import HTTPStatus
from fastapi import BackgroundTasks
from typing import Dict, List
from uuid import UUID, uuid4
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel, Field
class Job(BaseModel):
uid: UUID = Field(default_factory=uuid4)
status: str = "in_progress"
progress: int = 0
result: int = None
app = FastAPI()
jobs: Dict[UUID, Job] = # Dict as job storage
async def long_task(queue: asyncio.Queue, param: int):
for i in range(1, param): # do work and return our progress
await asyncio.sleep(1)
await queue.put(i)
await queue.put(None)
async def start_new_task(uid: UUID, param: int) -> None:
queue = asyncio.Queue()
task = asyncio.create_task(long_task(queue, param))
while progress := await queue.get(): # monitor task progress
jobs[uid].progress = progress
jobs[uid].status = "complete"
@app.post("/new_task/param", status_code=HTTPStatus.ACCEPTED)
async def task_handler(background_tasks: BackgroundTasks, param: int):
new_task = Job()
jobs[new_task.uid] = new_task
background_tasks.add_task(start_new_task, new_task.uid, param)
return new_task
@app.get("/task/uid/status")
async def status_handler(uid: UUID):
return jobs[uid]
问题中循环的改编示例
后台处理函数定义为def
,FastAPI在线程池上运行。
import time
from http import HTTPStatus
from fastapi import BackgroundTasks, UploadFile, File
from typing import Dict, List
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field
class Job(BaseModel):
uid: UUID = Field(default_factory=uuid4)
status: str = "in_progress"
processed_files: List[str] = Field(default_factory=list)
app = FastAPI()
jobs: Dict[UUID, Job] =
def process_files(task_id: UUID, files: List[UploadFile]):
for i in files:
time.sleep(5) # pretend long task
# ...
# do a lot of operations on each file
# then append the processed file to a list
# ...
jobs[task_id].processed_files.append(i.filename)
jobs[task_id].status = "completed"
@app.post('/work/test', status_code=HTTPStatus.ACCEPTED)
async def work(background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)):
new_task = Job()
jobs[new_task.uid] = new_task
background_tasks.add_task(process_files, new_task.uid, files)
return new_task
@app.get("/work/uid/status")
async def status_handler(uid: UUID):
return jobs[uid]
流媒体
async def process_files_gen(files: List[UploadFile]):
for i in files:
time.sleep(5) # pretend long task
# ...
# do a lot of operations on each file
# then append the processed file to a list
# ...
yield f"i.filename processed\n"
yield f"OK\n"
@app.post('/work/stream/test', status_code=HTTPStatus.ACCEPTED)
async def work(files: List[UploadFile] = File(...)):
return StreamingResponse(process_files_gen(files))
【讨论】:
如何将旧答案演示轮询示例扩展到多个工作人员和服务器? 解决我上面的评论:使用 uvicorn 和多个工作人员部署应用程序的一种解决方案是将 task_id 创建为 uuid4 和 pid 的字符串组合。 您可以使用共享存储,如数据库或内存存储,而不是工作本地字典。相关话题***.com/questions/65686318/…【参考方案2】:以下是使用 uniq 标识符和包含作业信息的全局可用字典的解决方案:
注意:在您使用动态键值(在使用中的示例 uuid)并将应用程序保持在单个进程中之前,以下代码可以安全使用。
-
要启动应用程序,请创建一个文件
main.py
运行uvicorn main:app --reload
通过访问http://127.0.0.1:8000/
创建工作条目
重复第 3 步以创建多个作业
转到http://127.0.0.1/status
页面查看页面状态。
转到http://127.0.0.1/status/identifier
,按作业 ID 查看作业进度。
应用代码:
from fastapi import FastAPI, UploadFile
import uuid
from typing import List
import asyncio
context = 'jobs':
app = FastAPI()
async def do_work(job_key, files=None):
iter_over = files if files else range(100)
for file, file_number in enumerate(iter_over):
jobs = context['jobs']
job_info = jobs[job_key]
job_info['iteration'] = file_number
job_info['status'] = 'inprogress'
await asyncio.sleep(1)
pending_jobs[job_key]['status'] = 'done'
@app.post('/work/test')
async def testing(files: List[UploadFile]):
identifier = str(uuid.uuid4())
context[jobs][identifier] =
asyncio.run_coroutine_threadsafe(do_work(identifier, files), loop=asyncio.get_running_loop())
return "identifier": identifier
@app.get('/')
async def get_testing():
identifier = str(uuid.uuid4())
context['jobs'][identifier] =
asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())
return "identifier": identifier
@app.get('/status')
def status():
return
'all': list(context['jobs'].values()),
@app.get('/status/identifier')
async def status(identifier):
return
"status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
【讨论】:
这似乎适用于我的单身工人。谢谢!如果您有空闲时间,您能否指出如何在 do_work 函数中并行化 for 循环?我想使用类似 joblib 或类似的东西来并行化 for 循环。我已经问过问题here 你可以创建async def
来处理每次迭代的输入并将未来的对象存储在一个列表中,而不是在 for 循环之外调用 async io collect: docs.python.org/3/library/asyncio-task.html#asyncio.gather。您可以自己尝试,一旦我有时间,我可以提供一些样品;)
非常感谢。我对异步 io 操作不是很熟悉,但我会尽力而为,当你有时间时,请尝试在这里解决问题,***.com/questions/65132243/…
如果您有时间,请尝试帮助我。我一直在尝试找到一种方法但失败了,如果您熟悉一种方法,我可以使用该方法并行化 for 循环并能够跟踪迭代?这是赏金问题链接:***.com/questions/65132243/…
@user_12 当然,我会看看它:)以上是关于如何在 FastAPI 应用程序中发送操作进度?的主要内容,如果未能解决你的问题,请参考以下文章
如何从我的 FastAPI 应用程序向另一个站点 (API) 发送 HTTP 请求?
如何将 socket.io 挂载到 fastapi 应用程序并向所有连接的客户端发送广播
如何在 FastAPI RealWorld 示例应用中应用事务逻辑?