如何在 PostgreSQL 中进行 UPSERT(合并、插入……重复更新)?

Posted

技术标签:

【中文标题】如何在 PostgreSQL 中进行 UPSERT(合并、插入……重复更新)?【英文标题】:How to UPSERT (MERGE, INSERT ... ON DUPLICATE UPDATE) in PostgreSQL? 【发布时间】:2013-06-20 11:37:15 【问题描述】:

这里一个非常常见的问题是如何进行 upsert,这就是 mysql 所称的 INSERT ... ON DUPLICATE UPDATE 并且标准支持作为 MERGE 操作的一部分。

鉴于 PostgreSQL 不直接支持它(在 pg 9.5 之前),你如何做到这一点?考虑以下几点:

CREATE TABLE testtable (
    id integer PRIMARY KEY,
    somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

现在假设您要“插入”元组 (2, 'Joe')(3, 'Alan'),因此新的表格内容将是:

(1, 'fred'),
(2, 'Joe'),    -- Changed value of existing tuple
(3, 'Alan')    -- Added new tuple

这就是人们在讨论upsert 时所谈论的内容。至关重要的是,任何方法都必须在存在多个事务在同一个表上工作的情况下是安全的 - 通过使用显式锁定或以其他方式防御由此产生的竞争条件。

Insert, on duplicate update in PostgreSQL? 广泛讨论了该主题,但这是关于 MySQL 语法的替代方案,并且随着时间的推移,它的一些不相关的细节越来越多。我正在寻找明确的答案。

这些技术对于“如果不存在就插入,否则什么都不做”也很有用,即“在重复键忽略时插入 ...”。

【问题讨论】:

Insert, on duplicate update in PostgreSQL?的可能重复 @MichaelHampton 的目标是创建一个不会被多个过时答案混淆的最终版本 - 并且被锁定,因此没有人可以对此做任何事情。我不同意近距离投票。 为什么,那么这很快就会过时 - 并且被锁定,所以没有人可以做任何事情。 @MichaelHampton 如果您担心,也许您可​​以标记您链接的那个并要求将其解锁以便清理,然后我们可以合并它。我只是厌倦了唯一明显的关闭作为 upsert 如此混乱和错误的混乱。 那个问答没有锁定! 【参考方案1】:

9.5 及更高版本:

PostgreSQL 9.5 和更新版本支持INSERT ... ON CONFLICT (key) DO UPDATE(和ON CONFLICT (key) DO NOTHING),即upsert。

Comparison with ON DUPLICATE KEY UPDATE.

Quick explanation.

有关用法,请参阅 the manual - 特别是语法图中的 conflict_action 子句,以及 the explanatory text。

与下面给出的 9.4 及更早版本的解决方案不同,此功能适用于多个冲突的行,并且不需要排他锁定或重试循环。

The commit adding the feature is here 和 the discussion around its development is here。


如果您使用的是 9.5 并且不需要向后兼容,您现在可以停止阅读


9.4 及以上:

PostgreSQL 没有任何内置的UPSERT(或MERGE)设施,面对并发使用,高效地做到这一点非常困难。

This article discusses the problem in useful detail.

一般来说,您必须在两个选项之间进行选择:

重试循环中的单独插入/更新操作;或 锁定表并进行批量合并

单行重试循环

如果您希望多个连接同时尝试执行插入,则在重试循环中使用单独的行 upsert 是合理的选择。

The PostgreSQL documentation contains a useful procedure that'll let you do this in a loop inside the database。与大多数幼稚的解决方案不同,它可以防止丢失更新和插入竞争。它只能在READ COMMITTED 模式下工作,并且只有当它是您在事务中唯一做的事情时才是安全的。如果触发器或辅助唯一键导致唯一违规,该功能将无法正常工作。

这种策略非常低效。只要可行,您应该排队工作并按照如下所述进行批量更新。

许多针对此问题的尝试解决方案都没有考虑回滚,因此会导致更新不完整。两笔交易相互竞争;其中之一成功INSERTs;另一个得到一个重复的键错误,而是执行UPDATEUPDATE 阻塞等待 INSERT 回滚或提交。当它回滚时,UPDATE 条件重新检查匹配零行,因此即使 UPDATE 提交它实际上并没有完成您预期的 upsert。您必须检查结果行数并在必要时重试。

一些尝试的解决方案也未能考虑 SELECT 比赛。如果您尝试显而易见且简单的方法:

-- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE.

BEGIN;

UPDATE testtable
SET somedata = 'blah'
WHERE id = 2;

-- Remember, this is WRONG. Do NOT COPY IT.

INSERT INTO testtable (id, somedata)
SELECT 2, 'blah'
WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2);

COMMIT;

那么当两个同时运行时,会出现几种故障模式。一个是已经讨论过的更新重新检查问题。另一个是UPDATE 同时匹配零行并继续。然后他们都进行EXISTS 测试,该测试发生在INSERT 之前。两者都得到零行,所以都做INSERT。一个因重复键错误而失败。

这就是您需要重试循环的原因。您可能认为可以使用巧妙的 SQL 防止重复键错误或丢失更新,但事实并非如此。您需要检查行数或处理重复键错误(取决于选择的方法)并重试。

请不要为此推出您自己的解决方案。就像消息队列一样,它可能是错误的。

带锁的批量更新插入

有时您想进行批量更新插入,其中您有一个新数据集,您希望将其合并到旧的现有数据集中。这大大比单独的行 upserts 更有效,并且应该在可行时首选。

在这种情况下,您通常遵循以下流程:

CREATETEMPORARY

COPY 或将新数据批量插入到临时表中

LOCK 目标表IN EXCLUSIVE MODE。这允许对SELECT 进行其他事务处理,但不允许对表进行任何更改。

使用临时表中的值对现有记录执行UPDATE ... FROM

对目标表中尚不存在的行执行INSERT

COMMIT,释放锁。

例如,对于问题中给出的示例,使用多值INSERT 填充临时表:

BEGIN;

CREATE TEMPORARY TABLE newvals(id integer, somedata text);

INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan');

LOCK TABLE testtable IN EXCLUSIVE MODE;

UPDATE testtable
SET somedata = newvals.somedata
FROM newvals
WHERE newvals.id = testtable.id;

INSERT INTO testtable
SELECT newvals.id, newvals.somedata
FROM newvals
LEFT OUTER JOIN testtable ON (testtable.id = newvals.id)
WHERE testtable.id IS NULL;

COMMIT;

相关阅读

UPSERT wiki page UPSERTisms in Postgres Insert, on duplicate update in PostgreSQL? http://petereisentraut.blogspot.com/2010/05/merge-syntax.html Upsert with a transaction Is SELECT or INSERT in a function prone to race conditions? SQL MERGE on the PostgreSQL wiki Most idiomatic way to implement UPSERT in Postgresql nowadays

MERGE 呢?

SQL 标准 MERGE 实际上定义不明确的并发语义,不适合在不先锁定表的情况下进行更新插入。

对于数据合并来说,这是一个非常有用的 OLAP 语句,但对于并发安全的 upsert,它实际上并不是一个有用的解决方案。对于使用其他 DBMS 的人有很多建议可以使用 MERGE 进行更新插入,但实际上是错误的。

其他数据库:

INSERT ... ON DUPLICATE KEY UPDATE in MySQL MERGE from MS SQL Server(但请参阅上面关于 MERGE 的问题) MERGE from Oracle(但请参阅上面关于 MERGE 问题的内容)

【讨论】:

在批量 upsert 中,从 newvals 中删除而不是过滤 INSERT 是否有可能的价值?例如。 WITH upd AS (UPDATE ... RETURNING newvals.id) DELETE FROM newvals USING upd WHERE newvals.id = upd.id,然后是一个裸插入测试表 SELECT * FROM newvals?我的想法是:不要在 INSERT 中过滤两次(对于 JOIN/WHERE 和唯一约束),重用来自 UPDATE 的存在检查结果,这些结果已经在 RAM 中,并且可能要小得多。如果匹配的行数很少和/或 newvals 比 testtable 小得多,这可能是一个胜利。 仍有未解决的问题,对于其他供应商来说,尚不清楚哪些有效,哪些无效。 1. 如前所述,Postgres 循环解决方案在多个唯一键的情况下不起作用。 2. mysql 的 on duplicate key 也不适用于多个唯一键。 3. 上面发布的 MySQL、SQL Server 和 Oracle 的其他解决方案是否有效?在这些情况下是否可能出现异常,我们是否必须循环? Postgres 现在支持 UPSERT - git.postgresql.org/gitweb/… 示例是黄金 - 那为什么没有呢?实际工作示例,不是三个点的东西,是实际 UPSERT 的语法正确示例。请问可以给我们吗?编辑 - 在这里找到一些示例kb.objectrocket.com/postgresql/… 如果在谷歌搜索时 SO 可以是最后一站,那就太好了。编辑 #2 在下面的答案中找到了一些示例,遗憾的是它没有被接受。 @Petr 我链接到用户手册。我在你所说的功能出现之前就写了这个。想要改变吗?提出修改建议并添加您自己想要的示例!【参考方案2】:

以下是insert ... on conflict ... (pg 9.5+) 的一些示例:

插入,冲突时 - 什么都不做
insert into dummy(id, name, size) values(1, 'new_name', 3)
on conflict do nothing;`  
插入,冲突时 - 更新,通过指定冲突目标。
insert into dummy(id, name, size) values(1, 'new_name', 3)
on conflict(id)
do update set name = 'new_name', size = 3;  
插入,冲突时 - 更新,通过约束名称指定冲突目标。
insert into dummy(id, name, size) values(1, 'new_name', 3)
on conflict on constraint dummy_pkey
do update set name = 'new_name', size = 4;

【讨论】:

很好的答案 - 问题:为什么或在什么情况下应该通过列或约束名称使用目标规范?各种用例是否有优势/劣势? @NathanBenton 我认为至少有两个区别:(1)列名由程序员指定,而约束名可能由程序员指定,或者由数据库根据表/列名生成。 (2) 每列可能有多个约束。也就是说,选择使用哪一个取决于您的情况。 简单易懂,kudo!【参考方案3】:

我正在尝试为 PostgreSQL 9.5 之前版本的单插入问题提供另一种解决方案。这个想法只是尝试首先执行插入,如果记录已经存在,则更新它:

do $$
begin 
  insert into testtable(id, somedata) values(2,'Joe');
exception when unique_violation then
  update testtable set somedata = 'Joe' where id = 2;
end $$;

请注意,只有在没有删除表行的情况下,才能应用此解决方案

我不知道这个解决方案的效率,但在我看来它足够合理。

【讨论】:

谢谢,这正是我想要的。不明白为什么这么难找。 是的。当且仅当没有删除时,这种简化才有效。 @CraigRinger 你能解释一下如果删除会发生什么吗? @turbanoff 插入可能会失败,因为记录已经存在,然后它被同时删除,然后更新会影响零行,因为该行已被删除。 @CraigRinger 所以。 删除是同时发生的。如果这个 is 工作正常,有什么可能的出路?如果删除是同时进行的——那么它可以在我们的块之后执行。我想说的是 - 如果我们有并发删除 - 那么这段代码以 same 的方式正常工作 insert on update【参考方案4】:

用于 Postgres 的 SQLAlchemy upsert >=9.5

由于上面的大型帖子涵盖了 Postgres 版本的许多不同 SQL 方法(不仅是问题中的非 9.5),如果您使用的是 Postgres 9.5,我想在 SQLAlchemy 中添加如何做到这一点。除了实现自己的 upsert,您还可以使用 SQLAlchemy 的函数(在 SQLAlchemy 1.1 中添加)。就个人而言,如果可能的话,我会推荐使用这些。不仅因为方便,还因为它让 PostgreSQL 可以处理任何可能发生的竞争条件。

我昨天给出的另一个答案的交叉发布 (https://***.com/a/44395983/2156909)

SQLAlchemy 现在支持ON CONFLICT,有两种方法on_conflict_do_update()on_conflict_do_nothing()

从文档中复制:

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
    )
conn.execute(stmt)

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert

【讨论】:

问题中没有提到 Python 和 SQLAlchemy。 我在写的解决方案中经常使用Python。但我还没有研究过 SQLAlchemy(或者没有意识到)。这似乎是一个优雅的选择。谢谢你。如果审核通过,我会将其提交给我的组织。【参考方案5】:
WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 
RETURNING ID),
INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD))
INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS

在 Postgresql 9.3 上测试

【讨论】:

@CraigRinger:你能详细说明一下吗? cte 不是原子的吗? @parisni 否。如果执行写入,每个 CTE 术语都会获得自己的快照。此外,没有对 not 找到的行执行任何谓词锁定,因此它们仍然可以由另一个会话同时创建。如果您使用SERIALIZABLE 隔离,您会因序列化失败而中止,否则您可能会遇到独特的违规行为。不要重新发明 u​​psert,重新发明将是错误的。使用INSERT ... ON CONFLICT ...。如果您的 PostgreSQL 太旧,请更新它。 @CraigRinger INSERT ... ON CLONFLICT ... 不适用于批量加载。从您的帖子来看,CTE 中的 LOCK TABLE testtable IN EXCLUSIVE MODE; 是一种获得原子事物的解决方法。没有? @parisni 它不适合批量加载?谁说的? postgresql.org/docs/current/sql-insert.html#SQL-ON-CONFLICT 。当然,它比没有类似 upsert 行为的批量加载要慢得多,但这很明显,无论你做什么都会如此。它比使用子事务要快得多,这是肯定的。最快的方法是锁定目标表,然后执行insert ... where not exists ... 或类似的操作,当然。 WITH upsert AS ( UPDATE tbl SET foo = 42 RETURNING * ) INSERT INTO tbl(foo) SELECT 42 WHERE NOT EXISTS (SELECT * FROM upsert); - 这对我有用【参考方案6】:

由于this question 已关闭,我将在此处发布您如何使用 SQLAlchemy 进行操作。通过递归,它重试批量插入或更新以对抗race conditions 和验证错误。

首先导入

import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

现在有几个辅助函数

def chunk(content, chunksize=None):
    """Groups data into chunks each with (at most) `chunksize` items.
    https://***.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)


def gen_resources(records):
    """Yields a dictionary if the record's id already exists, a row object 
    otherwise.
    """
    ids = item[0] for item in session.query(Posts.id)

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # It's a row but the id already exists, so we need to convert it 
            # to a dict that updates the existing record. Since it is duplicate,
            # also yield True
            yield record.to_dict(), True
        elif is_row:
            # It's a row and the id doesn't exist, so no conversion needed. 
            # Since it's not a duplicate, also yield False
            yield record, False
        elif record['id'] in ids:
            # It's a dict and the id already exists, so no conversion needed. 
            # Since it is duplicate, also yield True
            yield record, True
        else:
            # It's a dict and the id doesn't exist, so we need to convert it. 
            # Since it's not a duplicate, also yield False
            yield Posts(**record), False

最后是 upsert 函数

def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # A record was added or deleted after we checked, so retry
                # 
                # modify accordingly by adding additional exceptions, e.g.,
                # except (IntegrityError, ValidationError, ValueError)
                db.session.rollback()
                upsert(items)
            except Exception as e:
                # Some other error occurred so reduce chunksize to isolate the 
                # offending row(s)
                db.session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Error adding record '.format(items[0]))

这是你如何使用它

>>> data = [
...     'id': 1, 'text': 'updated post1', 
...     'id': 5, 'text': 'updated post5', 
...     'id': 1000, 'text': 'new post1000']
... 
>>> upsert(data)

bulk_save_objects 相比,它的优势在于它可以在插入时处理关系、错误检查等(与bulk operations 不同)。

【讨论】:

这对我来说也看起来不对劲。如果在您收集 ID 列表后并发会话插入一行怎么办?还是删掉一个? 好点 @CraigRinger 我做了类似的事情,但只有 1 个会话执行这项工作。那么处理多个会话的最佳方法是什么?也许是一笔交易? 事务并不是所有并发问题的神奇解决方案。您可以使用SERIALIZABLE 事务并处理序列化失败,但速度很慢。您需要错误处理和重试循环。请参阅我的答案和其中的“相关阅读”部分。 @CraigRinger 明白了。由于其他验证失败,我实际上在自己的情况下实现了重试循环。我会相应地更新这个答案。

以上是关于如何在 PostgreSQL 中进行 UPSERT(合并、插入……重复更新)?的主要内容,如果未能解决你的问题,请参考以下文章

如何确定 upsert 是不是是 PostgreSQL 9.5+ UPSERT 的更新?

如何在 flask_sqlalchemy 中使用 PostgreSQL 的“INSERT...ON CONFLICT”(UPSERT)功能?

如何使用 clojure.java.jdbc 进行 UPSERT

如何将 PostgreSQL“merge_db”(又名 upsert)函数翻译成 MySQL

SQLAlchemy - 在 postgresql 中执行批量 upsert(如果存在,更新,否则插入)

PostgreSQL Upsert 用于几乎相似的值