Pandas to_sql 在重复主键上失败
Posted
技术标签:
【中文标题】Pandas to_sql 在重复主键上失败【英文标题】:Pandas to_sql fails on duplicate primary key 【发布时间】:2015-07-31 23:25:17 【问题描述】:我想使用 pandas df.to_sql()
函数附加到现有表。
我设置了if_exists='append'
,但我的表有主键。
在尝试对现有表使用append
时,我想做与insert ignore
等效的操作,这样我就可以避免重复输入错误。
熊猫可以做到这一点,还是我需要写一个明确的查询?
【问题讨论】:
Appending Pandas dataframe to sqlite table by primary key的可能重复 【参考方案1】:很遗憾,没有指定“INSERT IGNORE”的选项。这就是我绕过该限制的方法,将不重复的行插入到该数据库中(数据框名称为 df)
for i in range(len(df)):
try:
df.iloc[i:i+1].to_sql(name="Table_Name",if_exists='append',con = Engine)
except IntegrityError:
pass #or any other action
【讨论】:
别忘了添加if_exists='append'
作为参数
这解决了问题,...但它减慢了查询 VEEEEEERY MUCH
对于那些使用 sqlalchemy 的人来说,这对我有用:添加此导入:from sqlalchemy import exc
并将异常更改为:except exc.IntegrityError as e:
。就像@miro 说的那样,它确实减慢了这个过程。
如果表中有 created_at
和 updated_at
等列是自动填充的。这种方法行不通!【参考方案2】:
请注意"if_exists='append'"
与表的存在有关,如果表 不存在该怎么办。
if_exists 与表的内容无关。
在此处查看文档:https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_sql.html
if_exists : ‘fail’, ‘replace’, ‘append’,默认‘fail’ 失败:如果表存在,什么也不做。 replace:如果表存在,则删除它,重新创建它,然后插入数据。 append:如果表存在,则插入数据。如果不存在则创建。
【讨论】:
【参考方案3】:Pandas 目前没有选择,但这里是the Github issue。如果您也需要此功能,请为它投票。
【讨论】:
同时还有 pangres pypi.org/project/pangres【参考方案4】:您可以使用to_sql
的method
参数来做到这一点:
from sqlalchemy.dialects.mysql import insert
def insert_on_duplicate(table, conn, keys, data_iter):
insert_stmt = insert(table.table).values(list(data_iter))
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(insert_stmt.inserted)
conn.execute(on_duplicate_key_stmt)
df.to_sql('trades', dbConnection, if_exists='append', chunksize=4096, method=insert_on_duplicate)
对于旧版本的 sqlalchemy,您需要将 dict
传递给 on_duplicate_key_update
。即on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(dict(insert_stmt.inserted))
【讨论】:
得到一个错误 raise ValueError("update parameter must be a non-empty dictionary") ValueError: update parameter must be a non-empty dictionary @HuyTran 我不确定你为什么会得到那个。 db 表是否已经存在?您的数据框的列是否与表的列匹配? @HuyTran 你用的是什么版本的熊猫? 嗨@Jayden,panda=v1.2.1,sqlalchmy=1.3.22 我发现错误是panda table.table 并插入方言。似乎 ValueError 指的是 insert() 需要一个表对象而不是字符串。 @HuyTran 如果你有一些不同的代码,你能编辑我的答案来澄清吗?我最近在 sqlalchemy 1.3.22 上试过这个,但那个版本的on_duplicate_key_update
不接受 ColumnCollection
,我必须创建一个 dict
。【参考方案5】:
Pandas 不支持编辑 .to_sql 方法的实际 SQL 语法,因此您可能不走运。有一些实验性的编程解决方法(例如,使用CALCHIPAN
将 Dataframe 读取到 SQLAlchemy 对象并使用 SQLAlchemy 进行事务),但最好将 DataFrame 写入 CSV 并使用显式 MySQL 函数加载它。
CALCHIPAN 回购:https://bitbucket.org/zzzeek/calchipan/
【讨论】:
pandas.pydata.org/pandas-docs/stable/whatsnew/…pandas.DataFrame.to_sql() 获得了控制SQL插入子句的方法参数。请参阅文档中的插入方法部分。 (GH8953)【参考方案6】:我在仍然收到 IntegrityError 时遇到了麻烦
...奇怪,但我只是把上面的内容倒过来:
for i, row in df.iterrows():
sql = "SELECT * FROM `Table_Name` WHERE `key` = ''".format(row.Key)
found = pd.read_sql(sql, con=Engine)
if len(found) == 0:
df.iloc[i:i+1].to_sql(name="Table_Name",if_exists='append',con = Engine)
【讨论】:
【参考方案7】:在我的例子中,我试图在一个空表中插入新数据,但是有些行是重复的,这里几乎是同样的问题,我“可能”考虑获取现有数据并与我得到的新数据合并并继续进行,但这不是最优的,可能只适用于小数据,而不是大表。
由于 pandas 目前没有为这种情况提供任何处理,我一直在寻找合适的解决方法,所以我自己做了,不确定这是否适合你,但我决定控制我的数据首先是我的数据,而不是等待它是否有效,所以我所做的是在调用.to_sql
之前删除重复项,所以如果发生任何错误,我会更多地了解我的数据并确保我知道发生了什么:
import pandas as pd
def write_to_table(table_name, data):
df = pd.DataFrame(data)
# Sort by price, so we remove the duplicates after keeping the lowest only
data.sort(key=lambda row: row['price'])
df.drop_duplicates(subset=['id_key'], keep='first', inplace=True)
#
df.to_sql(table_name, engine, index=False, if_exists='append', schema='public')
所以在我的情况下,我想保持行的最低价格(顺便说一句,我为 data
传递了一个 dict
数组),为此,我先进行了排序,没有必要,但这是一个示例我的意思是控制我想要保留的数据。
我希望这会对与我的情况几乎相同的人有所帮助。
【讨论】:
【参考方案8】:上面的 for 循环方法显着减慢了速度。您可以将一个方法参数传递给 panda.to_sql 以帮助实现 sql 查询的自定义
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html#pandas.DataFrame.to_sql
下面的代码应该适用于 postgres,如果与主键“unique_code”发生冲突,则什么也不做。更改数据库的插入方言。
def insert_do_nothing_on_conflicts(sqltable, conn, keys, data_iter):
"""
Execute SQL statement inserting data
Parameters
----------
sqltable : pandas.io.sql.SQLTable
conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
keys : list of str
Column names
data_iter : Iterable that iterates the values to be inserted
"""
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import table, column
columns=[]
for c in keys:
columns.append(column(c))
if sqltable.schema:
table_name = '.'.format(sqltable.schema, sqltable.name)
else:
table_name = sqltable.name
mytable = table(table_name, *columns)
insert_stmt = insert(mytable).values(list(data_iter))
do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=['unique_code'])
conn.execute(do_nothing_stmt)
pd.to_sql('mytable', con=sql_engine, method=insert_do_nothing_on_conflicts)
【讨论】:
以上是关于Pandas to_sql 在重复主键上失败的主要内容,如果未能解决你的问题,请参考以下文章
在DataPhin基于PySpark实现主键重复就自动失败以提高运维的半自动化水平