异步 SQLAlchemy 无法创建引擎

Posted

技术标签:

【中文标题】异步 SQLAlchemy 无法创建引擎【英文标题】:async SQLAlchemy can't create engine 【发布时间】:2021-12-20 16:07:23 【问题描述】:

我制作了一个小应用程序,它使用SQLAlchemy 来处理与 postgresql 数据库的连接。现在我想用 asincio 重写它。由于某种原因,当我运行它时,我收到以下错误:

Traceback (most recent call last):
  File "D:\Space\discord_count_bot\bot\bot\main.py", line 12, in <module>
    dbConnection.init_connection(
  File "D:\Space\discord_count_bot\bot\bot\db_hanler.py", line 78, in init_connection
    engine = create_async_engine(connection_string, future=True, echo=True)
  File "D:\Space\discord_count_bot\bot_env\lib\site-packages\sqlalchemy\ext\asyncio\engine.py", line 40, in create_async_engine
    sync_engine = _create_engine(*arg, **kw)
  File "<string>", line 2, in create_engine
  File "D:\Space\discord_count_bot\bot_env\lib\site-packages\sqlalchemy\util\deprecations.py", line 298, in warned
    return fn(*args, **kwargs)
  File "D:\Space\discord_count_bot\bot_env\lib\site-packages\sqlalchemy\engine\create.py", line 560, in create_engine
    dbapi = dialect_cls.dbapi(**dbapi_args)
  File "D:\Space\discord_count_bot\bot_env\lib\site-packages\sqlalchemy\dialects\postgresql\psycopg2.py", line 782, in dbapi
    import psycopg2
ModuleNotFoundError: No module named 'psycopg2'

如果安装了psycopg2,我会得到

Traceback (most recent call last):
  File "D:\Space\discord_count_bot\bot\bot\main.py", line 12, in <module>
    dbConnection.init_connection(
  File "D:\Space\discord_count_bot\bot\bot\db_hanler.py", line 78, in init_connection
    engine = create_async_engine(connection_string, future=True, echo=True)
  File "D:\Space\discord_count_bot\bot_env\lib\site-packages\sqlalchemy\ext\asyncio\engine.py", line 41, in create_async_engine
    return AsyncEngine(sync_engine)
  File "D:\Space\discord_count_bot\bot_env\lib\site-packages\sqlalchemy\ext\asyncio\engine.py", line 598, in __init__
    raise exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: The asyncio extension requires an async driver to be used. The loaded 'psycopg2' is not async. 

我想我安装了asyncpg,我需要特别告诉SQLAlchemy 使用它。或者,我的代码中有一些东西让SQLAlchemy 认为,它应该使用psycopg2...我找不到任何关于它的东西,在我遇到的每个教程中,一切似乎都很好。


from datetime import datetime, timedelta
import logging

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, and_
from sqlalchemy import Column, Integer, String, DateTime, Boolean

logger = logging.getLogger('discord')
Base = declarative_base()


class TaskModel(Base):
    """Counting task model for database."""

    __tablename__ = 'tasks'

    id = Column(Integer, primary_key=True)
    author = Column(String(200))
    channel_id = Column(Integer)
    is_dm = Column(Boolean)
    start_time = Column(DateTime)
    end_time = Column(DateTime)
    count = Column(Integer)
    canceled = Column(Boolean)


class DBConnection:
    """Class handles all the db operations."""

    def __init__(self):
        """Create new uninitialized handler."""
        self._session: AsyncSession = None

    def init_connection(self, user, password, host, port, db):
        """Connect to actual database."""
        connection_string = "postgresql://:@:/".format(
            user, password, host, port, db
        )
        engine = create_async_engine(connection_string, future=True, echo=True)
        self._session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)

    async def add_task(self, author, channel_id, count, is_dm):
        """Add new task to db."""
        now = datetime.utcnow()
        task = TaskModel(
            author=author,
            channel_id=channel_id,
            is_dm=is_dm,
            start_time=now,
            end_time=now + timedelta(seconds=count),
            count=count,
            canceled=False
        )
        self._session.add(task)
        await self._session.commit()
        logger.info(f"task added to db: task")
        return task

    async def get_active_tasks(self):
        """Get all active tasks."""
        now = datetime.utcnow()
        async with self._session() as session:
            query = select(TaskModel).where(and_(
                    TaskModel.end_time > now,
                    TaskModel.canceled == False
            ))
            result = await session.execute(query)
            return result.fetchall()

dbConnection = DBConnection()

【问题讨论】:

试试postgresql+asyncpg://… pyscopg2 是 SQLAlchemy 用于 Postgres 的默认驱动程序。如前所述,如果您想使用其他任何内容,则必须在连接字符串中指定它。 一个例子可以看here。 戈德汤普森,你是我的救星,谢谢! 【参考方案1】:

正如 Gord Thompson 所说,我需要在连接字符串中更加具体, postgresql+asyncpg://… 成功了,谢谢)

【讨论】:

以上是关于异步 SQLAlchemy 无法创建引擎的主要内容,如果未能解决你的问题,请参考以下文章

SQLAlchemy创建表和删除表

SQLAlchemy bulk_insert_mappings():无法获取表“测试”的映射器

在 Flask-SQLAlchemy 模型上使用函数查询会给出 BaseQuery object is not callable 错误

为啥 SQLAlchemy 不创建串行列?

SQLAlchemy + Tornado:如何为SQLAlchemy的ScopedSession创建scopefunc?

Flask项目和文档集