使用 SqlAlchemy 和 cx_Oracle 将 Pandas DataFrame 写入 Oracle 数据库时加快 to_sql()
Posted
技术标签:
【中文标题】使用 SqlAlchemy 和 cx_Oracle 将 Pandas DataFrame 写入 Oracle 数据库时加快 to_sql()【英文标题】:Speed up to_sql() when writing Pandas DataFrame to Oracle database using SqlAlchemy and cx_Oracle 【发布时间】:2017-08-01 08:45:57 【问题描述】:使用 pandas 数据框的 to_sql 方法,我可以很容易地将少量行写入 oracle 数据库中的表:
from sqlalchemy import create_engine
import cx_Oracle
dsn_tns = "(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=<host>)(PORT=1521))\
(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=<servicename>)))"
pwd = input('Please type in password:')
engine = create_engine('oracle+cx_oracle://myusername:' + pwd + '@%s' % dsn_tns)
df.to_sql('test_table', engine.connect(), if_exists='replace')
但是对于任何常规大小的数据帧(我的有 60k 行,不是那么大),代码变得无法使用,因为它在我愿意等待的时间内(肯定超过 10 分钟)从未完成。我用谷歌搜索了很多次,最接近的解决方案是ansonw 在this question 中给出的答案。但那是关于mysql,而不是oracle。正如Ziggy Eunicien 指出的那样,它不适用于oracle。有什么想法吗?
编辑
这是数据框中的行示例:
id name premium created_date init_p term_number uprate value score group action_reason
160442353 LDP: Review 1295.619617 2014-01-20 1130.75 1 7 -42 236.328243 6 pass
164623435 TRU: Referral 453.224880 2014-05-20 0.00 11 NaN -55 38.783290 1 suppress
这里是df的数据类型:
id int64
name object
premium float64
created_date object
init_p float64
term_number float64
uprate float64
value float64
score float64
group int64
action_reason object
【问题讨论】:
你能提供一个可运行的测试用例吗?使用相应的 SQL 来创建和填充带有虚拟数据的表?您可能会遇到很多问题。 【参考方案1】:Pandas + SQLAlchemy 默认将所有object
(字符串)列保存为Oracle DB中的CLOB,这使得插入非常缓慢。
这里有一些测试:
import pandas as pd
import cx_Oracle
from sqlalchemy import types, create_engine
#######################################################
### DB connection strings config
#######################################################
tns = """
(DESCRIPTION =
(ADDRESS = (PROTOCOL = TCP)(HOST = my-db-scan)(PORT = 1521))
(CONNECT_DATA =
(SERVER = DEDICATED)
(SERVICE_NAME = my_service_name)
)
)
"""
usr = "test"
pwd = "my_oracle_password"
engine = create_engine('oracle+cx_oracle://%s:%s@%s' % (usr, pwd, tns))
# sample DF [shape: `(2000, 11)`]
# i took your 2 rows DF and replicated it: `df = pd.concat([df]* 10**3, ignore_index=True)`
df = pd.read_csv('/path/to/file.csv')
DF 信息:
In [61]: df.shape
Out[61]: (2000, 11)
In [62]: df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2000 entries, 0 to 1999
Data columns (total 11 columns):
id 2000 non-null int64
name 2000 non-null object
premium 2000 non-null float64
created_date 2000 non-null datetime64[ns]
init_p 2000 non-null float64
term_number 2000 non-null int64
uprate 1000 non-null float64
value 2000 non-null int64
score 2000 non-null float64
group 2000 non-null int64
action_reason 2000 non-null object
dtypes: datetime64[ns](1), float64(4), int64(4), object(2)
memory usage: 172.0+ KB
让我们看看将它存储到 Oracle DB 需要多长时间:
In [57]: df.shape
Out[57]: (2000, 11)
In [58]: %timeit -n 1 -r 1 df.to_sql('test_table', engine, index=False, if_exists='replace')
1 loop, best of 1: 16 s per loop
在 Oracle DB 中(注意 CLOB):
AAA> desc test.test_table
Name Null? Type
------------------------------- -------- ------------------
ID NUMBER(19)
NAME CLOB # !!!
PREMIUM FLOAT(126)
CREATED_DATE DATE
INIT_P FLOAT(126)
TERM_NUMBER NUMBER(19)
UPRATE FLOAT(126)
VALUE NUMBER(19)
SCORE FLOAT(126)
group NUMBER(19)
ACTION_REASON CLOB # !!!
现在让我们指示 pandas 将所有 object
列保存为 VARCHAR 数据类型:
In [59]: dtyp = c:types.VARCHAR(df[c].str.len().max())
...: for c in df.columns[df.dtypes == 'object'].tolist()
...:
In [60]: %timeit -n 1 -r 1 df.to_sql('test_table', engine, index=False, if_exists='replace', dtype=dtyp)
1 loop, best of 1: 335 ms per loop
这次大约是。快 48 倍
签入 Oracle DB:
AAA> desc test.test_table
Name Null? Type
----------------------------- -------- ---------------------
ID NUMBER(19)
NAME VARCHAR2(13 CHAR) # !!!
PREMIUM FLOAT(126)
CREATED_DATE DATE
INIT_P FLOAT(126)
TERM_NUMBER NUMBER(19)
UPRATE FLOAT(126)
VALUE NUMBER(19)
SCORE FLOAT(126)
group NUMBER(19)
ACTION_REASON VARCHAR2(8 CHAR) # !!!
让我们用 200.000 行 DF 测试它:
In [69]: df.shape
Out[69]: (200000, 11)
In [70]: %timeit -n 1 -r 1 df.to_sql('test_table', engine, index=False, if_exists='replace', dtype=dtyp, chunksize=10**4)
1 loop, best of 1: 4.68 s per loop
在我的测试(不是最快的)环境中,200K 行 DF 需要大约 5 秒。
结论:在将 DataFrames 保存到 Oracle DB 时,使用以下技巧为 object
dtype 的所有 DF 列显式指定 dtype
。否则会被保存为 CLOB 数据类型,需要特殊处理,速度很慢
dtyp = c:types.VARCHAR(df[c].str.len().max())
for c in df.columns[df.dtypes == 'object'].tolist()
df.to_sql(..., dtype=dtyp)
【讨论】:
谢谢。在我的数据上,将 2 个“对象”列指定为 VARCHAR 类型后,执行时间从 30 多分钟变为几秒钟! @breezymri,很高兴我能帮上忙 :) @MaxU My 这个类型来自“类型”不是吗?我刚刚发现类型没有 VARCHAR, @MaxU 确保你调用了 - - from sqlalchemy import types @MaxU 我正在寻找这个确切的解决方案。我正在为这个解决方案添加书签,因为我的大多数 python 到 oracle 导出都有字符串数据类型,这些数据类型作为对象导入到 oracle 数据库中。 +1【参考方案2】:您可以只使用 method='multi' 这将提高您的数据插入速度。
您也可以根据需要调整块大小,具体取决于您的数据。
我在尝试编写一个谷歌云函数时发现了这一点,该函数能够将数据从 csv 文件/excel 加载到数据框中,我想将该数据框保存到谷歌云 sql 中的 postgresql 数据库中。
如果您可以在数据框中创建与数据库表中类似的结构,那么这是一个方便使用的工具。
df.to_sql('table_name', con=engine, if_exists='append', index=False, chunksize=2000, method='multi')
【讨论】:
【参考方案3】:只是在这里评论后代。我在 python 3.6.8、pandas 1.1.3、sqlalchemy 1.3.20 上。当我尝试从 MaxU 实现解决方案时,我最初遇到了一个错误:
raise ValueError(f"col (my_type) not a string")
老实说,我不知道为什么。在花了几个小时调试之后,这终于对我有用了。就我而言,我试图从 CSV 读取并插入到 Oracle:
import cx_Oracle
import numpy as np
import pandas as pd
import sqlalchemy as sa
from sqlalchemy import create_engine
conn = create_engine('oracle://:@'.format(USERNAME, PASSWORD, DATABASE))
df = pd.read_csv(...)
object_columns = [c for c in df.columns[df.dtypes == 'object'].tolist()]
dtyp = c:sa.types.VARCHAR(df[c].str.len().max()) for c in object_columns
df.to_sql(..., dtype=dtyp)
老实说,我并没有真正改变太多,所以不是 100% 确定我为什么会收到原始错误,但只是在这里发布以防万一。
【讨论】:
【参考方案4】:使用此功能: 此函数将采用数据框名称和长度(可选)。 它将转换后的数据类型(对象类型)返回为 Varchar(length) 默认长度 =250(这个乐趣只处理对象类型)
def dtyp(df_name, length=250):
cols = df_name.select_dtypes(include='object')
dtyps = col: VARCHAR2(length) for col in cols
return dtypsenter code here
调用方法示例:
config.dtyp(dataframe, 300)
【讨论】:
以上是关于使用 SqlAlchemy 和 cx_Oracle 将 Pandas DataFrame 写入 Oracle 数据库时加快 to_sql()的主要内容,如果未能解决你的问题,请参考以下文章
使用Python解决Cx_Oracle查询时UnicodeDecodeError的问题
使用 SqlAlchemy 执行返回 REF CURSOR 的 Oracle 存储过程
SQL Alchemy - 从 Oracle 迁移到 MySQL 的 Python 脚本