psycopg2:用一个查询插入多行

Posted

技术标签:

【中文标题】psycopg2:用一个查询插入多行【英文标题】:psycopg2: insert multiple rows with one query 【发布时间】:2011-12-29 09:27:58 【问题描述】:

我需要用一个查询插入多行(行数不是恒定的),所以我需要像这样执行查询:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);

我知道的唯一方法是

args = [(1,2), (3,4), (5,6)]
args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args)
cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)

但我想要一些更简单的方法。

【问题讨论】:

【参考方案1】:

execute_batch 自发布此问题以来已添加到 psycopg2。

比execute_values快。

【讨论】:

查看其他 cmets。 psycopg2 的方法execute_valuesexecute_batch【参考方案2】:

上述@jopseph.sheedy (https://***.com/users/958118/joseph-sheedy) (https://***.com/a/30721460/11100064) 提供的cursor.copyfrom 解决方案确实快如闪电。

但是,他给出的示例通常不适用于具有任意数量字段的记录,我花了一段时间才弄清楚如何正确使用它。

IteratorFile 需要像这样用制表符分隔的字段来实例化(r 是一个字典列表,其中每个字典都是一条记录):

    f = IteratorFile("0\t1\t2\t3\t4".format(r["id"],
        r["type"],
        r["item"],
        r["month"],
        r["revenue"]) for r in records)

为了概括任意数量的字段,我们将首先创建一个带有正确数量的制表符和字段占位符的行字符串:"\t\t....\t",然后使用.format() 填写字段值:*list(r.values())) for r in records

        line = "\t".join([""] * len(records[0]))

        f = IteratorFile(line.format(*list(r.values())) for r in records)

gist here中的完整函数。

【讨论】:

【参考方案3】:

executemany接受元组数组

https://www.postgresqltutorial.com/postgresql-python/insert/

    """ array of tuples """
    vendor_list = [(value1,)]

    """ insert multiple vendors into the vendors table  """
    sql = "INSERT INTO vendors(vendor_name) VALUES(%s)"
    conn = None
    try:
        # read database configuration
        params = config()
        # connect to the PostgreSQL database
        conn = psycopg2.connect(**params)
        # create a new cursor
        cur = conn.cursor()
        # execute the INSERT statement
        cur.executemany(sql,vendor_list)
        # commit the changes to the database
        conn.commit()
        # close communication with the database
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

【讨论】:

【参考方案4】:

使用 psycopg2 2.7 更新:

经典的 executemany() 比 @ant32 的实现(称为“折叠”)慢大约 60 倍,如以下线程所述:https://www.postgresql.org/message-id/20170130215151.GA7081%40deb76.aryehleib.com

此实现在 2.7 版中添加到 psycopg2 中,称为execute_values()

from psycopg2.extras import execute_values
execute_values(cur,
    "INSERT INTO test (id, v1, v2) VALUES %s",
    [(1, 2, 3), (4, 5, 6), (7, 8, 9)])

上一个答案:

要插入多行,使用多行 VALUES 语法和 execute() 比使用 psycopg2 executemany() 快大约 10 倍。事实上,executemany() 只是运行许多单独的 INSERT 语句。

@ant32 的代码在 Python 2 中完美运行。但在 Python 3 中,cursor.mogrify() 返回字节,cursor.execute() 接受字节或字符串,','.join() 需要 str 实例。

所以在 Python 3 中你可能需要修改 @ant32 的代码,添加 .decode('utf-8'):

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x).decode('utf-8') for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str)

或仅使用字节(b''b""):

args_bytes = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute(b"INSERT INTO table VALUES " + args_bytes) 

【讨论】:

谢谢,更新后的答案效果很好。请不要忘记 conn.commit() 以保持更改。 那么execute_values() 比@ant32 快吗?【参考方案5】:

最后在 SQLalchemy1.2 版本中,添加了这个新实现以在使用 use_batch_mode=True 初始化引擎时使用 psycopg2.extras.execute_batch() 而不是 executemany,例如:

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@host/dbname",
    use_batch_mode=True)

http://docs.sqlalchemy.org/en/latest/changelog/migration_12.html#change-4109

然后有人将不得不使用 SQLalchmey 不会费心尝试 sqla 和 psycopg2 的不同组合并一起直接 SQL..

【讨论】:

【参考方案6】:

如果您使用的是 SQLAlchemy,则不需要手工制作字符串,因为 SQLAlchemy supports generating a multi-row VALUES clause for a single INSERT statement:

rows = []
for i, name in enumerate(rawdata):
    row = 
        'id': i,
        'name': name,
        'valid': True,
    
    rows.append(row)
if len(rows) > 0:  # INSERT fails if no rows
    insert_query = SQLAlchemyModelName.__table__.insert().values(rows)
    session.execute(insert_query)

【讨论】:

在底层 SQLAlchemy 使用 Psychopg2 的 executemany() 来进行这样的调用,因此这个答案对于大型查询会产生严重的性能问题。见执行方法docs.sqlalchemy.org/en/latest/orm/session_api.html。 我不这么认为。自从我看到这个以来已经有点过了,但是 IIRC,这实际上是在 insert_query 行中构建一个插入语句。然后,session.execute() 只是用一个大字符串调用 psycopg2 的execute() 语句。所以“诀窍”是首先构建整个插入语句对象。我正在使用它一次插入 200,000 行,与普通的 executemany() 相比,使用此代码可以显着提高性能。 您链接到的 SQLAlchemy 文档有一个部分准确显示了它的工作原理,甚至说:“必须注意传递多个值与使用传统的 executemany() 形式不同”。所以它明确指出这是有效的。 我的立场是正确的。我没有注意到您对 values() 方法的使用(没有它,SQLAlchemy 只会执行许多操作)。我会说编辑答案以包含指向该文档的链接,以便我可以更改我的投票,但显然您已经包含它。也许提到这与使用 dicts 列表调用带有 execute() 的 insert() 不是一回事? 与 execute_values 相比表现如何?【参考方案7】:

多年来,我一直在使用 ant32 的答案。但是我发现这在 python 3 中是一个错误,因为mogrify 返回一个字节字符串。

显式转换为字节字符串是使代码兼容 python 3 的简单解决方案。

args_str = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup) 
cur.execute(b"INSERT INTO table VALUES " + args_str)

【讨论】:

一个更简单的选择是解码cur.mogrify()【参考方案8】:

Psycopg 2.7 中的新 execute_values method:

data = [(1,'x'), (2,'y')]
insert_query = 'insert into t (a, b) values %s'
psycopg2.extras.execute_values (
    cursor, insert_query, data, template=None, page_size=100
)

Psycopg 2.6 中的 Pythonic 方式:

data = [(1,'x'), (2,'y')]
records_list_template = ','.join(['%s'] * len(data))
insert_query = 'insert into t (a, b) values '.format(records_list_template)
cursor.execute(insert_query, data)

说明:如果要插入的数据以元组列表的形式给出,如

data = [(1,'x'), (2,'y')]

那么它已经是完全需要的格式了

    insert 子句的 values 语法需要一个记录列表,如

    insert into t (a, b) values (1, 'x'),(2, 'y')

    Psycopg 将 Python tuple 改编为 Postgresql record

唯一必要的工作就是提供一个记录列表模板供psycopg填写

# We use the data list to be sure of the template length
records_list_template = ','.join(['%s'] * len(data))

并将其放在insert 查询中

insert_query = 'insert into t (a, b) values '.format(records_list_template)

打印insert_query 输出

insert into t (a, b) values %s,%s

现在到通常的Psycopg 参数替换

cursor.execute(insert_query, data)

或者只是测试将发送到服务器的内容

print (cursor.mogrify(insert_query, data).decode('utf8'))

输出:

insert into t (a, b) values (1, 'x'),(2, 'y')

【讨论】:

这种方法的性能与cur.copy_from相比如何? Here's a gist with a benchmark。在拥有 1000 万条记录的机器上,copy_from 的速度提高了大约 6.5 倍。 使用 execute_values 我能够让我的系统以每分钟 1k 条记录到每分钟 128k 条记录的速度运行 直到我在execute_values(...) 之后调用了connection.commit(),我的插入才被正确注册。 @Phillipp 对于每个执行语句都是正常的,除非您处于自动提交模式。【参考方案9】:

我构建了一个程序,可以将多行插入到位于另一个城市的服务器中。

我发现使用这种方法比executemany 快了大约 10 倍。在我的例子中,tup 是一个包含大约 2000 行的元组。使用此方法大约需要 10 秒:

args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup)
cur.execute("INSERT INTO table VALUES " + args_str) 

使用此方法时需要 2 分钟:

cur.executemany("INSERT INTO table VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)", tup)

【讨论】:

两年后仍然非常重要。今天的经验表明,随着您要推送的行数的增加,使用execute 策略越好。由于这个原因,我看到了大约 100 倍的加速! 也许executemany 在每次插入后运行一次提交。如果您将整个事情包装在一个事务中,也许这会加快事情的速度? 刚刚自己确认了这一改进。从我读到的 psycopg2 的 executemany 没有做任何优化,只是循环并执行许多 execute 语句。使用这种方法,向远程服务器插入 700 行的时间从 60 秒缩短到 也许我是偏执狂,但是用+ 连接查询似乎可以打开sql 注入,我觉得@Clodoaldo Neto execute_values() 解决方案更安全。 如果有人遇到以下错误:[TypeError: sequence item 0: expected str instance, bytes found] 请改为运行此命令 [args_str = ','.join(cur.mogrify("(% s,%s)", x).decode("utf-8") for x in tup)]【参考方案10】:

在 Postgres 术语中,所有这些技术都称为“扩展插入”,截至 2016 年 11 月 24 日,它仍然比 Psychopg2 的 executemany() 和此线程中列出的所有其他方法(我之前尝试过)快很多来到这个答案)。

这里有一些不使用 cur.mogrify 的代码,它很好,很简单:

valueSQL = [ '%s', '%s', '%s', ... ] # as many as you have columns.
sqlrows = []
rowsPerInsert = 3 # more means faster, but with diminishing returns..
for row in getSomeData:
        # row == [1, 'a', 'yolo', ... ]
        sqlrows += row
        if ( len(sqlrows)/len(valueSQL) ) % rowsPerInsert == 0:
                # sqlrows == [ 1, 'a', 'yolo', 2, 'b', 'swag', 3, 'c', 'selfie' ]
                insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*rowsPerInsert)
                cur.execute(insertSQL, sqlrows)
                con.commit()
                sqlrows = []
insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*len(sqlrows))
cur.execute(insertSQL, sqlrows)
con.commit()

但需要注意的是,如果可以使用copy_from(),则应该使用copy_from ;)

【讨论】:

起死回生,但是最后几行的情况呢?我假设您实际上在最后剩余的行上再次运行最后一个子句,以防您有偶数行? 正确,抱歉,我在编写示例时一定忘记这样做了——这对我来说太愚蠢了。不这样做不会给人们带来错误,这让我担心有多少人复制/粘贴解决方案并继续他们的业务......无论如何,非常感谢 mcpeterson - 谢谢!【参考方案11】:

cursor.copy_from 是迄今为止我为批量插入找到的最快的解决方案。 Here's a gist 我包含一个名为 IteratorFile 的类,它允许生成字符串的迭代器像文件一样被读取。我们可以使用生成器表达式将每个输入记录转换为字符串。所以解决方案是

args = [(1,2), (3,4), (5,6)]
f = IteratorFile(("\t".format(x[0], x[1]) for x in args))
cursor.copy_from(f, 'table_name', columns=('a', 'b'))

对于这种微不足道的 args 大小,它不会产生太大的速度差异,但在处理数千行以上时,我看到了很大的加速。它也将比构建一个巨大的查询字符串更节省内存。迭代器一次只能在内存中保存一个输入记录,在某些时候,您将通过构建查询字符串在 Python 进程或 Postgres 中耗尽内存。

【讨论】:

Here is a benchmark 将 copy_from/IteratorFile 与查询构建器解决方案进行比较。在拥有 1000 万条记录的机器上,copy_from 的速度提高了大约 6.5 倍。 您是否需要处理转义字符串和时间戳等问题? 是的,您必须确保您拥有格式良好的 TSV 记录。【参考方案12】:

如果您想在一个插入语句中插入多行(假设您没有使用 ORM),到目前为止对我来说最简单的方法是使用字典列表。这是一个例子:

 t = ['id':1, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 6,
      'id':2, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 7,
      'id':3, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 8]

conn.execute("insert into campaign_dates
             (id, start_date, end_date, campaignid) 
              values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);",
             t)

如您所见,只会执行一个查询:

INFO sqlalchemy.engine.base.Engine insert into campaign_dates (id, start_date, end_date, campaignid) values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);
INFO sqlalchemy.engine.base.Engine ['campaignid': 6, 'id': 1, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00', 'campaignid': 7, 'id': 2, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00', 'campaignid': 8, 'id': 3, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00']
INFO sqlalchemy.engine.base.Engine COMMIT

【讨论】:

显示来自 sqlalchemy 引擎的日志记录并不是仅运行单个查询的演示,它只是意味着 sqlalchemy 引擎运行了一个命令。在幕后,这是使用 Psychopg2 的 executemany,这是非常低效的。见执行方法docs.sqlalchemy.org/en/latest/orm/session_api.html。【参考方案13】:

另一种不错且有效的方法 - 将插入的行作为 1 个参数传递, 这是 json 对象的数组。

例如你传递参数:

[ id: 18, score: 1,  id: 19, score: 5 ]

它是数组,里面可以包含任意数量的对象。 然后你的 SQL 看起来像:

INSERT INTO links (parent_id, child_id, score) 
SELECT 123, (r->>'id')::int, (r->>'score')::int 
FROM unnest($1::json[]) as r 

注意:你的 postgress 必须足够新,才能支持 json

【讨论】:

【参考方案14】:

来自 Psycopg2 的教程页面 Postgresql.org (see bottom) 的 sn-p:

我想向您展示的最后一项是如何使用字典插入多行。如果您有以下情况:

namedict = ("first_name":"Joshua", "last_name":"Drake",
            "first_name":"Steven", "last_name":"Foo",
            "first_name":"David", "last_name":"Bar")

您可以使用以下方法轻松地在字典中插入所有三行:

cur = conn.cursor()
cur.executemany("""INSERT INTO bar(first_name,last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict)

它并没有节省多少代码,但它确实看起来更好。

【讨论】:

这将运行许多单独的INSERT 语句。有用,但与单个多 VALUEd 插入不同。 并且在同一个文档中写了cur.executemany语句会自动遍历字典,对每一行执行INSERT查询。【参考方案15】:

使用 aiopg - 下面的 sn-p 可以正常工作

    # items = [10, 11, 12, 13]
    # group = 1
    tup = [(gid, pid) for pid in items]
    args_str = ",".join([str(s) for s in tup])
    # insert into group values (1, 10), (1, 11), (1, 12), (1, 13)
    yield from cur.execute("INSERT INTO group VALUES " + args_str)

【讨论】:

此方法对 SQL 注入不安全。正如 psycopg2 文档所述 (that aiopg2 links to):'Never, never, NEVER use Python string concatenation (+) or string parameters interpolation (%) to pass variables to a SQL query string. Not even at gunpoint.'

以上是关于psycopg2:用一个查询插入多行的主要内容,如果未能解决你的问题,请参考以下文章

psycopg2 无法插入特定列

psycopg2“选择更新”

Psycopg2 使用占位符插入表格

Psycopg2 在 postgres 数据库中插入 python 字典

Python/postgres/psycopg2:获取刚刚插入的行的 ID

为啥我的 psycopg2 查询因 `psycopg2.errors.InternalError_: Assert` 而失败?