这是从 Twisted 进行线程化 SQLAlchemy 查询的可接受方式吗?

Posted

技术标签:

【中文标题】这是从 Twisted 进行线程化 SQLAlchemy 查询的可接受方式吗?【英文标题】:Is this an acceptable way to make threaded SQLAlchemy queries from Twisted? 【发布时间】:2014-01-12 14:58:29 【问题描述】:

我一直在阅读有关在 Twisted 应用程序的上下文中使用 SQLAlchemy 的 ORM 的内容。要消化的信息很多,所以我很难将所有部分放在一起。到目前为止,我收集了以下绝对真理:

    一个会话意味着一个线程。总是。 scoped_session,默认情况下,为我们提供了一种将会话约束到给定线程的方法。换句话说,我确信通过使用scoped_session,我不会将会话传递给其他线程(除非我明确地这样做,我不会这样做)。

我还收集到一些与延迟/急切加载有关的问题,一种可能的方法是将 ORM 对象与会话分离,并在更改线程时将它们重新附加到另一个会话。我对细节很模糊,但我也得出结论,scoped_session 使许多这些观点没有实际意义。

我的第一个问题是我的上述结论是否严重错误

除此之外,我还精心设计了这种方法,我希望它是令人满意的。

我首先创建一个scoped_session 对象...

Session = scoped_session(sessionmaker(bind=_my_engine))

...然后我将从上下文管理器中使用它,以便优雅地处理异常和清理:

@contextmanager
def transaction_context():
    session = Session()
    try:
        yield session
        session.commit()
    except:
        session.rollback()
        raise
    finally:
        session.remove()  # dispose of the session

现在我需要做的就是在一个被推迟到一个单独线程的函数中使用上面的上下文管理器。我拼凑了一个装饰器,让事情变得更漂亮:

def threaded(fn):
    @wraps(fn)  # functools.wraps
    def wrapper(*args, **kwargs):
        return deferToThread(fn, *args, **kwargs)  # t.i.threads.deferToThread
    return wrapper

这是我打算如何使用整个 shebang 的示例。下面是一个使用 SQLAlchemy ORM 执行数据库查找的函数:

@threaded
def get_some_attributes(group):
    with transaction_context() as session:
        return session.query(Attribute).filter(Attribute.group == group)

我的第二个问题是这种方法是否可行。

我的假设是否存在根本性的缺陷? 有什么注意事项吗? 有没有更好的办法?

编辑:Here 是关于我的上下文管理器中的意外错误的相关问题。

【问题讨论】:

嗯,我想每个人都会遇到的首要问题是:它是否按照您的编码方式工作? @bitcycle,顺便说一句(并且令人惊讶地),不......它不起作用。我在上下文管理器中得到了AttributeError——显然Session 没有remove 属性。这是相当令人惊讶的——一个 这看起来基本正确(除了应该是Session.remove())。要记住的一件事是,一些数据库驱动程序,例如sqlite3,不允许它们的连接在线程之间传递。我相当确定scoped_session 对象可以适当地处理这个问题(我使用 SQLAlchemy 已经有一段时间了)。 正如@Glyph 提到的,Alchimia 已经做了一些。它在大多数情况下都有效(尽管它仍然相当不完整),但它还没有使 sqlite 和朋友们开心所需的线程固定。欢迎投稿,这可能是比从头开始更好的起点。 我很确定 Alchimia 不会包装 ORM。 (也许“还”,也许很少有人真正喜欢 ORM?) 【参考方案1】:

现在我正在研究这个确切的问题,我想我找到了解决方案。

确实,您必须将所有数据库访问功能推迟到一个线程。但是在您的解决方案中,您在查询数据库后删除了会话,因此您的所有结果 ORM 对象都将被分离,您将无法访问它们的字段。

你不能使用 scoped_session,因为在 Twisted 中我们只有一个 MainThread(除了在 deferToThread 中工作的东西)。但是,我们可以将scoped_sesssionscopefunc 一起使用。

在 Twisted 中有一个很棒的东西被称为ContextTracker

提供一种在调用上下传递任意键/值数据的方法 堆栈而不将它们作为参数传递给该调用的函数 堆栈。

在我的扭曲网络应用程序中,方法 render_GET 我设置了一个 uuid 参数:

call = context.call("uuid": str(uuid.uuid4()), self._render, request)

然后我调用 _render 方法来完成实际工作(使用 db、渲染 html 等)。

我这样创建scoped_session

scopefunc = functools.partial(context.get, "uuid")
Session = scoped_session(session_factory, scopefunc=scopefunc)

现在在_render 的任何函数调用中,我都可以与:

Session()

_render 结束时,我必须执行Session.remove() 才能删除会话。

它适用于我的 web 应用程序,我认为可以用于其他任务。

这是一个完全独立的例子,展示它是如何协同工作的。

from twisted.internet import reactor, threads
from twisted.web.resource import Resource
from twisted.web.server import Site, NOT_DONE_YET
from twisted.python import context
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base
import uuid
import functools

engine = create_engine(
    'sqlite:///test.sql',
    connect_args='check_same_thread': False,
    echo=False)

session_factory = sessionmaker(bind=engine)
scopefunc = functools.partial(context.get, "uuid")
Session = scoped_session(session_factory, scopefunc=scopefunc)
Base = declarative_base()


class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)

Base.metadata.create_all(bind=engine)


class TestPage(Resource):
    isLeaf = True

    def render_GET(self, request):
        context.call("uuid": str(uuid.uuid4()), self._render, request)
        return NOT_DONE_YET

    def render_POST(self, request):
        return self.render_GET(request)

    def work_with_db(self):
        user = User(name="TestUser")
        Session.add(user)
        Session.commit()
        return user

    def _render(self, request):
        print "session: ", id(Session())
        d = threads.deferToThread(self.work_with_db)

        def success(result):
            html = "added user with name - %s" % result.name
            request.write(html.encode('UTF-8'))
            request.finish()
            Session.remove()
        call = functools.partial(context.call, "uuid": scopefunc(), success)
        d.addBoth(call)
        return d

if __name__ == "__main__":
    reactor.listenTCP(8888, Site(TestPage()))
    reactor.run()

我打印出会话的 id,您可以看到它对于每个请求都不同。如果您从 scoped_session 构造函数中删除 scopefunc 并同时执行两个请求(将 time.sleep 插入 work_with_db),您将获得这两个请求的一个公共会话。

scoped_session 对象默认使用 threading.local() 作为存储,以便为所有调用 scoped_session 注册表的人维护一个 Session,但仅限于单个线程的范围内

这里有一个问题,在twisted 中,我们只有一个线程来处理所有请求。这就是为什么我们必须创建自己的scopefunc,这将显示请求之间的差异。

另一个问题,twisted 没有将上下文传递给回调,我们必须包装回调并将当前上下文发送给它。

call = functools.partial(context.call, "uuid": scopefunc(), success)

我仍然不知道如何使它与 defer.inLineCallback 一起工作,我在代码中到处使用它。

【讨论】:

感谢您的回答!您是否介意发布一个更详细的示例,明确显示ContextTracker_renderscoped_session 如何组合在一起?我很难看到大局。为什么scopefunc 必须与scoped_session 一起使用?谢谢! @blz 添加 scopefunc 使用示例和说明。 @aborilov 谢谢!你在这里得到了很好的解决方案......我想知道这是否可以用装饰器更隐式地完成

以上是关于这是从 Twisted 进行线程化 SQLAlchemy 查询的可接受方式吗?的主要内容,如果未能解决你的问题,请参考以下文章

Python - twisted reactor - 从线程角度看callLater和callFromThread之间的区别

使用twisted将mysql插入变成异步执行

8.Twisted 多线程

scrapy机制mark(基于twisted)

爬虫日记(82):Twisted的线程返回值

Twisted - CRITICAL - Deferred中的未处理错误...没有堆栈跟踪