Pandas to_sql 在重复主键上失败

Posted

技术标签:

【中文标题】Pandas to_sql 在重复主键上失败【英文标题】:Pandas to_sql fails on duplicate primary key 【发布时间】:2015-07-31 23:25:17 【问题描述】:

我想使用 pandas df.to_sql() 函数附加到现有表。

我设置了if_exists='append',但我的表有主键。

在尝试对现有表使用append 时,我想做与insert ignore 等效的操作,这样我就可以避免重复输入错误。

熊猫可以做到这一点,还是我需要写一个明确的查询?

【问题讨论】:

Appending Pandas dataframe to sqlite table by primary key的可能重复 【参考方案1】:

很遗憾,没有指定“INSERT IGNORE”的选项。这就是我绕过该限制的方法,将不重复的行插入到该数据库中(数据框名称为 df)

for i in range(len(df)):
    try:
        df.iloc[i:i+1].to_sql(name="Table_Name",if_exists='append',con = Engine)
    except IntegrityError:
        pass #or any other action

【讨论】:

别忘了添加if_exists='append'作为参数 这解决了问题,...但它减慢了查询 VEEEEEERY MUCH 对于那些使用 sqlalchemy 的人来说,这对我有用:添加此导入:from sqlalchemy import exc 并将异常更改为:except exc.IntegrityError as e:。就像@miro 说的那样,它确实减慢了这个过程。 如果表中有 created_atupdated_at 等列是自动填充的。这种方法行不通!【参考方案2】:

请注意"if_exists='append'" 与表的存在有关,如果 不存在该怎么办。 if_exists 与表的内容无关。 在此处查看文档:https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_sql.html

if_exists : ‘fail’, ‘replace’, ‘append’,默认‘fail’ 失败:如果表存在,什么也不做。 replace:如果表存在,则删除它,重新创建它,然后插入数据。 append:如果表存在,则插入数据。如果不存在则创建。

【讨论】:

【参考方案3】:

Pandas 目前没有选择,但这里是the Github issue。如果您也需要此功能,请为它投票。

【讨论】:

同时还有 pangres pypi.org/project/pangres【参考方案4】:

您可以使用to_sqlmethod 参数来做到这一点:

from sqlalchemy.dialects.mysql import insert

def insert_on_duplicate(table, conn, keys, data_iter):
    insert_stmt = insert(table.table).values(list(data_iter))
    on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(insert_stmt.inserted)
    conn.execute(on_duplicate_key_stmt)

df.to_sql('trades', dbConnection, if_exists='append', chunksize=4096, method=insert_on_duplicate)

对于旧版本的 sqlalchemy,您需要将 dict 传递给 on_duplicate_key_update。即on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(dict(insert_stmt.inserted))

【讨论】:

得到一个错误 raise ValueError("update parameter must be a non-empty dictionary") ValueError: update parameter must be a non-empty dictionary @HuyTran 我不确定你为什么会得到那个。 db 表是否已经存在?您的数据框的列是否与表的列匹配? @HuyTran 你用的是什么版本的熊猫? 嗨@Jayden,panda=v1.2.1,sqlalchmy=1.3.22 我发现错误是panda table.table 并插入方言。似乎 ValueError 指的是 insert() 需要一个表对象而不是字符串。 @HuyTran 如果你有一些不同的代码,你能编辑我的答案来澄清吗?我最近在 sqlalchemy 1.3.22 上试过这个,但那个版本的 on_duplicate_key_update 不接受 ColumnCollection,我必须创建一个 dict【参考方案5】:

Pandas 不支持编辑 .to_sql 方法的实际 SQL 语法,因此您可能不走运。有一些实验性的编程解决方法(例如,使用CALCHIPAN 将 Dataframe 读取到 SQLAlchemy 对象并使用 SQLAlchemy 进行事务),但最好将 DataFrame 写入 CSV 并使用显式 MySQL 函数加载它。

CALCHIPAN 回购:https://bitbucket.org/zzzeek/calchipan/

【讨论】:

pandas.pydata.org/pandas-docs/stable/whatsnew/…pandas.DataFrame.to_sql() 获得了控制SQL插入子句的方法参数。请参阅文档中的插入方法部分。 (GH8953)【参考方案6】:

我在仍然收到 IntegrityError 时遇到了麻烦

...奇怪,但我只是把上面的内容倒过来:

for i, row in df.iterrows():
    sql = "SELECT * FROM `Table_Name` WHERE `key` = ''".format(row.Key)
    found = pd.read_sql(sql, con=Engine)
    if len(found) == 0:
        df.iloc[i:i+1].to_sql(name="Table_Name",if_exists='append',con = Engine)

【讨论】:

【参考方案7】:

在我的例子中,我试图在一个空表中插入新数据,但是有些行是重复的,这里几乎是同样的问题,我“可能”考虑获取现有数据并与我得到的新数据合并并继续进行,但这不是最优的,可能只适用于小数据,而不是大表。

由于 pandas 目前没有为这种情况提供任何处理,我一直在寻找合适的解决方法,所以我自己做了,不确定这是否适合你,但我决定控制我的数据首先是我的数据,而不是等待它是否有效,所以我所做的是在调用.to_sql 之前删除重复项,所以如果发生任何错误,我会更多地了解我的数据并确保我知道发生了什么:

import pandas as pd


def write_to_table(table_name, data):
    df = pd.DataFrame(data)
    # Sort by price, so we remove the duplicates after keeping the lowest only
    data.sort(key=lambda row: row['price'])
    df.drop_duplicates(subset=['id_key'], keep='first', inplace=True)
    #
    df.to_sql(table_name, engine, index=False, if_exists='append', schema='public')

所以在我的情况下,我想保持行的最低价格(顺便说一句,我为 data 传递了一个 dict 数组),为此,我先进行了排序,没有必要,但这是一个示例我的意思是控制我想要保留的数据。

我希望这会对与我的情况几乎相同的人有所帮助。

【讨论】:

【参考方案8】:

上面的 for 循环方法显着减慢了速度。您可以将一个方法参数传递给 panda.to_sql 以帮助实现 sql 查询的自定义

https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html#pandas.DataFrame.to_sql

下面的代码应该适用于 postgres,如果与主键“unique_code”发生冲突,则什么也不做。更改数据库的插入方言。

def insert_do_nothing_on_conflicts(sqltable, conn, keys, data_iter):
    """
    Execute SQL statement inserting data

    Parameters
    ----------
    sqltable : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter : Iterable that iterates the values to be inserted
    """
    from sqlalchemy.dialects.postgresql import insert
    from sqlalchemy import table, column
    columns=[]
    for c in keys:
        columns.append(column(c))

    if sqltable.schema:
        table_name = '.'.format(sqltable.schema, sqltable.name)
    else:
        table_name = sqltable.name

    mytable = table(table_name, *columns)

    insert_stmt = insert(mytable).values(list(data_iter))
    do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=['unique_code'])

    conn.execute(do_nothing_stmt)

pd.to_sql('mytable', con=sql_engine, method=insert_do_nothing_on_conflicts)

【讨论】:

以上是关于Pandas to_sql 在重复主键上失败的主要内容,如果未能解决你的问题,请参考以下文章

为啥插入重复主键时 Kudu 不会失败?

在DataPhin基于PySpark实现主键重复就自动失败以提高运维的半自动化水平

在DataPhin基于PySpark实现主键重复就自动失败以提高运维的半自动化水平

SQL Sever表添加主键失败

在 Enter 键上提交登录信息 [重复]

Mysql数据库中多条重复数据,如何只删除一条?