获取从 pandas to_sql 函数插入的 ID 列表

Posted

技术标签:

【中文标题】获取从 pandas to_sql 函数插入的 ID 列表【英文标题】:Obtain list of IDs inserted from pandas to_sql function 【发布时间】:2022-01-22 02:47:38 【问题描述】:

以下 Python 代码通过先前配置的 SqlAlchemy 引擎成功地将属于 pandas 数据帧的行附加到 MS SQL 表中。

df.to_sql(schema='stg', name = 'TEST', con=engine, if_exists='append', index=False)

我想为插入到stg.Test 表中的每一行获取自动生成的 ID 编号。换句话说,什么是SqlAlchemy 相当于Sql Server OUTPUT clause during an INSERT statement

【问题讨论】:

【参考方案1】:

不幸的是,对于您的问题没有简单的解决方案,例如您的语句中的附加参数。您必须使用新行获得最高 id + 1 分配的行为。有了这些知识,您就可以计算出所有行的 id。

选项 1: 在 this answer 中解释。您在插入语句之前选择当前的最大 id。然后,您为DataFrame 中大于先前最大值的所有条目分配ID。最后,插入已经包含 id 的 df。

选项 2: 您插入 DataFrame,然后获取最高 id。通过插入的条目数,您可以计算所有条目的 id。这是这样一个插入函数的样子:

def insert_df_and_return_ids(df, engine):
    # It is important to use same connection for both statements if
    # something like last_insert_rowid() is used
    conn = engine.connect()
    
    # Insert the df into the database
    df.to_sql('students', conn, if_exists='append', index=False)
    
    # Aquire the maximum id
    result = conn.execute('SELECT max(id) FROM students') # Should work for all SQL variants
    # result = conn.execute('Select last_insert_rowid()') # Specifically for SQLite
    # result = conn.execute('Select last_insert_id()') # Specifically for mysql


    entries = df.shape[0]
    last_id = -1
    
    # Iterate over result to get last inserted id
    for row in result:
        last_id = int(str(row[0]))
    conn.close()
    
    # Generate list of ids
    list_of_ids = list(range(last_id - entries + 1, last_id + 1))

    return list_of_ids

PS:我无法在 MS SQL 服务器上测试该功能,但行为应该是相同的。为了测试一切是否正常,你可以使用这个:

import numpy as np
import pandas as pd
import sqlalchemy as sa

# Change connection to MS SQL server
engine = sa.create_engine('sqlite:///test.lite', echo=False)

# Create table
meta = sa.MetaData()
students = sa.Table(
   'students', meta, 
   sa.Column('id', sa.Integer, primary_key = True), 
   sa.Column('name', sa.String), 
)
meta.create_all(engine)

# DataFrame to insert with two entries
df = pd.DataFrame('name': ['Alice', 'Bob'])

ids = insert_df_and_return_ids(df, engine)
print(ids) # [1,2]

conn = engine.connect()
# Insert any entry with a high id in order to check if new ids are always the maximum
result = conn.execute("Insert into students (id, name) VALUES (53, 'Charlie')")
conn.close()

# Insert data frame again
ids = insert_df_and_return_ids(df, engine)
print(ids) # [54, 55]

编辑:如果使用多个线程,事务可用于使选项至少对 SQLite 是线程安全的:

conn = engine.connect()
transaction = conn.begin()
df.to_sql('students', conn, if_exists='append', index=False)
result = conn.execute('SELECT max(id) FROM students')
transaction.commit()

【讨论】:

这些选项不能防止来自另一个数据库连接的同时插入,即另一个用户在我们的 Python 代码运行时插入行。你是绝对正确的,似乎不存在一个简单的解决方案,目前,我正在研究一个基于 MS SQL 的解决方案,它利用 OUTPUT 子句并返回适当的 ID,可能使用一个过程。我会看看情况如何。如果您有其他好主意,请随时分享。 你说得对,并发是两个选项的问题。有可能使用事务来防止任何竞争条件。如果我的解决方案对您没有帮助,我认为您最好的选择是构建自己的插入 SQL 语句,将数据帧值复制到语句中,然后使用 connection.execute(statement) 返回查询结果。

以上是关于获取从 pandas to_sql 函数插入的 ID 列表的主要内容,如果未能解决你的问题,请参考以下文章

Pandas DataFrame.to_sql() 函数是不是需要后续的 commit()?

pandas to_sql if_exist参数指南

在 pandas.to_sql() 中使用“可调用”方法的示例?

pandas DataFrame.to_sql() 函数 if_exists 参数不起作用

pandas DataFrame.to_sql 和 nan 值

使用 Pandas .to_sql 将 JSON 列写入 Postgres