fastapi异步web框架入门
Posted 三叶草body
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了fastapi异步web框架入门相关的知识,希望对你有一定的参考价值。
1 如何快速实现一个fastapi
1.1 准备工作
使用Python来写后端,基本上使用的是Django和Flask。fastapi不仅仅高效率而且还很适合产品级的开发。
需要安装一下fastapi以及ASGI(ASGI是WSGI的升级版,支持异步调用)
$ pip install fastapi
$ pip install pydantic
$ pip install uvicorn
$ pip install python-multipart
1.2 实现一个demo
- 创建
main.py
, 然后写入一下代码,可以直接运行
# -*- coding: utf-8 -*-
from fastapi import FastAPI
import uvicorn
# from web_server.views import router
app = FastAPI()
@app.get("/", tags=[\'首页API\'], description=\'fastapi后台测试服务首页API\')
def index():
return \'message\': \'脚本后台服务已启动。\', \'docs_url\': \'http://127.0.0.1:8000/docs\'
@app.get("/items/item_id")
def read_item(item_id: int, keyword: str):
"""
测试api
:param item_id: url路径参数
:param keyword: url查询字符串参数
:return:
"""
return "item_id": item_id, "keyword": keyword
# 添加子路由扩展API
# app.include_router(router, prefix=\'/exec_script\')
if __name__ == \'__main__\':
print(\'已启动服务\')
# main指的是当前py文件名,app指的是定义的FastAPI()实例
uvicorn.run(app=\'main:app\', host=\'127.0.0.1\', port=8000, reload=True, debug=True)
- 因为安装了
uvicorn
库, 还可以通过终端命令运行:
$ uvicorn main:app --reload --port 8000
$ curl http://127.0.0.1:8000
$ curl http://127.0.0.1:8000/items/1?keyword=book
- 响应的结果数据如下:
"item_id": 1,
"keyword": "book"
2 fastapi后台异步任务
2.1 后台任务的使用
-
创建一个异步任务函数
from fastapi import BackgroundTasks, FastAPI app = FastAPI() def write_notification(email: str, message=""): with open("log.txt", mode="w") as f: f.write(f"notification for email:message")
-
调用后台任务
@app.post("/send-notification/email") async def send_notification(email: str, background_tasks: BackgroundTasks): background_tasks.add_task(write_notification, email, message="notification...") return "message": "Notification sent in the background"
-
add_task()接收的参数:
-
一个在后台运行的任务函数(write_notification)
-
按照顺寻传递的一系列参数(email)
-
任何的关键字参数(message="notification...")
-
-
2.2 依赖注入
from fastapi import Depends, FastAPI
app = FastAPI()
async def common_parameters(q: str = None, skip: int = 0, limit: int = 100):
return "q": q, "skip": skip, "limit": limit
@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
return commons
@app.get("/users/")
async def read_users(commons: dict = Depends(common_parameters)):
return commons
以上是关于fastapi异步web框架入门的主要内容,如果未能解决你的问题,请参考以下文章