Python-PostgreSQL psycopg2 接口 --> executemany

Posted

技术标签:

【中文标题】Python-PostgreSQL psycopg2 接口 --> executemany【英文标题】:Python-PostgreSQL psycopg2 interface --> executemany 【发布时间】:2010-09-28 15:05:28 【问题描述】:

我目前正在分析一个***转储文件;我正在使用 python 从中提取一堆数据并将其保存到 PostgreSQL 数据库中。我一直在努力让事情变得更快,因为这个文件很大(18GB)。为了与 PostgreSQL 交互,我使用 psycopg2,但这个模块似乎模仿了许多其他这样的 DBAPI。

无论如何,我有一个关于 cursor.executemany(command, values); 的问题。在我看来,每 1000 个值左右执行一次 executemany 比为这 500 万个值中的每一个调用 cursor.execute(command % value) 更好(请确认或纠正我!)。

但是,你看,我正在使用 executemany 将 1000 行插入到具有唯一完整性约束的表中;这个约束事先没有在 python 中验证,因为这要么需要我一直选择(这似乎适得其反),要么需要我获得超过 3 GB 的 RAM。所有这一切都说明当我的脚本试图通过捕获 psycopg2.DatabaseError 来插入已经存在的行时,我依靠 Postgres 来警告我。

当我的脚本检测到这样一个非唯一的插入时,它 connection.rollback() (每次最多可包含 1000 行,并且有点使执行操作毫无价值),然后一个一个地插入所有值。

由于 psycopg2 的文档记录很差(还有很多很棒的模块......),我找不到有效的解决方法。我已将每个 executemany 插入的值的数量从 1000 减少到 100,以减少每个 executemany 的非唯一插入的可能性,但我很确定它们是一种告诉 psycopg2 忽略这些异常或告诉光标继续执行。

基本上,这似乎是一种解决方案如此简单和流行的问题,我所能做的就是询问以了解它。

再次感谢!

【问题讨论】:

我不确定,但我认为 executemany 只是迭代您的字典(行)列表并在每个上调用“插入”。因此,如果您在循环中调用 execute 或调用 executemany,这并没有什么区别。只是不应该在循环中调用“提交”,而是每 100 或 1000 一次,视情况而定。 所以它就像:outerloop->从列表中获取 1000 行 -> 给内部循环 -> 为每个执行 -> 内部循环退出 -> 提交 -> 外部循环继续直到数据持续。您可以在 100,000 个数据集上针对 excutemany 进行尝试,并检查它是否有所作为。 JV,所以你的意思是一个executemany仍然IPC与每个INSERT的postgeSQL通信?我希望通过使用 executemany 来消除 IPC 固有的开销;如果它不能消除这一点,我就没有足够的理由使用它。谢谢,但我仍然需要更多的信念! -尼克 executemany 准备了一条 SQL 语句,以便在您需要经常调用它时可以更快地执行它。请不要在数据库查询中使用字符串插值 (%)。使用 cursor.execute(sql_stmt, (arg1, arg2, ...)。这会正确地转义您的数据。 好的,感谢 cursor.execute 上的字符串插值信息。所以你说 executemany 准备 SQL 语句,这样接下来的 500 万次调用会更快;在我看来, cursor.execute 也是这样做的。我仍然想知道 executemany 是否只会发送一个 IPC 调用? 【参考方案1】:

只需使用 psql \copy 命令将所有数据复制到临时表中,或使用 psycopg cursor.copy_in() 方法。那么:

insert into mytable
select * from (
    select distinct * 
    from scratch
) uniq
where not exists (
    select 1 
    from mytable 
    where mytable.mykey = uniq.mykey
);

与任何插入组合相比,这将消除重复数据并运行得更快。

-dg

【讨论】:

【参考方案2】:

我遇到了同样的问题,在这里搜索了很多天,收集了很多提示,形成了一个完整的解决方案。即使问题已过时,我希望这对其他人有用。

1) 忘记删除索引/约束并在以后重新创建它们,好处是微不足道的或更糟。

2) executemany 比 execute 更好,因为它为您创建了准备语句。您可以使用以下命令自己获得相同的结果,以提高 300% 的速度:

# To run only once:
sqlCmd = """PREPARE myInsert (int, timestamp, real, text) AS
   INSERT INTO myBigTable (idNumber, date_obs, result, user)
     SELECT $1, $2, $3, $4 WHERE NOT EXISTS
     (SELECT 1 FROM myBigTable WHERE (idNumber, date_obs, user)=($1, $2, $4));"""
curPG.execute(sqlCmd)
cptInsert = 0   # To let you commit from time to time

#... inside the big loop:
curPG.execute("EXECUTE myInsert(%s,%s,%s,%s);", myNewRecord)
allreadyExists = (curPG.rowcount < 1)
if not allreadyExists:
   cptInsert += 1
   if cptInsert % 10000 == 0:
      conPG.commit()

这个虚拟表示例对 (idNumber, date_obs, user) 有一个唯一约束。

3) 最好的解决方案是使用 COPY_FROM 和 TRIGGER 来管理插入前的唯一键。这让我的速度提高了 36 倍。我开始以 500 条记录/秒的速度正常插入。通过“复制”,我每秒获得了超过 18,000 条记录。带有 Psycopg2 的 Python 示例代码:

ioResult = StringIO.StringIO() #To use a virtual file as a buffer
cptInsert = 0 # To let you commit from time to time - Memory has limitations
#... inside the big loop:
   print >> ioResult, "\t".join(map(str, myNewRecord))
   cptInsert += 1
   if cptInsert % 10000 == 0:
      ioResult = flushCopyBuffer(ioResult, curPG)
#... after the loop:
ioResult = flushCopyBuffer(ioResult, curPG)

def flushCopyBuffer(bufferFile, cursorObj):
   bufferFile.seek(0)   # Little detail where lures the deamon...
   cursorObj.copy_from(bufferFile, 'myBigTable',
      columns=('idNumber', 'date_obs', 'value', 'user'))
   cursorObj.connection.commit()
   bufferFile.close()
   bufferFile = StringIO.StringIO()
   return bufferFile

这就是 Python 部分的内容。现在 Postgresql 触发器没有异常 psycopg2.IntegrityError 然后所有 COPY 命令的记录被拒绝:

CREATE OR REPLACE FUNCTION chk_exists()
  RETURNS trigger AS $BODY$
DECLARE
    curRec RECORD;
BEGIN
   -- Check if record's key already exists or is empty (file's last line is)
   IF NEW.idNumber IS NULL THEN
      RETURN NULL;
   END IF;
   SELECT INTO curRec * FROM myBigTable
      WHERE (idNumber, date_obs, user) = (NEW.idNumber, NEW.date_obs, NEW.user);
   IF NOT FOUND THEN -- OK keep it
      RETURN NEW;
   ELSE    
      RETURN NULL; -- Oups throw it or update the current record
   END IF;
END;
$BODY$ LANGUAGE plpgsql;

现在将此函数链接到表的触发器:

CREATE TRIGGER chk_exists_before_insert
   BEFORE INSERT ON myBigTable FOR EACH ROW EXECUTE PROCEDURE chk_exists();

这似乎需要做很多工作,但是当 Postgresql 不必一遍又一遍地解释 SQL 时,它是一个非常快速的野兽。玩得开心。

【讨论】:

【参考方案3】:

“当我的脚本检测到这样的非 UNIQUE INSERT 时,它 connection.rollback() (每次最多包含 1000 行,并且有点使 executemany 毫无价值)然后一个一个地插入所有值。”

这个问题真的没有多大意义。

每个 1,000 行的块是否由于非唯一行而失败?

1,000 行中的 1 个块是否失败(在 5,000 个这样的块中)?如果是这样,那么执行很多对 5,000 中的 4,999 有帮助,并且远非“毫无价值”。

您是否担心这种非独特的插入?或者您有关于这种情况发生次数的实际统计数据吗?

如果您已从 1,000 行块切换到 100 行块,那么您显然可以确定 1,000 行块、100 行块和 1 行块是否具有性能优势。

请用实际数据库和不同大小的块实际运行实际程序并发布数字。

【讨论】:

【参考方案4】:

使用 MERGE 语句而不是 INSERT 语句可以解决您的问题。

【讨论】:

从 8.5 beta 2 开始,PostgreSQL 根本不支持 MERGE 语句。

以上是关于Python-PostgreSQL psycopg2 接口 --> executemany的主要内容,如果未能解决你的问题,请参考以下文章

ImportError:没有名为“psycopg2._psycopg”的模块

Django,mod_wsgi,psycopg2 配置不当:加载 psycopg2 模块时出错:没有名为 _psycopg 的模块

为啥我的 psycopg2 查询因 `psycopg2.errors.InternalError_: Assert` 而失败?

psycopg2“选择更新”

尝试使用“pip install psycopg2”在 Mac 上安装 psycopg2 时出现权限错误

安装 psycopg2 osx 10.12 python 3.5.2 - psycopg2 的构建轮失败