使用 JooQ 从 CSV 中“批量插入”并同时跟踪插入的记录?

Posted

技术标签:

【中文标题】使用 JooQ 从 CSV 中“批量插入”并同时跟踪插入的记录?【英文标题】:Using JooQ to "batch insert" from a CSV _and_ keep track of inserted records at the same time? 【发布时间】:2015-05-19 03:41:14 【问题描述】:

我有一个 CSV,它有 3400 万行长。是的,不开玩笑。

这是由parser tracer 生成的 CSV 文件,然后将其导入corresponding debugging program。

问题出在后者。

现在我一一导入所有行:

private void insertNodes(final DSLContext jooq)
    throws IOException

    try (
        final Stream<String> lines = Files.lines(nodesPath, UTF8);
    ) 
        lines.map(csvToNode)
            .peek(ignored -> status.incrementProcessedNodes())
            .forEach(r -> jooq.insertInto(NODES).set(r).execute());
    

csvToNode 只是一个映射器,它将String(CSV 的一行)转换为NodesRecord 以供插入。

现在,一行:

            .peek(ignored -> status.incrementProcessedNodes())

嗯...方法名称几乎说明了一切;它增加了status 中的一个计数器,它反映了到目前为止处理的行数。

发生的情况是每秒都会查询此status 对象以获取有关加载过程状态的信息(我们在这里谈论的是 3400 万行;加载它们大约需要 15 分钟)。

但现在 jooq 有了这个(取自他们的文档),可以直接从 CSV 加载:

create.loadInto(AUTHOR)
      .loadCSV(inputstream)
      .fields(ID, AUTHOR_ID, TITLE)
      .execute();

(虽然我个人从不使用 .loadCSV() 重载,因为它没有考虑 CSV 编码)。

当然,JooQ 会设法将其转化为合适的构造,以便为这个或那个数据库引擎最大化吞吐量。

然而,问题是我丢失了从当前代码中获得的“按秒”信息......如果我用select count(*) from the_victim_table 替换查询,那就有点不合时宜了,更不用说这可能是需要很长时间。

那么,我如何获得“两全其美”?也就是说,有没有办法使用“优化的 CSV 加载”和查询,足够快,并且在任何时候,到目前为止已经插入了多少行?

(注意:如果有关系,我目前使用 H2;还计划推出 PostgreSQL 版本)

【问题讨论】:

【参考方案1】:

有很多方法可以优化它。

自定义负载分区

优化查询执行的一种方法是将值集收集到:

批量语句(如INSERT INTO t VALUES(1), (2), (3), (4)) 批处理语句(如 JDBC 批处理) 提交段(在 N 条语句后提交)

... 而不是一一执行。这也是Loader API 所做的(见下文)。所有这些措施都可以大大提高加载速度。

这是您目前可以“收听”加载进度的唯一方式。

使用 jOOQ 3.6+ 加载分区

(这还没有发布,但很快就会发布)

jOOQ 在 jOOQ 3.6 中原生实现了上述三个分区措施

使用供应商特定的 CSV 加载机制

jOOQ 将始终需要通过 JDBC,因此可能不会为您提供 fastest 选项。大多数数据库都有自己的加载 API,例如你提到的那些:

H2:http://www.h2database.com/html/tutorial.html#csv PostgreSQL:http://www.postgresql.org/docs/current/static/sql-copy.html

这将是更底层的,但肯定比其他任何东西都快。

一般说明

发生的情况是每秒查询一次此状态对象以获取有关加载过程状态的信息(我们在这里讨论的是 3400 万行;加载它们大约需要 15 分钟)。

这是一个非常有趣的想法。将其注册为Loader API 的功能请求:Using JooQ to "batch insert" from a CSV _and_ keep track of inserted records at the same time?

虽然我个人从不使用 .loadCSV() 重载,因为它没有考虑 CSV 编码

感谢您的评论,我们已经为 jOOQ 3.6 修复了该问题:https://github.com/jOOQ/jOOQ/issues/4141

当然,JooQ 会设法将其转化为合适的构造,以便为这个或那个数据库引擎最大化吞吐量。

不,jOOQ 没有对最大化吞吐量做出任何假设。这非常困难,并且取决于您的数据库供应商以外的许多其他因素,例如:

桌子上的约束 表上的索引 开启/关闭日志记录 等

jOOQ 可以帮助您自己最大化吞吐量。例如,在 jOOQ 3.5+ 中,您可以:

设置提交率(例如,每 1000 行提交一次)以避免在打开日志记录的情况下插入较长的 UNDO / REDO 日志。这可以通过commitXXX() 方法完成。

在 jOOQ 3.6+ 中,您还可以:

设置批量语句速率(例如,在一条语句中组合 10 行)以显着加快执行速度。这可以通过bulkXXX() 方法完成。 设置批处理语句速率(例如,在单个 JDBC 批处理中组合 10 条语句)以显着加快执行速度 (see this blog post for details)。这可以通过batchXXX() 方法完成。

【讨论】:

好吧,我得试试这一切!一如既往,感谢您详细准确的回答!不幸的是,我暂时没有时间实际实现它......

以上是关于使用 JooQ 从 CSV 中“批量插入”并同时跟踪插入的记录?的主要内容,如果未能解决你的问题,请参考以下文章

从 Excel / CSV 批量插入到 SQL Server

如何使用 JOOQ 批量执行

jOOQ - 寻求有关从 CSV 快速插入的建议

从缺少值的 csv 文件中批量插入 (SQL)

JOOQ 插入到具有大量记录的选择中

从 csv 文件批量插入 - 忽略有错误的行 - SQL Server