如何在 FastAPI 中进行持久数据库连接?
Posted
技术标签:
【中文标题】如何在 FastAPI 中进行持久数据库连接?【英文标题】:How to do persistent database connection in FastAPI? 【发布时间】:2020-11-25 22:13:27 【问题描述】:我正在用 FastAPI 编写我的第一个项目,但我有点挣扎。特别是,我不确定我应该如何在我的应用程序中使用 asyncpg 连接池。目前我的情况是这样的
在 db.py 我有
pgpool = None
async def get_pool():
global pgpool
if not pgpool:
pgpool = await asyncpg.create_pool(dsn='MYDB_DSN')
return pgpool
然后在单个文件中我使用 get_pool 作为依赖项。
@router.post("/user/", response_model=models.User, status_code=201)
async def create_user(user: models.UserCreate, pgpool = Depends(get_pool)):
# ... do things ...
首先,我拥有的每个端点都使用数据库,因此为每个函数添加依赖参数似乎很愚蠢。其次,这似乎是一种迂回的做事方式。我定义一个全局,然后定义一个返回该全局的函数,然后注入该函数。我相信还有更自然的方法。
我看到人们建议将我需要的任何内容作为属性添加到应用对象
@app.on_event("startup")
async def startup():
app.pool = await asyncpg.create_pool(dsn='MYDB_DSN')
但是当我有多个带有路由器的文件时它不起作用,我不知道如何从路由器对象访问应用程序对象。
我错过了什么?
【问题讨论】:
【参考方案1】:您可以使用应用程序工厂模式来设置您的应用程序。
为避免使用全局或直接向应用对象添加内容,您可以创建自己的类数据库来保存连接池。
要将连接池传递给每个路由,您可以使用中间件并将池添加到request.state
示例代码如下:
import asyncio
import asyncpg
from fastapi import FastAPI, Request
class Database():
async def create_pool(self):
self.pool = await asyncpg.create_pool(dsn='MYDB_DSN')
def create_app():
app = FastAPI()
db = Database()
@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
request.state.pgpool = db.pool
response = await call_next(request)
return response
@app.on_event("startup")
async def startup():
await db.create_pool()
@app.on_event("shutdown")
async def shutdown():
# cleanup
pass
@app.get("/")
async def hello(request: Request):
print(request.state.pool)
return app
app = create_app()
【讨论】:
好吧,在这种情况下,我将不得不在每条路线上添加request: Request
,而不是pool=Depends(get_pool)
,似乎并没有节省多少麻烦。
啊...好吧,您可以在模块中创建池(例如 database.py)并直接从那里导入。
FastAPI 不支持基于类的视图,但这是我在 GitHub gist.github.com/dmontagu/87e9d3d7795b14b63388d4b16054f0ff 上找到的一个小技巧解决方案
FWIW 维护者提出了类似的建议——github.com/tiangolo/fastapi/issues/1800
抱歉,请问你们为什么直接使用asyncpg
而不是使用文档中的Databases
包装器。是因为它让你对底层连接池有更多的控制权吗?【参考方案2】:
我的做法是在 db.py 中。
class Database:
def __init__(self,user,password,host,database,port="5432"):
self.user = user
self.password = password
self.host = host
self.port = port
self.database = database
self._cursor = None
self._connection_pool = None
self.con = None
async def connect(self):
if not self._connection_pool:
try:
self._connection_pool = await asyncpg.create_pool(
min_size=1,
max_size=20,
command_timeout=60,
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
ssl="require"
)
logger.info("Database pool connectionn opened")
except Exception as e:
logger.exception(e)
async def fetch_rows(self, query: str,*args):
if not self._connection_pool:
await self.connect()
else:
con = await self._connection_pool.acquire()
try:
result = await con.fetch(query,*args)
return result
except Exception as e:
logger.exception(e)
finally:
await self._connection_pool.release(con)
async def close(self):
if not self._connection_pool:
try:
await self._connection_pool.close()
logger.info("Database pool connection closed")
except Exception as e:
logger.exception(e)
然后在应用中
@app.on_event("startup")
async def startup_event():
database_instance = db.Database(**db_arguments)
await database_instance.connect()
app.state.db = database_instance
logger.info("Server Startup")
@app.on_event("shutdown")
async def shutdown_event():
if not app.state.db:
await app.state.db.close()
logger.info("Server Shutdown")
然后你可以通过在路由中传入请求参数来获取带有request.app.state.db的db实例。
【讨论】:
如果您阅读了我的问题,我已经尝试过这种方法。似乎无法从 APIRouter() 路由访问主应用程序对象。以上是关于如何在 FastAPI 中进行持久数据库连接?的主要内容,如果未能解决你的问题,请参考以下文章