尝试使用 SQLAlchemy 捕获完整性错误

Posted

技术标签:

【中文标题】尝试使用 SQLAlchemy 捕获完整性错误【英文标题】:Trying to catch integrity error with SQLAlchemy 【发布时间】:2012-07-04 01:31:16 【问题描述】:

我在尝试捕获错误时遇到问题。我正在使用 Pyramid/SQLAlchemy 并使用电子邮件作为主键制作了一个注册表单。问题是当输入重复的电子邮件时,它会引发 IntegrityError,因此我试图捕捉该错误并提供一条消息,但无论我做什么我都无法捕捉它,错误不断出现。

try:
    new_user = Users(email, firstname, lastname, password)
    DBSession.add(new_user)
    return HTTPFound(location = request.route_url('new'))
except IntegrityError:
    message1 = "Yikes! Your email already exists in our system. Did you forget your password?"

当我尝试except exc.SQLAlchemyError 时,我得到了相同的消息(尽管我想捕获特定错误而不是一揽子捕获所有错误)。我也尝试了exc.IntegrityError,但没有运气(尽管它存在于 API 中)。

我的 Python 语法有什么问题吗,或者我需要在 SQLAlchemy 中做些什么来捕捉它?


我不知道如何解决这个问题,但我对可能导致问题的原因有一些想法。也许 try 语句并没有失败而是成功了,因为 SQLAlchemy 本身正在引发异常并且 Pyramid 正在生成视图,因此 except IntegrityError: 永远不会被激活。或者,更有可能的是,我完全错误地理解了这个错误。

【问题讨论】:

你从哪里导入IntegrityError?你确定这是正确的吗?你能告诉我们回溯吗? @FogleBird 我不是最擅长捕捉错误,所以我想如果它引发 IntegrityError,那么我可以在不导入任何内容的情况下捕捉它(我认为它只是一个名称)。我确实尝试了我从 SQLAlchemy 导入的答案中的两个命令。我不知道如何显示引用,这是一个很长的网页,基本上说电子邮件地址不是唯一的。 【参考方案1】:

在 Pyramid 中,如果您已将会话(脚手架自动为您执行)配置为使用 ZopeTransactionExtension,则在视图执行之前不会刷新/提交会话。如果您想自己在视图中捕获任何 SQL 错误,则需要强制 flush 将 SQL 发送到引擎。 DBSession.flush() 应该在 add(...) 之后执行。

更新

我用一个保存点的例子来更新这个答案,只是因为很少有关于如何用事务包做到这一点的例子。

def create_unique_object(db, max_attempts=3):
    while True:
        sp = transaction.savepoint()
        try:
            obj = MyObject()
            obj.identifier = uuid.uuid4().hex
            db.add(obj)
            db.flush()
        except IntegrityError:
            sp.rollback()
            max_attempts -= 1
            if max_attempts < 1:
                raise
        else:
            return obj

obj = create_unique_object(DBSession)

请注意,如果不使用表级锁定,即使这也容易在事务之间重复,但它至少显示了如何使用保存点。

【讨论】:

aahhh 做到了!在视图完成之前,我没有意识到它没有提交。非常感谢迈克尔! 这是一个愚蠢的解决方案。什么,完全禁用交易? O_o 什么?这不是禁用事务。您在 SQLAlchemy 中所做的大部分操作都在内存中,直到您刷新更改(在数据库上执行 SQL 命令)。如果这些更改失败,则事务将回滚,如果成功,则在提交事务时它们可能会被持久化。通常回滚/提交由 pyramid_tm 处理,但您也可以选择使用保存点并在本地处理它。【参考方案2】:

你需要做的是捕获一个一般异常并输出它的类;那么您可以使异常更具体。

except Exception as ex:
    print ex.__class__

【讨论】:

为什么?他正在尝试捕获特定异常,我不明白您为什么建议捕获一般异常? 正确,我想捕获特定异常,以便为用户提供更好的错误上下文。您是否建议我可以所有错误或捕获所有错误然后从中获取特定错误?抱歉,我不熟悉错误(我的代码很少有错误 :-) 开玩笑 不,我是说他可以使用它来实际确定异常的类别——然后捕获正确的类别。 这个解决方案对我有用。我的日志告诉我错误是cx_Oracle.IntegrityError,但使用这种方法我发现它实际上是我需要捕获的sqlalchemy.exc.IntegrityError【参考方案3】:

DBSession.commit() 之前可能没有数据库操作,因此在具有try/except 的控制器代码已经返回之后,IntegrityError 在堆栈中稍后被引发。

【讨论】:

如果我删除 DBSession.add(new_user) 那么它不会引发异常。没有DBSession.commit(),你指的是sqlaclhemey里面的代码吗? @Lostsoul:在DBSession.add() 之后在try/except 内调用DBSession.commit()。我指的是将金字塔/sqlalchemy 粘合在一起的代码。如果你的控制器没有引发异常,它通常会调用.commit(),如果是,它通常会调用.rollback()【参考方案4】:

编辑:上面编辑的答案是使用回滚的更好方法。

--

如果您想在金字塔应用程序的中间处理事务或在序列结束时执行自动事务提交的事情,那么不需要发生任何魔法。

如果上一个交易失败,请记住启动一个新交易

像这样:

def my_view(request):
   ... # Do things
   if success:
     try:
       instance = self._instance(**data)
       DBSession.add(instance)
       transaction.commit()
       return 'success': True
     except IntegrityError as e:  # <--- Oh no! Duplicate unique key
       transaction.abort()
       transaction.begin() # <--- Start new transaction
       return 'success': False

请注意,在成功的事务上调用 .commit() 是可以的,因此无需在成功调用后启动新事务。

如果事务处于失败状态,您只需要中止事务并开始一个新事务。

(如果事务不是这样的 poc,您可以使用保存点并回滚到保存点而不是开始新事务;遗憾的是,这是不可能的,因为尝试提交会使已知的先前保存点无效。好东西嗯?)(编辑:

【讨论】:

我想你对中兴通讯如何与事务包、pyramid_tm、SQLAlchemy 交互感到很困惑。我不怪你,它没有很好的记录。但是,transaction.commit 在您的代码中不应该是必需的,如果您想回滚某些内容,您可以使用 sp = transaction.savepoint() 然后如果在 DBSession.flush() 上发生异常,您可以执行 sp.rollback() 并继续而不创建新的交易。 要明确事务是一个“poc”,这可能只是一个额外的步骤,因为在简单的情况下,您只处理一个事务源(您的数据库)。它实际上会同步所有事务源的更改,例如 pyramid_mailer 除非事务成功,否则不会发送电子邮件,从而防止在请求期间发生异常时发送意外电子邮件。 你是对的,这是一种更好的方法。我仍然很困惑为什么 transaction.commit() 与 DBSession.flush() 有所不同(前者不工作并破坏事务状态,使回滚失败而后者以某种方式工作)......但是是的。你的例子现在更有意义了。 如果您在命令行中处理数据库,请将事务视为执行BEGIN 语句。然后将flush() 视为执行一堆SQL 命令。最后,中间的一切都只是你头脑/记忆中发生的事情(构建 SQL,推动位,组织你将要做的事情)。例如,当您调用DBSession.add(obj) 时,数据库实际上并没有发生任何事情,直到该操作被刷新。在 SQLAlchemy 中,当您执行提交时,会自动进行刷新,但您可以随时刷新。【参考方案5】:

我就是这样做的。

from contextlib import(
        contextmanager,
        )


@contextmanager
def session_scope():
    """Provide a transactional scope around a series of operations."""
    session = Session()
    try:
        yield session
        session.commit()
    except:
        session.rollback()
        raise
    finally:
        session.close()



def create_user(email, firstname, lastname, password):

    new_user = Users(email, firstname, lastname, password)

    try:

        with session_scope() as session:

            session.add(new_user)

    except sqlalchemy.exc.IntegrityError as e:
        pass

http://docs.sqlalchemy.org/en/latest/orm/session_basics.html#when-do-i-construct-a-session-when-do-i-commit-it-and-when-do-i-close-it

【讨论】:

【参考方案6】:

刷新会话后在 finally 语句中捕获异常。

try:
    new_user = Users(email, firstname, lastname, password)
    DBSession.add(new_user)
    return HTTPFound(location = request.route_url('new'))
finally:
    try:
        DBSession.flush()
    except IntegrityError:
        message1 = "Yikes! Your email already exists in our system. Did you forget your password?"

【讨论】:

以上是关于尝试使用 SQLAlchemy 捕获完整性错误的主要内容,如果未能解决你的问题,请参考以下文章

SQLAlchemy在FlushError之后没有回滚

错误未捕获类型错误:无法读取完整日历上未定义的属性“hasTime”

登录Julia时如何捕获完整的堆栈跟踪信息

如何在 Relay 中捕获 GraphQL 错误消息?

尝试添加外键的 SQLAlchemy 错误

无法捕获 SQLAlchemy IntegrityError