并行化 pandas pyodbc SQL 数据库调用
Posted
技术标签:
【中文标题】并行化 pandas pyodbc SQL 数据库调用【英文标题】:Parallelizing pandas pyodbc SQL database calls 【发布时间】:2015-11-15 03:50:15 【问题描述】:我目前正在通过pandas.io.sql.read_sql()
命令将数据查询到数据框中。我想并行化类似于这些人所倡导的调用:(Embarrassingly parallel database calls with Python (PyData Paris 2015 ))
类似(非常笼统):
pools = [ThreadedConnectionPool(1,20,dsn=d) for d in dsns]
connections = [pool.getconn() for pool in pools]
parallel_connection = ParallelConnection(connections)
pandas_cursor = parallel_connection.cursor()
pandas_cursor.execute(my_query)
这样的事情可能吗?
【问题讨论】:
你的SQL数据库类型和驱动是什么,是否支持多线程调用? 使用MS sql server,确实支持多线程调用 不确定 pyodbc,但自 2013 年以来,pymssql 似乎对多线程来说是线程安全的:pymssql.org/en/latest/changelog.html?highlight=threading 【参考方案1】:是的,这应该可行,但需要注意的是,您需要在您的网站上更改 parallel_connection.py。在该代码中有一个fetchall
函数并行执行每个游标,然后组合结果。这是您将要改变的核心:
旧代码:
def fetchall(self):
results = [None] * len(self.cursors)
def do_work(index, cursor):
results[index] = cursor.fetchall()
self._do_parallel(do_work)
return list(chain(*[rs for rs in results]))
新代码:
def fetchall(self):
results = [None] * len(self.sql_connections)
def do_work(index, sql_connection):
sql, conn = sql_connection # Store tuple of sql/conn instead of cursor
results[index] = pd.read_sql(sql, conn)
self._do_parallel(do_work)
return pd.DataFrame().append([rs for rs in results])
回购:https://github.com/godatadriven/ParallelConnection
【讨论】:
是否可以展示一个示例,说明您如何实际传递查询sql, conn = sql_connection
基本上我们需要传递 sql 和连接的元组?。
已经好几年了,所以我不完全记得上下文 - 但从链接代码中看起来,您会将 (sql, conn)
元组数组传递给 ParallelConnection 的构造函数。类似ParallelConnection([(sql1, con1), (sql2, con2)])
所以在传递查询字符串时不需要调用execute()
?就像在问题中一样......
在上面的例子中,我使用了fetchall
而不是execute
,但是你可以用execute
做同样的事情。使用元组数组初始化 ParallelConnection 后,调用 execute
或 fetchall
和 _do_parallel
函数处理将工作传递给各个连接/查询。以上是关于并行化 pandas pyodbc SQL 数据库调用的主要内容,如果未能解决你的问题,请参考以下文章
使用 pyODBC 的 fast_executemany 加速 pandas.DataFrame.to_sql
我可以将 pyodbc executemany 与 sql 存储过程一起使用吗?