Python SQLAlchemy+Threading 静默失败

Posted

技术标签:

【中文标题】Python SQLAlchemy+Threading 静默失败【英文标题】:Python SQLAlchemy+Threading failing silently 【发布时间】:2019-07-22 12:23:33 【问题描述】:

我正在尝试构建一个服务器来采样流式价格馈送并使用 SQLAlchemy 更新 postgres 数据库。我正在使用映射类的线程实例,这似乎可以工作但不稳定。

Stream 类的 1 或 2 个实例没有问题,但如果是 10 个,线程会随机且静默地失败。每次在它失败之前,SQLAlchemy 都会给出一条错误消息,所以看起来这就是杀死线程的原因。流没有问题,它总是稳定的。

我是否错过了 SQLAlchemy 设置中的某些内容?有没有更好的方法将多个实时订阅馈送到 SQL 中?

代码:

import time
import json
from threading import Thread, Lock
import sqlalchemy as db
from sqlalchemy.orm import scoped_session, sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base

# Setup SQLAlchemy
engine = db.create_engine('postgresql://localhost:5432/Project', echo=False)
metadata = db.MetaData(bind=engine)
Session = scoped_session(sessionmaker(bind=engine))
Base = declarative_base()

Base.metadata.create_all(engine)
session = Session()

#DB classes
#static data table
class StockMaster(Base):
    __tablename__ = 'stock_master'
    id = db.Column(db.Integer, primary_key=True)
    ticker = db.Column(db.String)
    stock_name = db.Column(db.String)

    @classmethod
    def find_by_ticker(cls,ticker):
        return session.query(StockMaster).filter(StockMaster.ticker==ticker).first()

#live data table
class StockLive(Base):
    __tablename__ = 'stock_live'
    id = db.Column(db.Integer, primary_key=True)
    quote = db.Column(db.Numeric)
    timestamp = db.Column(db.Numeric)
    ticker_id = db.Column(db.Integer, db.ForeignKey('stock_master.id'))

    ticker = relationship("StockMaster", foreign_keys=[ticker_id])

    def __init__(self, quote, ticker_id, timestamp):
        self.quote=quote
        self.ticker_id=ticker_id
        self.timestamp=timestamp

    def save_to_db(self):
        session.add(self)
        session.commit()

    @classmethod
    def find_by_ticker_id(cls,ticker_id):
        return session.query(StockLive).filter(StockLive.ticker_id==ticker_id).first()

    @classmethod
    def find_by_ticker(cls,ticker):
        ticker_id = StockMaster.find_by_ticker(ticker).id
        return session.query(StockLive).filter(StockLive.ticker_id==ticker_id).first()


class Stream(Thread):
    def __init__(self,ticker):
        Thread.__init__(self)
        self.ticker=ticker
        self.quote=1
        self.data_set = StockLive.find_by_ticker(self.ticker)
        self.count=0

    def run(self):
        con.subscribe(self.ticker)
        current_mid=1
        while True:
            new_data = json.loads(con.get_price(self.ticker).to_json())
            new_mid = new_data['Mid']

            if new_mid == current_mid:
                pass
            else:
                current_mid = new_mid
                self.data_set.quote = current_mid
                self.data_set.timestamp = time.time()
                try:
                    self.data_set.save_to_db()
                    self.count+=1
                except:
                    self.data_set = StockLive.find_by_ticker(self.ticker)
                    print('error saving to db for '+self.ticker)
            time.sleep(.1)


if __name__ == '__main__':
    threads=
    for ticker in tickerlist:
        try:
            threads[ticker]=Stream(ticker)
            threads[ticker].setName('Thread ' + ticker)
            threads[ticker].start()
        except:
            print('Error setting up '+ticker)

    while True:
        for ticker in tickerlist:
            if threads[ticker].isAlive()==False:
                threads[ticker]=Stream(ticker)

SQLAlchemy 错误信息:

/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py:2323: SAWarning:当前未使用“Session.add()”操作 在刷新过程的执行阶段支持。结果可能 不一致。考虑使用替代事件侦听器或 而是连接级操作。 % 方法) /anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py:2425: SAWarning: 属性历史事件累积在 1 个以前的干净 内部刷新事件处理程序中的实例已被重置,并将 不会导致数据库更新。考虑使用 set_committed_value() 在内部刷新事件处理程序中以避免此警告。 % 长度) 线程 Thread MSFT 中的异常:回溯(最后一次调用): 文件 “/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2436 行,在 _flush transaction.commit() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 465 行,提交中 self._assert_active(prepared_ok=True) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 285 行,在 _assert_active raise sa_exc.ResourceClosedError(closed_msg) sqlalchemy.exc.ResourceClosedError: This transaction is closed

在处理上述异常的过程中,又发生了一个异常:

Traceback(最近一次调用最后一次):文件 “”,第 48 行,运行中 self.data_set.save_to_db() 文件“”,第 44 行,在 save_to_db session.commit() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 954 行,提交中 self.transaction.commit() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 467 行,提交中 self._prepare_impl() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 447 行,在 _prepare_impl self.session.flush() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2313 行,齐平 self._flush(objects) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2440 行,在 _flush transaction.rollback(_capture_exception=True) 文件 "/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", 第 76 行,在 退出 compat.reraise(type_, value, traceback) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/compat.py”, 第 249 行,在再加注中 提高价值文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2440 行,在 _flush transaction.rollback(_capture_exception=True) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 483 行,在回滚中 self._assert_active(prepared_ok=True, rollback_ok=True) 文件 "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", 第 285 行,在 _assert_active raise sa_exc.ResourceClosedError(closed_msg) sqlalchemy.exc.ResourceClosedError: This transaction is closed

在处理上述异常的过程中,又发生了一个异常:

Traceback(最近一次调用最后一次):文件 “/anaconda3/lib/python3.7/threading.py”,第 917 行,在 _bootstrap_inner self.run() 文件“”,第 53 行,运行中 self.data_set = StockLive.find_by_ticker(self.ccy) 文件“”,第 52 行,在 find_by_ticker ticker_id = StockMaster.find_by_ticker(ticker).id 文件“”,第 23 行,在 find_by_ticker 返回 session.query(StockMaster).filter(StockMaster.ticker==ticker).first() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/query.py”, 第 2895 行,首先 ret = list(self[0:1]) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/query.py”,行 2687,在 getitem 中 返回列表(res)文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/query.py”,行 2994,在 iter self.session._autoflush() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 1493 行,在 _autoflush 中 self.flush() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2313 行,齐平 self._flush(objects) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2400 行,在 _flush subtransactions=True)文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 865 行,开始 嵌套=嵌套)文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 297 行,在 _begin self._assert_active() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 264 行,在 _assert_active “此会话处于‘准备好的’状态;没有进一步” sqlalchemy.exc.InvalidRequestError:此会话处于‘准备好的’状态 状态;此事务中不能再发出 SQL。

【问题讨论】:

我认为您需要更改创建引擎中的pool 设置。默认情况下它是 5(你的代码吱吱作响超过 5 个线程)所以将其更改为你的需要。 【参考方案1】:

您的代码可能还有其他问题,但一个明显的问题是您与多个线程共享您的session

应该在每个线程中创建这些,而不是设置全局session

我无法运行您的代码,但您可以尝试以下方法:

完全删除全局会话变量。你不需要它。然后修改您的方法和线程以包含本地会话:

@classmethod
def find_by_ticker(cls,ticker, session):
    return session.query(StockMaster).filter(StockMaster.ticker==ticker).first()

...

class Stream(Thread):
    def __init__(self,ticker):
        Thread.__init__(self)
        self.ticker=ticker
        self.quote=1
        self.session = Session()
        self.data_set = StockLive.find_by_ticker(self.ticker, self.session)
        self.count=0

或者类似的东西。现在这将使每个线程都有自己的会话,并且您的代码将开始表现得更好。

【讨论】:

感谢您的回复,这似乎解决了一个问题,但又引发了另一个问题。线程看起来更稳定,但 session.flush() 进程现在显示不稳定。错误是“SAWarning:在刷新过程的执行阶段当前不支持使用 'Session.add()' 操作。结果可能不一致。请考虑使用替代事件侦听器或连接级操作。”如果我自己尝试 session.flush() ,它仍然会随机失败。 您是否完全删除了全局会话变量(Base.metadata.create_all(engine) 之后的行?如果它仍然存在,请尝试删除它,因为您可能仍在代码中的某处使用全局会话。例如在哪里会话是否出现在您的 StockLive 类方法中?您需要在代码中的任何地方都执行相同的操作。抱歉,我最初没有发现。 如果您删除全局会话变量,您的代码编辑器将突出显示您需要进行的所有更改,因为不再存在全局会话。现在对全局会话的每个引用都将失败。

以上是关于Python SQLAlchemy+Threading 静默失败的主要内容,如果未能解决你的问题,请参考以下文章

Python flask-sqlalchemy初级解析

Python SQLAlchemy入门教程

python sqlalchemy

python flask orm sqlalchemy 实例

python flask orm sqlalchemy 实例

Python—sqlalchemy