使用 SqlAlchemy 和 cx_Oracle 将 Pandas DataFrame 写入 Oracle 数据库时加快 to_sql()

Posted

技术标签:

【中文标题】使用 SqlAlchemy 和 cx_Oracle 将 Pandas DataFrame 写入 Oracle 数据库时加快 to_sql()【英文标题】:Speed up to_sql() when writing Pandas DataFrame to Oracle database using SqlAlchemy and cx_Oracle 【发布时间】:2017-08-01 08:45:57 【问题描述】:

使用 pandas 数据框的 to_sql 方法,我可以很容易地将少量行写入 oracle 数据库中的表:

from sqlalchemy import create_engine
import cx_Oracle
dsn_tns = "(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=<host>)(PORT=1521))\
       (CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=<servicename>)))"
pwd = input('Please type in password:')
engine = create_engine('oracle+cx_oracle://myusername:' + pwd + '@%s' % dsn_tns)
df.to_sql('test_table', engine.connect(), if_exists='replace')

但是对于任何常规大小的数据帧(我的有 60k 行,不是那么大),代码变得无法使用,因为它在我愿意等待的时间内(肯定超过 10 分钟)从未完成。我用谷歌搜索了很多次,最接近的解决方案是ansonw 在this question 中给出的答案。但那是关于mysql,而不是oracle。正如Ziggy Eunicien 指出的那样,它不适用于oracle。有什么想法吗?

编辑

这是数据框中的行示例:

id          name            premium     created_date    init_p  term_number uprate  value   score   group   action_reason
160442353   LDP: Review     1295.619617 2014-01-20  1130.75     1           7       -42 236.328243  6       pass
164623435   TRU: Referral   453.224880  2014-05-20  0.00        11          NaN     -55 38.783290   1       suppress

这里是df的数据类型:

id               int64
name             object
premium          float64
created_date     object
init_p           float64
term_number      float64
uprate           float64
value            float64
score            float64
group            int64
action_reason    object

【问题讨论】:

你能提供一个可运行的测试用例吗?使用相应的 SQL 来创建和填充带有虚拟数据的表?您可能会遇到很多问题。 【参考方案1】:

Pandas + SQLAlchemy 默认将所有object(字符串)列保存为Oracle DB中的CLOB,这使得插入非常缓慢。

这里有一些测试:

import pandas as pd
import cx_Oracle
from sqlalchemy import types, create_engine

#######################################################
### DB connection strings config
#######################################################
tns = """
  (DESCRIPTION =
    (ADDRESS = (PROTOCOL = TCP)(HOST = my-db-scan)(PORT = 1521))
    (CONNECT_DATA =
      (SERVER = DEDICATED)
      (SERVICE_NAME = my_service_name)
    )
  )
"""

usr = "test"
pwd = "my_oracle_password"

engine = create_engine('oracle+cx_oracle://%s:%s@%s' % (usr, pwd, tns))

# sample DF [shape: `(2000, 11)`]
# i took your 2 rows DF and replicated it: `df = pd.concat([df]* 10**3, ignore_index=True)`
df = pd.read_csv('/path/to/file.csv')

DF 信息:

In [61]: df.shape
Out[61]: (2000, 11)

In [62]: df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2000 entries, 0 to 1999
Data columns (total 11 columns):
id               2000 non-null int64
name             2000 non-null object
premium          2000 non-null float64
created_date     2000 non-null datetime64[ns]
init_p           2000 non-null float64
term_number      2000 non-null int64
uprate           1000 non-null float64
value            2000 non-null int64
score            2000 non-null float64
group            2000 non-null int64
action_reason    2000 non-null object
dtypes: datetime64[ns](1), float64(4), int64(4), object(2)
memory usage: 172.0+ KB

让我们看看将它存储到 Oracle DB 需要多长时间:

In [57]: df.shape
Out[57]: (2000, 11)

In [58]: %timeit -n 1 -r 1 df.to_sql('test_table', engine, index=False, if_exists='replace')
1 loop, best of 1: 16 s per loop

在 Oracle DB 中(注意 CLOB):

AAA> desc test.test_table
 Name                            Null?    Type
 ------------------------------- -------- ------------------
 ID                                       NUMBER(19)
 NAME                                     CLOB        #  !!!
 PREMIUM                                  FLOAT(126)
 CREATED_DATE                             DATE
 INIT_P                                   FLOAT(126)
 TERM_NUMBER                              NUMBER(19)
 UPRATE                                   FLOAT(126)
 VALUE                                    NUMBER(19)
 SCORE                                    FLOAT(126)
 group                                    NUMBER(19)
 ACTION_REASON                            CLOB        #  !!!

现在让我们指示 pandas 将所有 object 列保存为 VARCHAR 数据类型:

In [59]: dtyp = c:types.VARCHAR(df[c].str.len().max())
    ...:         for c in df.columns[df.dtypes == 'object'].tolist()
    ...:

In [60]: %timeit -n 1 -r 1 df.to_sql('test_table', engine, index=False, if_exists='replace', dtype=dtyp)
1 loop, best of 1: 335 ms per loop

这次大约是。快 48 倍

签入 Oracle DB:

 AAA> desc test.test_table
 Name                          Null?    Type
 ----------------------------- -------- ---------------------
 ID                                     NUMBER(19)
 NAME                                   VARCHAR2(13 CHAR)        #  !!!
 PREMIUM                                FLOAT(126)
 CREATED_DATE                           DATE
 INIT_P                                 FLOAT(126)
 TERM_NUMBER                            NUMBER(19)
 UPRATE                                 FLOAT(126)
 VALUE                                  NUMBER(19)
 SCORE                                  FLOAT(126)
 group                                  NUMBER(19)
 ACTION_REASON                          VARCHAR2(8 CHAR)        #  !!!

让我们用 200.000 行 DF 测试它:

In [69]: df.shape
Out[69]: (200000, 11)

In [70]: %timeit -n 1 -r 1 df.to_sql('test_table', engine, index=False, if_exists='replace', dtype=dtyp, chunksize=10**4)
1 loop, best of 1: 4.68 s per loop

在我的测试(不是最快的)环境中,200K 行 DF 需要大约 5 秒。

结论:在将 DataFrames 保存到 Oracle DB 时,使用以下技巧为 object dtype 的所有 DF 列显式指定 dtype。否则会被保存为 CLOB 数据类型,需要特殊处理,速度很慢

dtyp = c:types.VARCHAR(df[c].str.len().max())
        for c in df.columns[df.dtypes == 'object'].tolist()

df.to_sql(..., dtype=dtyp)

【讨论】:

谢谢。在我的数据上,将 2 个“对象”列指定为 VARCHAR 类型后,执行时间从 30 多分钟变为几秒钟! @breezymri,很高兴我能帮上忙 :) @MaxU My 这个类型来自“类型”不是吗?我刚刚发现类型没有 VARCHAR, @MaxU 确保你调用了 - - from sqlalchemy import types @MaxU 我正在寻找这个确切的解决方案。我正在为这个解决方案添加书签,因为我的大多数 python 到 oracle 导出都有字符串数据类型,这些数据类型作为对象导入到 oracle 数据库中。 +1【参考方案2】:

您可以只使用 method='multi' 这将提高您的数据插入速度。

您也可以根据需要调整块大小,具体取决于您的数据。

我在尝试编写一个谷歌云函数时发现了这一点,该函数能够将数据从 csv 文件/excel 加载到数据框中,我想将该数据框保存到谷歌云 sql 中的 postgresql 数据库中。

如果您可以在数据框中创建与数据库表中类似的结构,那么这是一个方便使用的工具。

df.to_sql('table_name', con=engine, if_exists='append', index=False, chunksize=2000, method='multi')

【讨论】:

【参考方案3】:

只是在这里评论后代。我在 python 3.6.8、pandas 1.1.3、sqlalchemy 1.3.20 上。当我尝试从 MaxU 实现解决方案时,我最初遇到了一个错误:

raise ValueError(f"col (my_type) not a string")

老实说,我不知道为什么。在花了几个小时调试之后,这终于对我有用了。就我而言,我试图从 CSV 读取并插入到 Oracle:

import cx_Oracle
import numpy as np
import pandas as pd
import sqlalchemy as sa
from sqlalchemy import create_engine

conn = create_engine('oracle://:@'.format(USERNAME, PASSWORD, DATABASE))

df = pd.read_csv(...)
object_columns = [c for c in df.columns[df.dtypes == 'object'].tolist()]
dtyp = c:sa.types.VARCHAR(df[c].str.len().max()) for c in object_columns

df.to_sql(..., dtype=dtyp)

老实说,我并没有真正改变太多,所以不是 100% 确定我为什么会收到原始错误,但只是在这里发布以防万一。

【讨论】:

【参考方案4】:

使用此功能: 此函数将采用数据框名称和长度(可选)。 它将转换后的数据类型(对象类型)返回为 Varchar(length) 默认长度 =250(这个乐趣只处理对象类型)

def dtyp(df_name, length=250):
    cols = df_name.select_dtypes(include='object')
    dtyps = col: VARCHAR2(length) for col in cols
    return dtypsenter code here

调用方法示例:

config.dtyp(dataframe, 300)

【讨论】:

以上是关于使用 SqlAlchemy 和 cx_Oracle 将 Pandas DataFrame 写入 Oracle 数据库时加快 to_sql()的主要内容,如果未能解决你的问题,请参考以下文章

使用Python解决Cx_Oracle查询时UnicodeDecodeError的问题

使用 SqlAlchemy 执行返回 REF CURSOR 的 Oracle 存储过程

SQL Alchemy - 从 Oracle 迁移到 MySQL 的 Python 脚本

python+sqlalchemy 完成Oracle数据库读写操作

Python安装cx_oracle操作

使用 Python 和 Cx_Oracle 调用带有 XMLTYPE 输入和输出参数的 Oracle 存储过程