SQLAlchemy - performing a bulk upsert (if exists, update, else insert) in PostgreSQL
【Posted】: 2014-11-15 07:18:48
【Question】: I am trying to write a bulk upsert in Python with the SQLAlchemy module (not in SQL!).
I get the following error on a SQLAlchemy add:
sqlalchemy.exc.IntegrityError: (IntegrityError) duplicate key value violates unique constraint "posts_pkey"
DETAIL: Key (id)=(TEST1234) already exists.
I have a table called posts with a primary key on the id column. In this example, I already have a row in the database with id=TEST1234. When I db.session.add() a new posts object with id set to TEST1234, I get the error above. I was under the impression that if the primary key already exists, the record would be updated.
How can I upsert with Flask-SQLAlchemy based only on the primary key? Is there a simple solution? If there is not, I can always check for and delete any record with a matching id and then insert the new record, but that seems expensive for my situation, where I do not expect many updates.
【Question Comments】:
How is this a duplicate if the original question does not mention SQLAlchemy? Could you consider accepting exhuma's answer? It leverages PostgreSQL's INSERT … ON CONFLICT DO UPDATE feature and works very well.
【Answer 1】:
There is an upsert-esque operation in SQLAlchemy: db.session.merge(). After I found this command I was able to perform upserts, but it is worth mentioning that the operation is slow for a bulk "upsert".
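For a single object, a minimal sketch of the merge-based approach looks like this (the title column is hypothetical, standing in for whatever columns the posts model actually has):

# merge() loads the existing row with the same primary key, if any, and
# copies the new object's state onto it; otherwise it pends an INSERT
post = posts(id='TEST1234', title='replacement title')
db.session.merge(post)
db.session.commit()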
The alternative is to get a list of the primary keys you would like to upsert and query the database for any matching ids:
# Imagine that post1, post5, and post1000 are posts objects with ids 1, 5 and 1000 respectively
# The goal is to "upsert" these posts.
# We initialize a dict which maps id to the post object
my_new_posts = {1: post1, 5: post5, 1000: post1000}

for each in posts.query.filter(posts.id.in_(my_new_posts.keys())).all():
    # Only merge those posts which already exist in the database
    db.session.merge(my_new_posts.pop(each.id))

# Only add those posts which did not exist in the database
db.session.add_all(my_new_posts.values())

# Now we commit our modifications (merges) and inserts (adds) to the database!
db.session.commit()
【Comments】:
merge does not handle the IntegrityError. The procedure above is slow, so it was not usable for me. If you are catching duplicate key errors on a unique index, merge will not help; it only works for primary keys. No trouble with merge here.
【Answer 2】:
You can leverage the on_conflict_do_update variant. A simple example would be the following:
from sqlalchemy.dialects.postgresql import insert


class Post(Base):
    """
    A simple class for demonstration
    """
    __tablename__ = 'posts'
    id = Column(Integer, primary_key=True)
    title = Column(Unicode)


# Prepare all the values that should be "upserted" to the DB
values = [
    {"id": 1, "title": "mytitle 1"},
    {"id": 2, "title": "mytitle 2"},
    {"id": 3, "title": "mytitle 3"},
    {"id": 4, "title": "mytitle 4"},
]

stmt = insert(Post).values(values)
stmt = stmt.on_conflict_do_update(
    # Let's use the constraint name which was visible in the original posts error msg
    constraint="posts_pkey",
    # The columns that should be updated on conflict
    set_={"title": stmt.excluded.title},
)
session.execute(stmt)
For more details on ON CONFLICT DO UPDATE, see the Postgres docs. For more details on on_conflict_do_update, see the SQLAlchemy docs.
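If no named constraint is available (a situation that comes up in the comments below), on_conflict_do_update also accepts the conflict target as a list of columns via index_elements. A minimal sketch of that variant, reusing the Post model and values from above:

stmt = insert(Post).values(values)
stmt = stmt.on_conflict_do_update(
    # index_elements names the conflicting column(s) directly, so no
    # constraint name is needed
    index_elements=[Post.id],
    set_={"title": stmt.excluded.title},
)
session.execute(stmt)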
A side note on duplicated column names

The code above uses the column names as dict keys both in the values list and in the argument to set_. If a column name is ever changed in the class definition, it needs to be changed everywhere, or the code will break. This can be avoided by accessing the column definitions, which makes the code a bit uglier but more robust:
coldefs = Post.__table__.c

values = [
    {coldefs.id.name: 1, coldefs.title.name: "mytitle 1"},
    ...
]

stmt = stmt.on_conflict_do_update(
    ...
    set_={
        coldefs.title.name: stmt.excluded.title,
        ...
    },
)
【Comments】:
My constraint="post_pkey" code fails because sqlalchemy cannot find the unique constraint that I created in raw SQL with CREATE UNIQUE INDEX post_pkey... and then loaded into sqlalchemy with metadata.reflect(eng, only="my_table"); after that I get a warning. Is there any solution?
@user1071182 I think this would be better posted as a separate question; it would allow you to add more details. Without seeing the full CREATE INDEX statement, it is hard to guess what went wrong here. I cannot promise anything, since I have not worked with partial indexes in SQLAlchemy yet, but maybe someone else has a solution.
This is the cleanest solution!
【Answer 3】:
An alternative using the compilation extension (https://docs.sqlalchemy.org/en/13/core/compiler.html):
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert


@compiles(Insert)
def compile_upsert(insert_stmt, compiler, **kwargs):
    """
    Converts every SQL insert to an upsert, i.e.:

    INSERT INTO test (foo, bar) VALUES (1, 'a')

    becomes:

    INSERT INTO test (foo, bar) VALUES (1, 'a') ON CONFLICT (foo) DO UPDATE SET foo=EXCLUDED.foo, bar=EXCLUDED.bar

    (assuming foo is a primary key)

    :param insert_stmt: Original insert statement
    :param compiler: SQL Compiler
    :param kwargs: optional arguments
    :return: upsert statement
    """
    pk = insert_stmt.table.primary_key
    insert = compiler.visit_insert(insert_stmt, **kwargs)
    ondup = f'ON CONFLICT ({",".join(c.name for c in pk)}) DO UPDATE SET'
    updates = ', '.join(f"{c.name}=EXCLUDED.{c.name}" for c in insert_stmt.table.columns)
    upsert = ' '.join((insert, ondup, updates))
    return upsert
This should ensure that all insert statements behave as upserts. The implementation above is for the Postgres dialect, but it should be fairly easy to modify for the MySQL dialect.
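For illustration, a hedged, untested sketch of what the MySQL flavor of the same idea might look like. MySQL infers the conflict target from the table's keys itself, so ON DUPLICATE KEY UPDATE takes no column list, and VALUES(col) refers to the value that would have been inserted:

from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert


@compiles(Insert, 'mysql')
def compile_upsert_mysql(insert_stmt, compiler, **kwargs):
    # Render the plain INSERT first, then append the upsert clause
    insert = compiler.visit_insert(insert_stmt, **kwargs)
    updates = ', '.join(
        f"{c.name}=VALUES({c.name})" for c in insert_stmt.table.columns
    )
    return f"{insert} ON DUPLICATE KEY UPDATE {updates}"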
【Comments】:
I get this error when using that snippet: sqlalchemy.exc.ProgrammingError: (psycopg2.errors.SyntaxError) syntax error at or near ")" LINE 1: ...on) VALUES ('US^WYOMING^ALBANY', '') ON CONFLICT () DO UPDAT...
Ah, nice catch! This will not work if your table has no primary key. Let me add a fix.
Actually, if you do not have a primary key, I am not sure why you would need this - can you elaborate on the problem?
Converting all inserts to upserts is risky. Sometimes you need the integrity error for data consistency and to avoid accidental overwrites. I would only use this solution if you are 120% aware of all its implications!
【Answer 4】:
I started looking into this, and I think I found a pretty efficient way to do upserts in sqlalchemy using a combination of bulk_insert_mappings and bulk_update_mappings instead of merge.
import time
from contextlib import contextmanager

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = None
Session = sessionmaker()
Base = declarative_base()


def create_new_database(db_name="sqlite:///bulk_upsert_sqlalchemy.db"):
    global engine
    engine = create_engine(db_name, echo=False)
    local_session = scoped_session(Session)
    local_session.remove()
    local_session.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


@contextmanager
def db_session():
    local_session = scoped_session(Session)
    session = local_session()
    session.expire_on_commit = False
    try:
        yield session
    except BaseException:
        session.rollback()
        raise
    finally:
        session.close()


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def bulk_upsert_mappings(customers):
    entries_to_update = []
    entries_to_put = []
    with db_session() as sess:
        t0 = time.time()

        # Find all customers that need to be updated and build mappings
        for each in (
            sess.query(Customer.id).filter(Customer.id.in_(customers.keys())).all()
        ):
            customer = customers.pop(each.id)
            entries_to_update.append({"id": customer["id"], "name": customer["name"]})

        # Bulk mappings for everything that needs to be inserted
        for customer in customers.values():
            entries_to_put.append({"id": customer["id"], "name": customer["name"]})

        sess.bulk_insert_mappings(Customer, entries_to_put)
        sess.bulk_update_mappings(Customer, entries_to_update)
        sess.commit()

        print(
            "Total time for upsert with MAPPING update "
            + str(len(customers))
            + " records "
            + str(time.time() - t0)
            + " sec"
            + " inserted : "
            + str(len(entries_to_put))
            + " - updated : "
            + str(len(entries_to_update))
        )


def bulk_upsert_merge(customers):
    entries_to_update = 0
    entries_to_put = []
    with db_session() as sess:
        t0 = time.time()

        # Find all customers that need to be updated and merge them
        for each in (
            sess.query(Customer.id).filter(Customer.id.in_(customers.keys())).all()
        ):
            values = customers.pop(each.id)
            sess.merge(Customer(id=values["id"], name=values["name"]))
            entries_to_update += 1

        # Bulk mappings for everything that needs to be inserted
        for customer in customers.values():
            entries_to_put.append({"id": customer["id"], "name": customer["name"]})

        sess.bulk_insert_mappings(Customer, entries_to_put)
        sess.commit()

        print(
            "Total time for upsert with MERGE update "
            + str(len(customers))
            + " records "
            + str(time.time() - t0)
            + " sec"
            + " inserted : "
            + str(len(entries_to_put))
            + " - updated : "
            + str(entries_to_update)
        )


if __name__ == "__main__":
    batch_size = 10000

    # Only inserts
    customers_insert = {
        i: {"id": i, "name": "customer_" + str(i)} for i in range(batch_size)
    }

    # 50/50 inserts and updates
    customers_upsert = {
        i: {"id": i, "name": "customer_2_" + str(i)}
        for i in range(int(batch_size / 2), batch_size + int(batch_size / 2))
    }

    create_new_database()
    bulk_upsert_mappings(customers_insert.copy())
    bulk_upsert_mappings(customers_upsert.copy())
    bulk_upsert_mappings(customers_insert.copy())

    create_new_database()
    bulk_upsert_merge(customers_insert.copy())
    bulk_upsert_merge(customers_upsert.copy())
    bulk_upsert_merge(customers_insert.copy())
Benchmark results:
Total time for upsert with MAPPING: 0.17138004302978516 sec inserted : 10000 - updated : 0
Total time for upsert with MAPPING: 0.22074174880981445 sec inserted : 5000 - updated : 5000
Total time for upsert with MAPPING: 0.22307634353637695 sec inserted : 0 - updated : 10000
Total time for upsert with MERGE: 0.1724097728729248 sec inserted : 10000 - updated : 0
Total time for upsert with MERGE: 7.852903842926025 sec inserted : 5000 - updated : 5000
Total time for upsert with MERGE: 15.11970829963684 sec inserted : 0 - updated : 10000
【Comments】:
【Answer 5】:
This is not the safest method, but it is very simple and very fast. I was just trying to selectively overwrite a section of a table. I deleted the known rows that I knew would conflict and then appended the new rows from a pandas dataframe. Your pandas dataframe column names need to match your sql table column names.
from sqlalchemy import create_engine

eng = create_engine('postgresql://...')
conn = eng.connect()

# Drop the rows that are known to conflict, then append the fresh ones
conn.execute("DELETE FROM my_table WHERE col = %s", val)
df.to_sql('my_table', con=eng, if_exists='append')
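One way to shrink the window where rows are missing between the delete and the append is to run both inside a single transaction. A sketch, assuming the same my_table, col, val, and df names from above:

from sqlalchemy import create_engine, text

eng = create_engine('postgresql://...')

# engine.begin() opens a transaction that commits on success and rolls
# back on error, so the delete and the append happen atomically
with eng.begin() as conn:
    conn.execute(text("DELETE FROM my_table WHERE col = :val"), {"val": val})
    df.to_sql('my_table', con=conn, if_exists='append', index=False)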
【Comments】:
【Answer 6】:
I know this is a bit late, but I have built on the answer given by @Emil Wåreus and turned it into a function that can be used for any model (table):
def upsert_data(self, entries, model, key):
    entries_to_update = []
    entries_to_insert = []

    # Get all entries to be updated
    for each in session.query(model).filter(getattr(model, key).in_(entries.keys())).all():
        entry = entries.pop(str(getattr(each, key)))
        entries_to_update.append(entry)

    # Get all entries to be inserted
    for entry in entries.values():
        entries_to_insert.append(entry)

    session.bulk_insert_mappings(model, entries_to_insert)
    session.bulk_update_mappings(model, entries_to_update)
    session.commit()
entries should be a dict with the primary key values as keys, and the values should be mappings (mappings of the values to the database columns). model is the ORM model you want to upsert to. key is the primary key of the table.
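A hypothetical call, assuming the Customer model from the benchmark answer above. Note that the function is written as a method (it takes self), so it is assumed here to live on some helper object, and that it pops matched entries by str() of the key, so string keys are used:

# Keys are the primary key values as strings, matching the str() cast
# that upsert_data uses when popping matched entries
entries = {
    "1": {"id": 1, "name": "customer_one"},
    "2": {"id": 2, "name": "customer_two"},
}
helper.upsert_data(entries, Customer, "id")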
You can even use the following function to look up the model of the table you want to upsert into from a string:
def get_table(self, table_name):
    for c in self.base._decl_class_registry.values():
        if hasattr(c, '__tablename__') and c.__tablename__ == table_name:
            return c
Using that, you can pass the name of the table as a string to the upsert_data function:
def upsert_data(self, entries, table, key):
    model = self.get_table(table)
    entries_to_update = []
    entries_to_insert = []

    # Get all entries to be updated
    for each in session.query(model).filter(getattr(model, key).in_(entries.keys())).all():
        entry = entries.pop(str(getattr(each, key)))
        entries_to_update.append(entry)

    # Get all entries to be inserted
    for entry in entries.values():
        entries_to_insert.append(entry)

    session.bulk_insert_mappings(model, entries_to_insert)
    session.bulk_update_mappings(model, entries_to_update)
    session.commit()
【Comments】: