SQLAlchemy: Serializable transaction isolation and retries in an idiomatic Python way
Posted: 2015-02-05 17:18:36
Question:
PostgreSQL and SQL define a Serializable transaction isolation level. If you isolate transactions to this level, conflicting concurrent transactions are aborted and need to be retried.
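I am familiar with the concept of transaction retries from the Plone / Zope world, where the whole HTTP request can be replayed in the case of a transaction conflict. How close to this can one get with SQLAlchemy (possibly using zope.sqlalchemy)? I tried to read the documentation of zope.sqlalchemy and the Zope transaction manager, but the answer was not obvious to me.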
In particular, I would like something like this:
# Try to do the stuff; if it fails because of a transaction conflict, do it again until the retry count is exceeded
with transaction.manager(retries=3):
    do_stuff()
# If we couldn't get the transaction through even after 3 attempts, fail with a horrible exception
Comments on the question:
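... After writing the question I found this: zodb.readthedocs.org/en/latest/… (though maybe there is a leaner way to write a retry loop?)
I think that's the best you can get. A with statement can't repeat code, and a loop doesn't allow cleanup.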
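The "with can't repeat, a loop can't clean up" objection has a common workaround: let a loop provide the repetition and a per-attempt context manager provide the cleanup. Below is a minimal, stdlib-only sketch of that idiom. `attempts()`, `_Attempt` and `ConflictError` are hypothetical names used only for illustration. The `transaction` package used by Zope offers a similar `transaction.manager.attempts()` helper, but this sketch does not depend on it and does not touch a real database.

```python
class ConflictError(Exception):
    """Hypothetical stand-in for a driver's serialization-failure exception."""


class _Attempt:
    """Context manager for one try: swallows a conflict unless it is the last try."""

    def __init__(self, is_last):
        self.is_last = is_last
        self.succeeded = False

    def __enter__(self):
        # A real implementation would open a session / begin a transaction here
        return self

    def __exit__(self, exc_type, exc, tb):
        # A real implementation would commit on success and roll back on error
        if exc_type is None:
            self.succeeded = True
            return False
        if issubclass(exc_type, ConflictError) and not self.is_last:
            return True  # suppress the conflict so the loop runs the body again
        return False  # propagate other errors, or a conflict on the final try


def attempts(retries=3):
    """Yield up to retries + 1 attempt contexts, stopping after the first success."""
    for i in range(retries + 1):
        attempt = _Attempt(is_last=(i == retries))
        yield attempt
        if attempt.succeeded:
            return


# Usage: the body loses the race twice, then succeeds on the third run
calls = []

def do_stuff():
    calls.append(len(calls))
    if len(calls) < 3:
        raise ConflictError("simulated serialization failure")

for attempt in attempts(retries=3):
    with attempt:
        do_stuff()

# With retries=1 and a body that always conflicts, the error escapes after 2 runs
tries = []
try:
    for attempt in attempts(retries=1):
        with attempt:
            tries.append(1)
            raise ConflictError("always loses")
except ConflictError:
    pass
```

The loop variable makes the replay explicit at the call site. This is arguably the closest you can get to the `with transaction.manager(retries=3)` form the question asks for.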
@Eevee: How about a function decorator?
ZODB magic, evil as it is, makes you miss it when you need it.
@CraigRinger: By default, SQLAlchemy ORM objects cannot be accessed outside the transaction in which they were created. See expire_on_commit - file:///Users/mikko/Library/Application%20Support/Dash/DocSets/SQLAlchemy/SQLAlchemy.docset/Contents/Resources/Documents/docs.sqlalchemy.org/en/rel_0_9/orm/session.html# //apple_ref/Class/sqlalchemy.orm.session.Session - I think this also applies to rollback/abort, but I'm not sure.
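Answer 1:
So, after fumbling around for about two weeks without finding a ready-made solution, I came up with my own.
Here is a ConflictResolver class which provides a managed_transaction function decorator. You can use the decorator to mark a function as retryable: if a database conflict error occurs while the function runs, the function is run again, this time hopefully after the database transaction that caused the conflict has completed.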
Source code: https://bitbucket.org/miohtama/cryptoassets/src/529c50d74972ff90fe5b61dfbfc1428189cc248f/cryptoassets/core/tests/test_conflictresolver.py?at=master
Unit tests covering it: https://bitbucket.org/miohtama/cryptoassets/src/529c50d74972ff90fe5b61dfbfc1428189cc248f/cryptoassets/core/tests/test_conflictresolver.py?at=master
Python 3.4+ only.
"""Serialized SQL transaction conflict resolution as a function decorator."""
import warnings
import logging
from collections import Counter
from sqlalchemy.orm.exc import ConcurrentModificationError
from sqlalchemy.exc import OperationalError
UNSUPPORTED_DATABASE = "Seems like we might know how to support serializable transactions for this database. We don't know or it is untested. Thus, the reliability of the service may suffer. See transaction documentation for the details."
#: Tuples of (Exception class, test function). Behavior copied from _retryable_errors definitions copied from zope.sqlalchemy
DATABASE_COFLICT_ERRORS = []
try:
import psycopg2.extensions
except ImportError:
pass
else:
DATABASE_COFLICT_ERRORS.append((psycopg2.extensions.TransactionRollbackError, None))
# ORA-08177: can't serialize access for this transaction
try:
import cx_Oracle
except ImportError:
pass
else:
DATABASE_COFLICT_ERRORS.append((cx_Oracle.DatabaseError, lambda e: e.args[0].code == 8177))
if not DATABASE_COFLICT_ERRORS:
# TODO: Do this when cryptoassets app engine is configured
warnings.warn(UNSUPPORTED_DATABASE, UserWarning, stacklevel=2)
#: XXX: We need to confirm is this the right way for mysql, SQLIte?
DATABASE_COFLICT_ERRORS.append((ConcurrentModificationError, None))
logger = logging.getLogger(__name__)
class CannotResolveDatabaseConflict(Exception):
"""The managed_transaction decorator has given up trying to resolve the conflict.
We have exceeded the threshold for database conflicts. Probably long-running transactions or overload are blocking our rows in the database, so that this transaction would never succeed in error free manner. Thus, we need to tell our service user that unfortunately this time you cannot do your thing.
"""
class ConflictResolver:
def __init__(self, session_factory, retries):
"""
:param session_factory: `callback()` which will give us a new SQLAlchemy session object for each transaction and retry
:param retries: The number of attempst we try to re-run the transaction in the case of transaction conflict.
"""
self.retries = retries
self.session_factory = session_factory
# Simple beancounting diagnostics how well we are doing
self.stats = Counter(success=0, retries=0, errors=0, unresolved=0)
@classmethod
def is_retryable_exception(self, e):
"""Does the exception look like a database conflict error?
Check for database driver specific cases.
:param e: Python Exception instance
"""
if not isinstance(e, OperationalError):
# Not an SQLAlchemy exception
return False
# The exception SQLAlchemy wrapped
orig = e.orig
for err, func in DATABASE_COFLICT_ERRORS:
# EXception type matches, now compare its values
if isinstance(orig, err):
if func:
return func(e)
else:
return True
return False
    def managed_transaction(self, func):
        """Conflict resolution for the SQL Serializable transaction isolation level.

        When the SQL transaction isolation level is at its highest level (Serializable), the SQL database cannot resolve conflicting concurrent transactions on its own. Instead, the SQL driver raises an exception to signal this condition.

        The ``managed_transaction`` decorator re-runs everything inside the decorated function when such a conflict occurs.

        Usage::

            # Create a new session for the SQLAlchemy engine
            def create_session():
                Session = sessionmaker()
                Session.configure(bind=engine)
                return Session()

            conflict_resolver = ConflictResolver(create_session, retries=3)

            # Create a decorated function which can re-run itself in the case of a conflict
            @conflict_resolver.managed_transaction
            def myfunc(session):
                # Both threads modify the same wallet simultaneously
                w = session.query(BitcoinWallet).get(1)
                w.balance += 1

            # Execute the conflict-sensitive code inside a managed transaction
            myfunc()

        The rules:

        - You must not swallow all exceptions within ``managed_transaction``. Example of how to handle exceptions::

            # Create a decorated function which can re-run itself in the case of a conflict
            @conflict_resolver.managed_transaction
            def myfunc(session):
                try:
                    my_code()
                except Exception as e:
                    if ConflictResolver.is_retryable_exception(e):
                        # This must be passed to the function decorator, so it can attempt a retry
                        raise
                    # Otherwise the exception is all yours

        - Use read-only database sessions if you know you do not need to modify the database and weaker transaction guarantees are enough, e.g. for displaying the total balance.

        - Never perform external actions, like sending emails, inside ``managed_transaction``. If the database transaction is replayed, the code runs twice and you end up sending the same email twice.

        - Keep the managed transaction section as small and fast as possible.

        - Avoid long-running transactions by splitting big transactions into smaller worker batches.

        This implementation draws heavily on the following sources:

        - http://***.com/q/27351433/315168

        - https://gist.github.com/khayrov/6291557
        """
        def decorated_func():
            # Number of re-runs allowed after the first attempt
            attempts = self.retries
            while attempts >= 0:
                session = self.session_factory()
                try:
                    result = func(session)
                    session.commit()
                    self.stats["success"] += 1
                    return result
                except Exception as e:
                    # Discard whatever the failed attempt did
                    session.rollback()
                    if self.is_retryable_exception(e):
                        self.stats["retries"] += 1
                        attempts -= 1
                        if attempts < 0:
                            self.stats["unresolved"] += 1
                            raise CannotResolveDatabaseConflict(
                                "Could not replay the transaction {} even after {} retries".format(func, self.retries)) from e
                        continue
                    self.stats["errors"] += 1
                    # All other exceptions should fall through
                    raise
                finally:
                    # Always release the session's connection, whatever the outcome
                    session.close()
        return decorated_func
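The following stdlib-only sketch makes two points concrete: what `retries=3` means in practice, and why side effects belong outside the managed transaction. It follows roughly the same retry loop as `decorated_func`, but with a fake session. `FakeSession`, `Conflict` and `run_managed` are hypothetical test doubles, not part of cryptoassets or SQLAlchemy. They model only the commit/rollback/close bookkeeping, not real database behavior.

```python
from collections import Counter


class Conflict(Exception):
    """Hypothetical stand-in for a retryable serialization failure."""


class FakeSession:
    """Records what happened to it instead of talking to a database."""

    def __init__(self, log):
        self.log = log

    def commit(self):
        self.log.append("commit")

    def rollback(self):
        self.log.append("rollback")

    def close(self):
        self.log.append("close")


def run_managed(func, retries, log, stats):
    """One initial attempt plus up to `retries` replays, each with a fresh session."""
    attempts = retries
    while attempts >= 0:
        session = FakeSession(log)
        try:
            result = func(session)
            session.commit()
            stats["success"] += 1
            return result
        except Conflict:
            session.rollback()
            stats["retries"] += 1
            attempts -= 1
            if attempts < 0:
                stats["unresolved"] += 1
                raise
        finally:
            session.close()


log, stats, outbox, runs = [], Counter(), [], []

def transfer(session):
    runs.append(1)
    if len(runs) < 3:  # the first two runs lose the race
        raise Conflict()
    return "ok"

result = run_managed(transfer, retries=3, log=log, stats=stats)
# External side effects (e.g. the receipt email) happen here, after commit, exactly once
outbox.append("receipt")

# A function that always conflicts runs 1 + 3 times before the error escapes
gave_up = []

def always_conflicts(session):
    gave_up.append(1)
    raise Conflict()

try:
    run_managed(always_conflicts, retries=3, log=[], stats=Counter())
except Conflict:
    pass
```

In the real decorator, the final failure is re-raised as `CannotResolveDatabaseConflict`; the sketch simply lets the conflict propagate.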
Answer 2:
zope.sqlalchemy marks Postgres and Oracle conflict errors as retryable. Set your isolation level in the engine configuration, and the transaction retry logic in pyramid_tm or Zope will work.
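On PostgreSQL, the errors worth retrying are chiefly SQLSTATE `40001` (serialization_failure) and `40P01` (deadlock_detected). psycopg2 raises `TransactionRollbackError` for this class of errors and exposes the code as the exception's `pgcode` attribute. The driver-agnostic sketch below checks the SQLSTATE rather than an exception class. `is_retryable`, `FakeDriverError` and `FakeWrapped` are hypothetical names. Looking through `.orig` assumes SQLAlchemy's convention of keeping the original driver exception on its wrapper. This is not how zope.sqlalchemy itself is implemented.

```python
# Retryable SQLSTATEs: serialization_failure, deadlock_detected
RETRYABLE_SQLSTATES = {"40001", "40P01"}


def is_retryable(exc):
    """Return True if exc (or the driver error it wraps) carries a retryable SQLSTATE."""
    # SQLAlchemy wrappers keep the original driver exception in .orig
    driver_exc = getattr(exc, "orig", exc)
    return getattr(driver_exc, "pgcode", None) in RETRYABLE_SQLSTATES


class FakeDriverError(Exception):
    """Hypothetical stand-in for a psycopg2 error carrying a pgcode."""

    def __init__(self, pgcode):
        super().__init__(pgcode)
        self.pgcode = pgcode


class FakeWrapped(Exception):
    """Hypothetical stand-in for a SQLAlchemy wrapper exposing .orig."""

    def __init__(self, orig):
        super().__init__(str(orig))
        self.orig = orig
```

Unique-violation (`23505`) and other integrity errors are deliberately not retried, because replaying the same transaction would fail the same way.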