Python SQLAlchemy+Threading 静默失败
Posted
技术标签:
【中文标题】Python SQLAlchemy+Threading 静默失败【英文标题】:Python SQLAlchemy+Threading failing silently 【发布时间】:2019-07-22 12:23:33 【问题描述】:我正在尝试构建一个服务器来采样流式价格馈送并使用 SQLAlchemy 更新 postgres 数据库。我正在使用映射类的线程实例,这似乎可以工作但不稳定。
Stream 类的 1 或 2 个实例没有问题,但如果是 10 个,线程会随机且静默地失败。每次在它失败之前,SQLAlchemy 都会给出一条错误消息,所以看起来这就是杀死线程的原因。流没有问题,它总是稳定的。
我是否错过了 SQLAlchemy 设置中的某些内容?有没有更好的方法将多个实时订阅馈送到 SQL 中?
代码:
import time
import json
from threading import Thread, Lock
import sqlalchemy as db
from sqlalchemy.orm import scoped_session, sessionmaker, relationship
from sqlalchemy.ext.declarative import declarative_base
# Setup SQLAlchemy
engine = db.create_engine('postgresql://localhost:5432/Project', echo=False)
metadata = db.MetaData(bind=engine)
Session = scoped_session(sessionmaker(bind=engine))
Base = declarative_base()
Base.metadata.create_all(engine)
session = Session()
#DB classes
#static data table
class StockMaster(Base):
__tablename__ = 'stock_master'
id = db.Column(db.Integer, primary_key=True)
ticker = db.Column(db.String)
stock_name = db.Column(db.String)
@classmethod
def find_by_ticker(cls,ticker):
return session.query(StockMaster).filter(StockMaster.ticker==ticker).first()
#live data table
class StockLive(Base):
__tablename__ = 'stock_live'
id = db.Column(db.Integer, primary_key=True)
quote = db.Column(db.Numeric)
timestamp = db.Column(db.Numeric)
ticker_id = db.Column(db.Integer, db.ForeignKey('stock_master.id'))
ticker = relationship("StockMaster", foreign_keys=[ticker_id])
def __init__(self, quote, ticker_id, timestamp):
self.quote=quote
self.ticker_id=ticker_id
self.timestamp=timestamp
def save_to_db(self):
session.add(self)
session.commit()
@classmethod
def find_by_ticker_id(cls,ticker_id):
return session.query(StockLive).filter(StockLive.ticker_id==ticker_id).first()
@classmethod
def find_by_ticker(cls,ticker):
ticker_id = StockMaster.find_by_ticker(ticker).id
return session.query(StockLive).filter(StockLive.ticker_id==ticker_id).first()
class Stream(Thread):
def __init__(self,ticker):
Thread.__init__(self)
self.ticker=ticker
self.quote=1
self.data_set = StockLive.find_by_ticker(self.ticker)
self.count=0
def run(self):
con.subscribe(self.ticker)
current_mid=1
while True:
new_data = json.loads(con.get_price(self.ticker).to_json())
new_mid = new_data['Mid']
if new_mid == current_mid:
pass
else:
current_mid = new_mid
self.data_set.quote = current_mid
self.data_set.timestamp = time.time()
try:
self.data_set.save_to_db()
self.count+=1
except:
self.data_set = StockLive.find_by_ticker(self.ticker)
print('error saving to db for '+self.ticker)
time.sleep(.1)
if __name__ == '__main__':
threads=
for ticker in tickerlist:
try:
threads[ticker]=Stream(ticker)
threads[ticker].setName('Thread ' + ticker)
threads[ticker].start()
except:
print('Error setting up '+ticker)
while True:
for ticker in tickerlist:
if threads[ticker].isAlive()==False:
threads[ticker]=Stream(ticker)
SQLAlchemy 错误信息:
/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py:2323: SAWarning:当前未使用“Session.add()”操作 在刷新过程的执行阶段支持。结果可能 不一致。考虑使用替代事件侦听器或 而是连接级操作。 % 方法) /anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py:2425: SAWarning: 属性历史事件累积在 1 个以前的干净 内部刷新事件处理程序中的实例已被重置,并将 不会导致数据库更新。考虑使用 set_committed_value() 在内部刷新事件处理程序中以避免此警告。 % 长度) 线程 Thread MSFT 中的异常:回溯(最后一次调用): 文件 “/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2436 行,在 _flush transaction.commit() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 465 行,提交中 self._assert_active(prepared_ok=True) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 285 行,在 _assert_active raise sa_exc.ResourceClosedError(closed_msg) sqlalchemy.exc.ResourceClosedError: This transaction is closed
在处理上述异常的过程中,又发生了一个异常:
Traceback(最近一次调用最后一次):文件 “”,第 48 行,运行中 self.data_set.save_to_db() 文件“”,第 44 行,在 save_to_db session.commit() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 954 行,提交中 self.transaction.commit() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 467 行,提交中 self._prepare_impl() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 447 行,在 _prepare_impl self.session.flush() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2313 行,齐平 self._flush(objects) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2440 行,在 _flush transaction.rollback(_capture_exception=True) 文件 "/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", 第 76 行,在 退出 compat.reraise(type_, value, traceback) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/util/compat.py”, 第 249 行,在再加注中 提高价值文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2440 行,在 _flush transaction.rollback(_capture_exception=True) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 483 行,在回滚中 self._assert_active(prepared_ok=True, rollback_ok=True) 文件 "/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py", 第 285 行,在 _assert_active raise sa_exc.ResourceClosedError(closed_msg) sqlalchemy.exc.ResourceClosedError: This transaction is closed
在处理上述异常的过程中,又发生了一个异常:
Traceback(最近一次调用最后一次):文件 “/anaconda3/lib/python3.7/threading.py”,第 917 行,在 _bootstrap_inner self.run() 文件“”,第 53 行,运行中 self.data_set = StockLive.find_by_ticker(self.ccy) 文件“”,第 52 行,在 find_by_ticker ticker_id = StockMaster.find_by_ticker(ticker).id 文件“”,第 23 行,在 find_by_ticker 返回 session.query(StockMaster).filter(StockMaster.ticker==ticker).first() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/query.py”, 第 2895 行,首先 ret = list(self[0:1]) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/query.py”,行 2687,在 getitem 中 返回列表(res)文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/query.py”,行 2994,在 iter self.session._autoflush() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 1493 行,在 _autoflush 中 self.flush() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2313 行,齐平 self._flush(objects) 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 2400 行,在 _flush subtransactions=True)文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 865 行,开始 嵌套=嵌套)文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 297 行,在 _begin self._assert_active() 文件“/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”, 第 264 行,在 _assert_active “此会话处于‘准备好的’状态;没有进一步” sqlalchemy.exc.InvalidRequestError:此会话处于‘准备好的’状态 状态;此事务中不能再发出 SQL。
【问题讨论】:
我认为您需要更改创建引擎中的pool
设置。默认情况下它是 5(你的代码吱吱作响超过 5 个线程)所以将其更改为你的需要。
【参考方案1】:
您的代码可能还有其他问题,但一个明显的问题是您与多个线程共享您的session
。
应该在每个线程中创建这些,而不是设置全局session
。
我无法运行您的代码,但您可以尝试以下方法:
完全删除全局会话变量。你不需要它。然后修改您的方法和线程以包含本地会话:
@classmethod
def find_by_ticker(cls,ticker, session):
return session.query(StockMaster).filter(StockMaster.ticker==ticker).first()
...
class Stream(Thread):
def __init__(self,ticker):
Thread.__init__(self)
self.ticker=ticker
self.quote=1
self.session = Session()
self.data_set = StockLive.find_by_ticker(self.ticker, self.session)
self.count=0
或者类似的东西。现在这将使每个线程都有自己的会话,并且您的代码将开始表现得更好。
【讨论】:
感谢您的回复,这似乎解决了一个问题,但又引发了另一个问题。线程看起来更稳定,但 session.flush() 进程现在显示不稳定。错误是“SAWarning:在刷新过程的执行阶段当前不支持使用 'Session.add()' 操作。结果可能不一致。请考虑使用替代事件侦听器或连接级操作。”如果我自己尝试 session.flush() ,它仍然会随机失败。 您是否完全删除了全局会话变量(Base.metadata.create_all(engine)
之后的行?如果它仍然存在,请尝试删除它,因为您可能仍在代码中的某处使用全局会话。例如在哪里会话是否出现在您的 StockLive
类方法中?您需要在代码中的任何地方都执行相同的操作。抱歉,我最初没有发现。
如果您删除全局会话变量,您的代码编辑器将突出显示您需要进行的所有更改,因为不再存在全局会话。现在对全局会话的每个引用都将失败。以上是关于Python SQLAlchemy+Threading 静默失败的主要内容,如果未能解决你的问题,请参考以下文章
python flask orm sqlalchemy 实例