Python-PostgreSQL psycopg2 接口 --> executemany
Posted
技术标签:
【中文标题】Python-PostgreSQL psycopg2 接口 --> executemany【英文标题】:Python-PostgreSQL psycopg2 interface --> executemany 【发布时间】:2010-09-28 15:05:28 【问题描述】:我目前正在分析一个***转储文件;我正在使用 python 从中提取一堆数据并将其保存到 PostgreSQL 数据库中。我一直在努力让事情变得更快,因为这个文件很大(18GB)。为了与 PostgreSQL 交互,我使用 psycopg2,但这个模块似乎模仿了许多其他这样的 DBAPI。
无论如何,我有一个关于 cursor.executemany(command, values); 的问题。在我看来,每 1000 个值左右执行一次 executemany 比为这 500 万个值中的每一个调用 cursor.execute(command % value) 更好(请确认或纠正我!)。
但是,你看,我正在使用 executemany 将 1000 行插入到具有唯一完整性约束的表中;这个约束事先没有在 python 中验证,因为这要么需要我一直选择(这似乎适得其反),要么需要我获得超过 3 GB 的 RAM。所有这一切都说明当我的脚本试图通过捕获 psycopg2.DatabaseError 来插入已经存在的行时,我依靠 Postgres 来警告我。
当我的脚本检测到这样一个非唯一的插入时,它 connection.rollback() (每次最多可包含 1000 行,并且有点使执行操作毫无价值),然后一个一个地插入所有值。
由于 psycopg2 的文档记录很差(还有很多很棒的模块......),我找不到有效的解决方法。我已将每个 executemany 插入的值的数量从 1000 减少到 100,以减少每个 executemany 的非唯一插入的可能性,但我很确定它们是一种告诉 psycopg2 忽略这些异常或告诉光标继续执行。
基本上,这似乎是一种解决方案如此简单和流行的问题,我所能做的就是询问以了解它。
再次感谢!
【问题讨论】:
我不确定,但我认为 executemany 只是迭代您的字典(行)列表并在每个上调用“插入”。因此,如果您在循环中调用 execute 或调用 executemany,这并没有什么区别。只是不应该在循环中调用“提交”,而是每 100 或 1000 一次,视情况而定。 所以它就像:outerloop->从列表中获取 1000 行 -> 给内部循环 -> 为每个执行 -> 内部循环退出 -> 提交 -> 外部循环继续直到数据持续。您可以在 100,000 个数据集上针对 excutemany 进行尝试,并检查它是否有所作为。 JV,所以你的意思是一个executemany仍然IPC与每个INSERT的postgeSQL通信?我希望通过使用 executemany 来消除 IPC 固有的开销;如果它不能消除这一点,我就没有足够的理由使用它。谢谢,但我仍然需要更多的信念! -尼克 executemany 准备了一条 SQL 语句,以便在您需要经常调用它时可以更快地执行它。请不要在数据库查询中使用字符串插值 (%)。使用 cursor.execute(sql_stmt, (arg1, arg2, ...)。这会正确地转义您的数据。 好的,感谢 cursor.execute 上的字符串插值信息。所以你说 executemany 准备 SQL 语句,这样接下来的 500 万次调用会更快;在我看来, cursor.execute 也是这样做的。我仍然想知道 executemany 是否只会发送一个 IPC 调用? 【参考方案1】:只需使用 psql \copy 命令将所有数据复制到临时表中,或使用 psycopg cursor.copy_in() 方法。那么:
insert into mytable
select * from (
select distinct *
from scratch
) uniq
where not exists (
select 1
from mytable
where mytable.mykey = uniq.mykey
);
与任何插入组合相比,这将消除重复数据并运行得更快。
-dg
【讨论】:
【参考方案2】:我遇到了同样的问题,在这里搜索了很多天,收集了很多提示,形成了一个完整的解决方案。即使问题已过时,我希望这对其他人有用。
1) 忘记删除索引/约束并在以后重新创建它们,好处是微不足道的或更糟。
2) executemany 比 execute 更好,因为它为您创建了准备语句。您可以使用以下命令自己获得相同的结果,以提高 300% 的速度:
# To run only once:
sqlCmd = """PREPARE myInsert (int, timestamp, real, text) AS
INSERT INTO myBigTable (idNumber, date_obs, result, user)
SELECT $1, $2, $3, $4 WHERE NOT EXISTS
(SELECT 1 FROM myBigTable WHERE (idNumber, date_obs, user)=($1, $2, $4));"""
curPG.execute(sqlCmd)
cptInsert = 0 # To let you commit from time to time
#... inside the big loop:
curPG.execute("EXECUTE myInsert(%s,%s,%s,%s);", myNewRecord)
allreadyExists = (curPG.rowcount < 1)
if not allreadyExists:
cptInsert += 1
if cptInsert % 10000 == 0:
conPG.commit()
这个虚拟表示例对 (idNumber, date_obs, user) 有一个唯一约束。
3) 最好的解决方案是使用 COPY_FROM 和 TRIGGER 来管理插入前的唯一键。这让我的速度提高了 36 倍。我开始以 500 条记录/秒的速度正常插入。通过“复制”,我每秒获得了超过 18,000 条记录。带有 Psycopg2 的 Python 示例代码:
ioResult = StringIO.StringIO() #To use a virtual file as a buffer
cptInsert = 0 # To let you commit from time to time - Memory has limitations
#... inside the big loop:
print >> ioResult, "\t".join(map(str, myNewRecord))
cptInsert += 1
if cptInsert % 10000 == 0:
ioResult = flushCopyBuffer(ioResult, curPG)
#... after the loop:
ioResult = flushCopyBuffer(ioResult, curPG)
def flushCopyBuffer(bufferFile, cursorObj):
bufferFile.seek(0) # Little detail where lures the deamon...
cursorObj.copy_from(bufferFile, 'myBigTable',
columns=('idNumber', 'date_obs', 'value', 'user'))
cursorObj.connection.commit()
bufferFile.close()
bufferFile = StringIO.StringIO()
return bufferFile
这就是 Python 部分的内容。现在 Postgresql 触发器没有异常 psycopg2.IntegrityError 然后所有 COPY 命令的记录被拒绝:
CREATE OR REPLACE FUNCTION chk_exists()
RETURNS trigger AS $BODY$
DECLARE
curRec RECORD;
BEGIN
-- Check if record's key already exists or is empty (file's last line is)
IF NEW.idNumber IS NULL THEN
RETURN NULL;
END IF;
SELECT INTO curRec * FROM myBigTable
WHERE (idNumber, date_obs, user) = (NEW.idNumber, NEW.date_obs, NEW.user);
IF NOT FOUND THEN -- OK keep it
RETURN NEW;
ELSE
RETURN NULL; -- Oups throw it or update the current record
END IF;
END;
$BODY$ LANGUAGE plpgsql;
现在将此函数链接到表的触发器:
CREATE TRIGGER chk_exists_before_insert
BEFORE INSERT ON myBigTable FOR EACH ROW EXECUTE PROCEDURE chk_exists();
这似乎需要做很多工作,但是当 Postgresql 不必一遍又一遍地解释 SQL 时,它是一个非常快速的野兽。玩得开心。
【讨论】:
【参考方案3】:“当我的脚本检测到这样的非 UNIQUE INSERT 时,它 connection.rollback() (每次最多包含 1000 行,并且有点使 executemany 毫无价值)然后一个一个地插入所有值。”
这个问题真的没有多大意义。
每个 1,000 行的块是否由于非唯一行而失败?
1,000 行中的 1 个块是否失败(在 5,000 个这样的块中)?如果是这样,那么执行很多对 5,000 中的 4,999 有帮助,并且远非“毫无价值”。
您是否担心这种非独特的插入?或者您有关于这种情况发生次数的实际统计数据吗?
如果您已从 1,000 行块切换到 100 行块,那么您显然可以确定 1,000 行块、100 行块和 1 行块是否具有性能优势。
请用实际数据库和不同大小的块实际运行实际程序并发布数字。
【讨论】:
【参考方案4】:使用 MERGE 语句而不是 INSERT 语句可以解决您的问题。
【讨论】:
从 8.5 beta 2 开始,PostgreSQL 根本不支持 MERGE 语句。以上是关于Python-PostgreSQL psycopg2 接口 --> executemany的主要内容,如果未能解决你的问题,请参考以下文章
ImportError:没有名为“psycopg2._psycopg”的模块
Django,mod_wsgi,psycopg2 配置不当:加载 psycopg2 模块时出错:没有名为 _psycopg 的模块
为啥我的 psycopg2 查询因 `psycopg2.errors.InternalError_: Assert` 而失败?