将大型 Pandas 数据帧写入 SQL Server 数据库

Posted

技术标签:

【中文标题】将大型 Pandas 数据帧写入 SQL Server 数据库【英文标题】:Write Large Pandas DataFrames to SQL Server database 【发布时间】:2016-02-22 08:44:50 【问题描述】:

我有 74 个相对较大的 Pandas DataFrame(大约 34,600 行和 8 列),我正试图尽快将它们插入 SQL Server 数据库。在做了一些研究之后,我了解到好的 olepandas.to_sql 函数不适用于 SQL Server 数据库中的如此大的插入,这是我最初采用的方法(非常慢 - 应用程序完成几乎一个小时 vs 大约一个小时)使用 mysql 数据库时需要 4 分钟。)

This article 和许多其他 *** 帖子有助于为我指明正确的方向,但我遇到了障碍:

出于上面链接中解释的原因,我正在尝试使用 SQLAlchemy 的核心而不是 ORM。所以,我将数据框转换为字典,使用pandas.to_dict,然后执行execute()insert()

self._session_factory.engine.execute(
    TimeSeriesResultValues.__table__.insert(),
    data)
# 'data' is a list of dictionaries.

问题是插入没有得到任何值——它们显示为一堆空括号,我得到这个错误:

(pyodbc.IntegretyError) ('23000', "[23000] [FreeTDS][SQL Server]Cannot
insert the value NULL into the column...

我传入的字典列表中有一些值,所以我不知道为什么这些值没有显示出来。

编辑:

下面是我要讲的例子:

def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        ["name": 'NAME ' + str(i) for i in range(n)]
    )
    print("SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")

【问题讨论】:

使用 mysql 数据库大约需要 4 分钟 ...所以to_sql() 是一个可行的解决方案,只是 MSSQL 中的连接比 MySQL 慢吗?您使用的是哪个 ODBC API?数据库服务器是本地的还是远程的?考虑一个临时表导入,然后迁移到最终表。 @Parfait:使用to_sql() 可以在 MySQL 中产生可接受的性能,但在 MSSQL 中不行。我正在使用pyodbc。数据库是远程的,因此写入 CSV 文件然后通过原始 sql 代码进行批量插入在这种情况下也不会真正起作用。此外,用户需要批量管理权限才能执行此操作,而对于此应用程序的用户而言,这可能并不总是可行的。 考虑绕过 odbc 驱动程序并使用严格的 Python API - pmyssl 和 MySQL ODBC API? pymysql?两者的表结构和数据类型相同吗?相同数量的记录?真的调查这个。两者都是高级企业 RDMS,不应该执行那么宽的范围(4 分钟对 ~60 分钟)。 【参考方案1】:

我要告诉你一些不幸的消息,SQLAlchemy 实际上并没有为 SQL Server 实现批量导入,它实际上只是执行 to_sql 正在执行的相同缓慢的单个 INSERT 语句。我想说你最好的选择是尝试使用bcp 命令行工具编写脚本。这是我过去使用过的脚本,但不能保证:

from subprocess import check_output, call
import pandas as pd
import numpy as np
import os

pad = 0.1
tablename = 'sandbox.max.pybcp_test'
overwrite=True
raise_exception = True
server = 'P01'
trusted_connection= True
username=None
password=None
delimiter='|'
df = pd.read_csv('D:/inputdata.csv', encoding='latin', error_bad_lines=False)



def get_column_def_sql(col):
   if col.dtype == object:
      width = col.str.len().max() * (1+pad)
      return '[] varchar()'.format(col.name, int(width)) 
   elif np.issubdtype(col.dtype, float):
      return'[] float'.format(col.name) 
   elif np.issubdtype(col.dtype, int):
      return '[] int'.format(col.name) 
   else:
      if raise_exception:
         raise NotImplementedError('data type  not implemented'.format(col.dtype))
      else:
         print('Warning: cast column  as varchar; data type  not implemented'.format(col, col.dtype))
         width = col.str.len().max() * (1+pad)
         return '[] varchar()'.format(col.name, int(width)) 

def create_table(df, tablename, server, trusted_connection, username, password, pad):         
    if trusted_connection:
       login_string = '-E'
    else:
       login_string = '-U  -P '.format(username, password)

    col_defs = []
    for col in df:
       col_defs += [get_column_def_sql(df[col])]

    query_string = 'CREATE TABLE \n()\nGO\nQUIT'.format(tablename, ',\n'.join(col_defs))       
    if overwrite == True:
       query_string = "IF OBJECT_ID('', 'U') IS NOT NULL DROP TABLE ;".format(tablename, tablename) + query_string


    query_file = 'c:\\pybcp_tempqueryfile.sql'
    with open (query_file,'w') as f:
       f.write(query_string)

    if trusted_connection:
       login_string = '-E'
    else:
       login_string = '-U  -P '.format(username, password)

    o = call('sqlcmd -S   -i '.format(server, login_string, query_file), shell=True)
    if o != 0:
       raise BaseException("Failed to create table")
   # o = call('del '.format(query_file), shell=True)


def call_bcp(df, tablename):   
    if trusted_connection:
       login_string = '-T'
    else:
       login_string = '-U  -P '.format(username, password)
    temp_file = 'c:\\pybcp_tempqueryfile.csv'

    #remove the delimiter and change the encoding of the data frame to latin so sql server can read it
    df.loc[:,df.dtypes == object] = df.loc[:,df.dtypes == object].apply(lambda col: col.str.replace(delimiter,'').str.encode('latin'))
    df.to_csv(temp_file, index = False, sep = '|', errors='ignore')
    o = call('bcp sandbox.max.pybcp_test2 in c:\pybcp_tempqueryfile.csv -S "localhost" -T -t^| -r\n -c')

【讨论】:

【参考方案2】:

这刚刚更新为 SQLAchemy 版本:1.3.0,以防万一其他人需要知道。应该使您的 dataframe.to_sql 语句更快。

https://docs.sqlalchemy.org/en/latest/changelog/migration_13.html#support-for-pyodbc-fast-executemany

engine = create_engine( "mssql+pyodbc://scott:tiger@mssql2017:1433/test?driver=ODBC+Driver+13+for+SQL+Server", fast_executemany=True)

【讨论】:

fast_executemany=True 在我的情况下并没有真正的帮助。与其他 RDBM 相比,写入速度非常慢。 这适用于我在 Azure 上使用 df.to_sql() 和 SQL Server DB。速度提高了 >10 倍

以上是关于将大型 Pandas 数据帧写入 SQL Server 数据库的主要内容,如果未能解决你的问题,请参考以下文章

“未指定驱动程序名称”将 pandas 数据帧写入 SQL Server 表

将数据从 python pandas 数据帧导出或写入 MS Access 表

AWS:从 Pandas 数据帧写入 DynamoDB

使用 Pandas 在 Python 中处理大型 SQL 查询?

将pandas DataFrame写入sql时出现无效列名错误

Pandas - 将大型数据框切成块