基本的pyodbc批量插入
Posted
技术标签:
【中文标题】基本的pyodbc批量插入【英文标题】:basic pyodbc bulk insert 【发布时间】:2016-08-28 18:38:53 【问题描述】:在 python 脚本中,我需要在一个数据源上运行查询并将该查询中的每一行插入到不同数据源上的表中。我通常会使用带有 tsql 链接服务器连接的单个插入/选择语句来执行此操作,但我没有与此特定数据源的链接服务器连接。
我很难找到一个简单的 pyodbc 示例。这是我的做法,但我猜在循环中执行插入语句非常慢。
result = ds1Cursor.execute(selectSql)
for row in result:
insertSql = "insert into TableName (Col1, Col2, Col3) values (?, ?, ?)"
ds2Cursor.execute(insertSql, row[0], row[1], row[2])
ds2Cursor.commit()
有没有更好的批量方法来使用 pyodbc 插入记录?或者这是一种相对有效的方式来做到这一点。我正在使用 SqlServer 2012,以及最新的 pyodbc 和 python 版本。
【问题讨论】:
【参考方案1】:生成使用 execute_many() 所需的 SQL 的有用函数:
def generate_bulk_insert_sql(self, data:pd.DataFrame, table_name) -> str:
table_sql = str([c for c in data.columns]).replace("'","").replace("[", "").replace("]", "")
return f'INSERT INTO table_name (table_sql) VALUES (("?,"*len(data.columns))[:-1])
【讨论】:
【参考方案2】:您应该将executemany
与cursor.fast_executemany = True
一起使用,以提高性能。
pyodbc
的默认行为是运行多次插入,但这效率低下。通过应用fast_executemany
,您可以大幅提升性能。
这是一个例子:
connection = pyodbc.connect('DRIVER=ODBC Driver 17 for SQL Server',host='host', database='db', user='usr', password='foo')
cursor = connection.cursor()
# I'm the important line
cursor.fast_executemany = True
sql = "insert into TableName (Col1, Col2, Col3) values (?, ?, ?)"
tuples=[('foo','bar', 'ham'), ('hoo','far', 'bam')]
cursor.executemany(sql, tuples)
cursor.commit()
cursor.close()
connection.close()
Docs。 请注意,此功能自 4.0.19 Oct 23, 2017 以来一直可用
【讨论】:
【参考方案3】:自从 pymssql 库 (which seems to be under development again) 停产后,我们开始使用由 Zillow 的聪明人开发的 cTDS library,令我们惊讶的是,它支持 FreeTDS 批量插入。
顾名思义,cTDS 是在 FreeTDS 库之上用 C 语言编写的,这使得它非常快,非常快。恕我直言,这是批量插入 SQL Server 的最佳方式,因为 ODBC 驱动程序不支持批量插入,并且建议的 executemany
或 fast_executemany
并不是真正的批量插入操作。 BCP 工具和 T-SQL 批量插入有其局限性,因为它需要 SQL Server 可以访问文件,这在许多情况下可能会破坏交易。
下面是批量插入 CSV 文件的简单实现。请原谅我的任何错误,我在没有测试的情况下写了这个。
我不知道为什么,但对于使用 Latin1_General_CI_AS 的服务器,我需要使用 ctds.SqlVarChar 包装进入 NVarChar 列的数据。 I opened an issue about this but developers said the naming is correct,所以我更改了代码以保持心理健康。
import csv
import ctds
def _to_varchar(txt: str) -> ctds.VARCHAR:
"""
Wraps strings into ctds.NVARCHAR.
"""
if txt == "null":
return None
return ctds.SqlNVarChar(txt)
def _to_nvarchar(txt: str) -> ctds.VARCHAR:
"""
Wraps strings into ctds.VARCHAR.
"""
if txt == "null":
return None
return ctds.SqlVarChar(txt.encode("utf-16le"))
def read(file):
"""
Open CSV File.
Each line is a column:value dict.
https://docs.python.org/3/library/csv.html?highlight=csv#csv.DictReader
"""
with open(file, newline='') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
yield row
def transform(row):
"""
Do transformations to data before loading.
Data specified for bulk insertion into text columns (e.g. VARCHAR,
NVARCHAR, TEXT) is not encoded on the client in any way by FreeTDS.
Because of this behavior it is possible to insert textual data with
an invalid encoding and cause the column data to become corrupted.
To prevent this, it is recommended the caller explicitly wrap the
the object with either ctds.SqlVarChar (for CHAR, VARCHAR or TEXT
columns) or ctds.SqlNVarChar (for NCHAR, NVARCHAR or NTEXT columns).
For non-Unicode columns, the value should be first encoded to
column’s encoding (e.g. latin-1). By default ctds.SqlVarChar will
encode str objects to utf-8, which is likely incorrect for most SQL
Server configurations.
https://zillow.github.io/ctds/bulk_insert.html#text-columns
"""
row["col1"] = _to_datetime(row["col1"])
row["col2"] = _to_int(row["col2"])
row["col3"] = _to_nvarchar(row["col3"])
row["col4"] = _to_varchar(row["col4"])
return row
def load(rows):
stime = time.time()
with ctds.connect(**DBCONFIG) as conn:
with conn.cursor() as curs:
curs.execute("TRUNCATE TABLE MYSCHEMA.MYTABLE")
loaded_lines = conn.bulk_insert("MYSCHEMA.MYTABLE", map(transform, rows))
etime = time.time()
print(loaded_lines, " rows loaded in ", etime - stime)
if __name__ == "__main__":
load(read('data.csv'))
【讨论】:
如果有人让它工作,它几乎是最快的选择(至少对于 mssql),谢谢分享。【参考方案4】:这是一个可以批量插入 SQL Server 数据库的函数。
import pyodbc
import contextlib
def bulk_insert(table_name, file_path):
string = "BULK INSERT FROM '' (WITH FORMAT = 'CSV');"
with contextlib.closing(pyodbc.connect("MYCONN")) as conn:
with contextlib.closing(conn.cursor()) as cursor:
cursor.execute(string.format(table_name, file_path))
conn.commit()
这绝对有效。
更新:我在 cmets 以及定期编码时注意到,pyodbc 比 pypyodbc 得到更好的支持。
新更新:删除 conn.close(),因为 with 语句会自动处理。
【讨论】:
这是正确的答案,应该被接受。 executemany 方法不能替代批量插入的速度。值得注意的是,如果您想从迭代器而不是 SQL Server 本身上的文件执行批量插入,则可以选择 ctds 驱动程序。 pypi.python.org/pypi/ctds 刚刚查看了您提供的链接。我觉得它看起来真的很好。要试一试。谢谢。 "由于更新,最好使用pypyodbc而不是pyodbc。" - 这不再是真的。 pyodbc 仍在积极开发中,并得到 Microsoft 的官方支持。对于 pypyodbc,这些陈述都不正确。 感谢戈德指出这一点。我注意到自从我写这篇文章后,pyodbc 有了很大的改进。 这要求您的 SQL 实例在拉入此文件时有权访问它。最好通过对 SQL 实施批量复制将其推送到您的数据库。 github.com/Azure/azure-sqldb-spark【参考方案5】:处理此问题的最佳方法是使用 pyodbc 函数executemany
。
ds1Cursor.execute(selectSql)
result = ds1Cursor.fetchall()
ds2Cursor.executemany('INSERT INTO [TableName] (Col1, Col2, Col3) VALUES (?, ?, ?)', result)
ds2Cursor.commit()
【讨论】:
请注意,executemany 实际上并没有真正做大容量插入。在幕后,它仍然是 1 接 1 的插入。它确实是一个包装器,可以让数据以更 Python 的方式获取数据。这篇 SO 帖子提供了一个适当的 bulkinsert。 ***.com/questions/29638136/… 我得到以下错误,你能给我一个解决方案吗?***.com/questions/50597816/… pyodbc 4.0.19 及更高版本有一个fast_executemany
选项,可以大大加快速度。详情请见this answer。
我知道 Executemany 只会将吞吐量提高约 1.5 倍。有人可以确认吗?
有没有办法将字段/列名列表传递到 SQL 语句中?我正在尝试将所有数据从一个表加载到另一个表。源表有大约 60 个字段,我希望我可以通过编程方式获取源中的字段列表并在 SQL 语句中使用它,而不是在 SQL 语句中键入所有字段名称。以上是关于基本的pyodbc批量插入的主要内容,如果未能解决你的问题,请参考以下文章
使用 pyodbc 批量插入 SQL Server 表:找不到文件