使用 JDBC 将 CSV 复制到具有自定义类型数组的 Postgres

Posted

技术标签:

【中文标题】使用 JDBC 将 CSV 复制到具有自定义类型数组的 Postgres【英文标题】:CSV copy to Postgres with array of custom type using JDBC 【发布时间】:2018-11-09 18:35:51 【问题描述】:

我在我的数据库中定义了一个自定义类型

CREATE TYPE address AS (ip inet, port int);

还有一个在数组中使用这种类型的表:

CREATE TABLE my_table (
  addresses  address[] NULL
)

我有一个包含以下内容的示例 CSV 文件

(10.10.10.1,80),(10.10.10.2,443)
(10.10.10.3,8080),(10.10.10.4,4040)

我使用以下代码 sn-p 来执行我的 COPY:

    Class.forName("org.postgresql.Driver");

    String input = loadCsvFromFile();

    Reader reader = new StringReader(input);

    Connection connection = DriverManager.getConnection(
            "jdbc:postgresql://db_host:5432/db_name", "user",
            "password");

    CopyManager copyManager = connection.unwrap(PGConnection.class).getCopyAPI();

    String copyCommand = "COPY my_table (addresses) " + 
                         "FROM STDIN WITH (" + 
                           "DELIMITER '\t', " + 
                           "FORMAT csv, " + 
                           "NULL '\\N', " + 
                           "ESCAPE '\"', " +
                           "QUOTE '\"')";

    copyManager.copyIn(copyCommand, reader);

执行此程序会产生以下异常:

Exception in thread "main" org.postgresql.util.PSQLException: ERROR: malformed record literal: "(10.10.10.1"
  Detail: Unexpected end of input.
  Where: COPY only_address, line 1, column addresses: "(10.10.10.1,80),(10.10.10.2,443)"
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2422)
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1114)
    at org.postgresql.core.v3.QueryExecutorImpl.endCopy(QueryExecutorImpl.java:963)
    at org.postgresql.core.v3.CopyInImpl.endCopy(CopyInImpl.java:43)
    at org.postgresql.copy.CopyManager.copyIn(CopyManager.java:185)
    at org.postgresql.copy.CopyManager.copyIn(CopyManager.java:160)

我尝试过输入中括号的不同组合,但似乎无法让 COPY 正常工作。有什么想法我可能会出错吗?

【问题讨论】:

使用复制是硬性要求,还是即使速度较慢也可以使用插入? @GregoryArenius 根据我们的吞吐量和成本分析,我们确实需要支持 COPY。 您是否尝试转储 (pg_dump) 现有数据并检查 CSV 语法和 PostgreSQL 自己生成的 COPY 命令? CSV 文件的确切形状是什么? (10.10.10.1,80),(10.10.10.2,443) (10.10.10.3,8080),(10.10.10.4,4040) 不是 CSV :) @jbet 在两个结构之间有一个新行'\n'(10.10.10.1,80),(10.10.10.2,443)\n(10.10.10.3,8080),(10.10.10.4,4040) 【参考方案1】:

请参阅 https://git.mikael.io/mikaelhg/pg-object-csv-copy-poc/ 了解具有 JUnit 测试的项目,该测试可以满足您的需求。

基本上,您希望能够将逗号用于两件事:分隔数组项和分隔类型字段,但您不希望 CSV 解析将逗号解释为字段描述符。

所以

    您想告诉 CSV 解析器将整行视为一个字符串、一个字段,您可以通过将其括在单引号中并告诉 CSV 解析器这一点来做到这一点,并且 您希望 PG 字段解析器将每个数组项类型实例考虑用双引号括起来。

代码:

copyManager.copyIn("COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''", reader);

DML 示例 1:

COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''

CSV 示例 1:

'"(10.0.0.1,1)","(10.0.0.2,2)"'
'"(10.10.10.1,80)","(10.10.10.2,443)"'
'"(10.10.10.3,8080)","(10.10.10.4,4040)"'

DML 示例 2,转义双引号:

COPY my_table (addresses) FROM STDIN WITH CSV

CSV 示例 2,转义双引号:

"""(10.0.0.1,1)"",""(10.0.0.2,2)"""
"""(10.10.10.1,80)"",""(10.10.10.2,443)"""
"""(10.10.10.3,8080)"",""(10.10.10.4,4040)"""

完整的 JUnit 测试类:

package io.mikael.poc;

import com.google.common.io.CharStreams;
import org.junit.*;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
import org.testcontainers.containers.PostgreSQLContainer;

import java.io.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;

import static java.nio.charset.StandardCharsets.UTF_8;

public class CopyTest 

    private Reader reader;

    private Connection connection;

    private CopyManager copyManager;

    private static final String CREATE_TYPE = "CREATE TYPE address AS (ip inet, port int)";

    private static final String CREATE_TABLE = "CREATE TABLE my_table (addresses  address[] NULL)";

    private String loadCsvFromFile(final String fileName) throws IOException 
        try (InputStream is = getClass().getResourceAsStream(fileName)) 
            return CharStreams.toString(new InputStreamReader(is, UTF_8));
        
    

    @ClassRule
    public static PostgreSQLContainer db = new PostgreSQLContainer("postgres:10-alpine");

    @BeforeClass
    public static void beforeClass() throws Exception 
        Class.forName("org.postgresql.Driver");
    

    @Before
    public void before() throws Exception 
        String input = loadCsvFromFile("/data_01.csv");
        reader = new StringReader(input);

        connection = DriverManager.getConnection(db.getJdbcUrl(), db.getUsername(), db.getPassword());
        copyManager = connection.unwrap(PGConnection.class).getCopyAPI();

        connection.setAutoCommit(false);
        connection.beginRequest();

        connection.prepareCall(CREATE_TYPE).execute();
        connection.prepareCall(CREATE_TABLE).execute();
    

    @After
    public void after() throws Exception 
        connection.rollback();
    

    @Test
    public void copyTest01() throws Exception 
        copyManager.copyIn("COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''", reader);

        final StringWriter writer = new StringWriter();
        copyManager.copyOut("COPY my_table TO STDOUT WITH CSV", writer);
        System.out.printf("roundtrip:%n%s%n", writer.toString());

        final ResultSet rs = connection.prepareStatement(
                "SELECT array_to_json(array_agg(t)) FROM (SELECT addresses FROM my_table) t")
                .executeQuery();
        rs.next();
        System.out.printf("json:%n%s%n", rs.getString(1));
    


测试输出:

roundtrip:
"""(10.0.0.1,1)"",""(10.0.0.2,2)"""
"""(10.10.10.1,80)"",""(10.10.10.2,443)"""
"""(10.10.10.3,8080)"",""(10.10.10.4,4040)"""

json:
["addresses":["ip":"10.0.0.1","port":1,"ip":"10.0.0.2","port":2],"addresses":["ip":"10.10.10.1","port":80,"ip":"10.10.10.2","port":443],"addresses":["ip":"10.10.10.3","port":8080,"ip":"10.10.10.4","port":4040]]

【讨论】:

【参考方案2】:

CSV 格式中,当您指定分隔符时,您不能将其用作数据中的字符,除非您对其进行转义! 使用逗号作为分隔符的 csv 文件示例

正确记录:data1, data2 解析结果:[0] => data1 [1] => data2

一个不正确的:data,1, data2 解析结果:[0] => data [1] => 1 [2] => data2

最后您不需要将文件加载为 csv,而是作为一个简单文件,因此将您的方法 loadCsvFromFile(); 替换为

public String loadRecordsFromFile(File file) 
 LineIterator it = FileUtils.lineIterator(file, "UTF-8");
 StringBuilder sb = new StringBuilder();
 try 
   while (it.hasNext()) 
     sb.append(it.nextLine()).append(System.nextLine);
   
  
 finally 
   LineIterator.closeQuietly(iterator);
 

 return sb.toString();

不要忘记在你的 pom 文件中添加这个依赖项

<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->

    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>

或者从commons.apache.org下载JAR

【讨论】:

【参考方案3】:

1NF

首先,我认为您的表格设计是错误的,因为它不符合1NF。每个字段都应该只包含原子属性,但事实并非如此。为什么不是这样的表格:

CREATE TABLE my_table (
    id,
    ip inet,
    port int
)

id 是源文件中的行号,ip/port 是该行中的地址之一? 样本数据:

id | ip         | port
-----------------------
1  | 10.10.10.1 | 80
1  | 10.10.10.2 | 443
2  | 10.10.10.3 | 8080
2  | 10.10.10.4 | 4040
...

因此,您将能够在单个地址上查询您的数据库(查找所有关联的地址,如果两个地址在同一行上,则返回 true,无论您想要什么...)。

加载数据

但是让我们假设您知道自己在做什么。这里的主要问题是您的输入数据文件采用特殊格式。它可能是一个单列 CSV 文件,但它会是一个非常退化的 CSV 文件。无论如何,您必须先转换这些行,然后再将它们插入数据库。你有两个选择:

    您读取输入文件的每一行并创建一个INSERT(这可能需要一段时间); 您将输入文件转换为具有预期格式的文本文件并使用COPY

一一插入

第一个选项似乎很简单:对于 csv 文件的第一行 (10.10.10.1,80),(10.10.10.2,443),您必须运行查询:

INSERT INTO my_table VALUES (ARRAY[('10.10.10.1',80),('10.10.10.2',443)]::address[], 4)

为此,您只需创建一个新字符串:

String value = row.replaceAll("\\", "ARRAY[")
                    .replaceAll("\\", "]::address[]")
                    .replaceAll("\\(([0-9.]+),", "'$1'");
String sql = String.format("INSERT INTO my_table VALUES (%s)", value);

并对输入文件的每一行执行查询(或者为了更好的安全性,使用prepared statement)。

插入COPY

我将详细说明第二个选项。您必须在 Java 代码中使用:

copyManager.copyIn(sql, from);

其中复制查询是COPY FROM STDIN 语句,from 是读取器。声明将是:

COPY my_table (addresses) FROM STDIN WITH (FORMAT text);

要为副本管理器提供数据,您需要以下数据(注意引号):

"(10.10.10.1,80)","(10.10.10.2,443)"
"(10.10.10.3,8080)","(10.10.10.4,4040)"

带有临时文件

以正确格式获取数据的更简单方法是创建一个临时文件。您读取输入文件的每一行并将( 替换为"( 并将) 替换为)"。将此处理后的行写入临时文件。然后将此文件的阅读器传递给副本管理器。

在飞行中

有两个线程 您可以使用两个线程:

线程 1 读取输入文件,逐行处理并将它们写入PipedWriter

线程 2 将连接到前一个 PipedWriterPipedReader 传递给复制管理器。

主要的困难在于以这样一种方式同步线程,即线程 2 在线程 1 开始将数据写入PipedWriter 之前开始读取PipedReader。示例见this project of mine。

使用自定义阅读器 from 阅读器可能是类似(原始版本)的实例:

class DataReader extends Reader 
    PushbackReader csvFileReader;
    private boolean wasParenthese;

    public DataReader(Reader csvFileReader) 
        this.csvFileReader = new PushbackReader(csvFileReader, 1);
        wasParenthese = false;
    

    @Override
    public void close() throws IOException 
        this.csvFileReader.close();
    

    @Override
    public int read(char[] cbuf, int off, int len) throws IOException 
        // rely on read()
        for (int i = off; i < off + len; i++) 
            int c = this.read();
            if (c == -1) 
                return i-off > 0 ? i-off : -1;
            
            cbuf[i] = (char) c;
        
        return len;
    

    @Override
    public int read() throws IOException 
        final int c = this.csvFileReader.read();
        if (c == '(' && !this.wasParenthese) 
            this.wasParenthese = true;
            this.csvFileReader.unread('(');
            return '"'; // add " before (
         else 
            this.wasParenthese = false;
            if (c == ')') 
                this.csvFileReader.unread('"');
                return ')';  // add " after )
             else 
                return c;
            
        
    

(这是一个幼稚的版本,因为正确的做法是仅覆盖public int read(char[] cbuf, int off, int len)。但是您应该处理cbuf以添加引号并存储推到右侧的额外字符:这是有点乏味)。 现在,如果r 是文件的阅读者:

(10.10.10.1,80),(10.10.10.2,443)
(10.10.10.3,8080),(10.10.10.4,4040)

只需使用:

Class.forName("org.postgresql.Driver");
Connection connection = DriverManager
        .getConnection("jdbc:postgresql://db_host:5432/db_base", "user", "passwd");

CopyManager copyManager = connection.unwrap(PGConnection.class).getCopyAPI();
copyManager.copyIn("COPY my_table FROM STDIN WITH (FORMAT text)", new DataReader(r));

批量加载

如果你正在加载大量数据,不要忘记the basic tips:禁用自动提交,删除索引和约束,并使用TRUNCATEANALYZE,如下所示:

TRUNCATE my_table;
COPY ...;
ANALYZE my_table;

这将加快加载速度。

【讨论】:

以上是关于使用 JDBC 将 CSV 复制到具有自定义类型数组的 Postgres的主要内容,如果未能解决你的问题,请参考以下文章

从 0jdbc6 JDBCthin 驱动程序调用具有自定义对象返回类型的 Oracle PL/SQL 过程

如何使用 pandas 或 python 将具有数百万行的表从 PostgreSQL 复制到 Amazon Redshift

如何为具有映射到多个柴油列的自定义字段的类型派生 Queryable?

如何只用4步,实现一个自定义JDBC驱动?

如何只用4步,实现一个自定义JDBC驱动?

如何只用4步,实现一个自定义JDBC驱动?