使用 jdbc 将行批量插入 Spanner 时加载性能低

Posted

技术标签:

【中文标题】使用 jdbc 将行批量插入 Spanner 时加载性能低【英文标题】:low loading performance while batch inserting rows into Spanner using jdbc 【发布时间】:2020-12-08 02:04:24 【问题描述】:

背景:我正在尝试将 TSV 格式的数据文件(从 mysql 数据库转储)加载到 GCP Spanner 表中。

客户端库:官方 Spanner JDBC 依赖 v1.15.0 表架构:两个字符串类型的列和十个 int 类型的列 GCP Spanner 实例:配置为具有 5 个节点的多区域 nam6

我的加载程序在 GCP VM 中运行,并且是访问 Spanner 实例的专有客户端。启用自动提交。批量插入是我的程序执行的唯一 DML 操作,批量大小约为 1500。在每次提交中,它完全使用了突变限制,即 20000。同时,提交大小低于 5MB(值两个字符串类型的列是小型的)。行根据主键的第一列进行分区,以便每次提交都可以发送到很少的分区以获得更好的性能。

通过上述所有配置和优化,插入速率仅为每秒 1k 行左右。这真的让我很失望,因为我有超过 8 亿行要插入。我确实注意到the official doc 提到了大约。多区域 Spanner 实例的峰值写入(总 QPS)为 1800。

所以我在这里有两个问题:

    考虑到如此低的峰值写入 QPS,是否意味着 GCP 不希望或不支持客户将大型数据集迁移到多区域 Spanner 实例? 我发现 Spanner 监控的读取延迟很高。我没有任何阅读请求。我的猜测是,在写入行时 Spanner 需要首先读取并检查是否存在具有相同主键的行。如果我的猜测是正确的,为什么要花这么多时间?如果没有,我能否获得有关这些读取操作如何发生的任何指导?

【问题讨论】:

我之前也遇到过类似的问题,我使用了 Knut 建议的 Mutations API 解决方案——(maven 指令:cloud.google.com/spanner/docs/reference/libraries)。一旦添加到您的项目中,这里是一些关于如何使用突变的示例代码:cloud.google.com/spanner/docs/modify-mutation-api 不知道为什么,但是对于批量插入,突变往往比使用常规 DML 语句更有效。 @CowZow 请查看 Knut 添加的附加信息。 【参考方案1】:

我不太清楚您是如何设置加载数据的客户端应用程序。我最初的印象是您的客户端应用程序可能没有并行执行足够的事务。您通常应该能够每秒插入超过 1,000 行,但这需要您并行执行多个事务,可能来自多个 VM。我使用以下简单示例来测试从本地计算机到 节点 Spanner 实例的负载吞吐量,这给了我大约 1,500 行/秒的吞吐量。

使用在与 Spanner 实例位于同一网络区域的一个或多个 VM 中运行的客户端应用程序的多节点设置应该能够实现更高的容量。

import com.google.api.client.util.Base64;
import com.google.common.base.Stopwatch;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TestJdbc 

  public static void main(String[] args) 
    final int threads = 512;
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    watch = Stopwatch.createStarted();
    for (int i = 0; i < threads; i++) 
      executor.submit(new InsertRunnable());
    
  

  static final AtomicLong rowCount = new AtomicLong();
  static Stopwatch watch;

  static final class InsertRunnable implements Runnable 
    @Override
    public void run() 
      try (Connection connection =
          DriverManager.getConnection(
              "jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) 
        while (true) 
          try (PreparedStatement ps =
              connection.prepareStatement("INSERT INTO Test (Id, Col1, Col2) VALUES (?, ?, ?)")) 
            for (int i = 0; i < 150; i++) 
              ps.setLong(1, rnd.nextLong());
              ps.setString(2, randomString(100));
              ps.setString(3, randomString(100));
              ps.addBatch();
              rowCount.incrementAndGet();
            
            ps.executeBatch();
          
          System.out.println("Rows inserted: " + rowCount);
          System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
        
       catch (SQLException e) 
        throw new RuntimeException(e);
      
    

    private final Random rnd = new Random();

    private String randomString(int maxLength) 
      byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
      rnd.nextBytes(bytes);
      return Base64.encodeBase64String(bytes);
    
  

您还可以尝试调整其他几项以获得更好的结果:

减少每批次的行数可能会产生更好的整体结果。 如果可能,使用 InsertOrUpdate 突变对象比使用 DML 语句更有效(参见下面的示例)。

使用Mutation 代替 DML 的示例:

import com.google.api.client.util.Base64;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.jdbc.CloudSpannerJdbcConnection;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class TestJdbc 

  public static void main(String[] args) 
    final int threads = 512;
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    watch = Stopwatch.createStarted();
    for (int i = 0; i < threads; i++) 
      executor.submit(new InsertOrUpdateMutationRunnable());
    
  

  static final AtomicLong rowCount = new AtomicLong();
  static Stopwatch watch;

  static final class InsertOrUpdateMutationRunnable implements Runnable 
    @Override
    public void run() 
      try (Connection connection =
          DriverManager.getConnection(
              "jdbc:cloudspanner:/projects/my-project/instances/my-instance/databases/my-db")) 
        CloudSpannerJdbcConnection csConnection = connection.unwrap(CloudSpannerJdbcConnection.class);
        CloudSpannerJdbcConnection csConnection =
            connection.unwrap(CloudSpannerJdbcConnection.class);
        while (true) 
          ImmutableList.Builder<Mutation> builder = ImmutableList.builder();
          for (int i = 0; i < 150; i++) 
            builder.add(
                Mutation.newInsertOrUpdateBuilder("Test")
                    .set("Id")
                    .to(rnd.nextLong())
                    .set("Col1")
                    .to(randomString(100))
                    .set("Col2")
                    .to(randomString(100))
                    .build());
            rowCount.incrementAndGet();
          
          csConnection.write(builder.build());
          System.out.println("Rows inserted: " + rowCount);
          System.out.println("Rows/second: " + rowCount.get() / watch.elapsed(TimeUnit.SECONDS));
        
        
       catch (SQLException e) 
        throw new RuntimeException(e);
      
    

    private final Random rnd = new Random();

    private String randomString(int maxLength) 
      byte[] bytes = new byte[rnd.nextInt(maxLength / 2) + 1];
      rnd.nextBytes(bytes);
      return Base64.encodeBase64String(bytes);
    
  

上面的简单示例为我提供了大约 35,000 行/秒的吞吐量,无需进一步调整。

附加信息 2020-08-21:突变对象比(批量)DML 语句更有效的原因是 DML 语句在内部被 Cloud Spanner 转换为读取查询,然后用于创建突变。这种转换需要对批处理中的每个 DML 语句进行,这意味着具有 1,500 个简单插入语句的 DML 批处理将触发 1,500 个(小)读取查询,并且需要转换为 1,500 个突变。这很可能也是您在监控中看到的读取延迟背后的原因。

您是否介意分享一些有关您的客户端应用程序外观以及您正在运行的实例数量的更多信息?

【讨论】:

此外,通过在 Spanner Java 客户端库和 database.writeAtLeastOnce() 中直接使用 Mutations 可以获得一些小的性能改进——这意味着将只使用一个 RPC 来编写批处理。 您好 Knut,感谢您的回复。我的加载程序与您的第一种方法完全相同。在我采用了你的第二种方法后(只是在数据访问层做一些改变),我看到了巨大的性能提升,并且可以达到每秒 100+k 行,而无需过多的调整,这对我来说绝对足够了。 再次感谢您提供更多信息。对于 Spanner 用户来说,了解并应该在 cloud.google.com/spanner/docs/bulk-loading 页面上提及这一点非常重要。 仅供参考,DML and Mutations - a tale of two data altering techniques in Cloud Spanner 提供了 DML 和突变之间的一些额外差异。 DML 在每条语句之后进行约束检查,这也可以解释为什么它比缓冲突变的突变 API 慢,并且只在提交时检查约束。【参考方案2】:

要插入超过 8 亿行,并且看到您是 Java 程序员,我可以建议在 Dataflow 上使用 Beam 吗?

spanner writer in Beam 旨在尽可能高效地写入 - 按相似键对行进行分组,并按照您的操作对它们进行批处理。 Beam on Dataflow 还可以使用多个工作虚拟机并行执行多个文件读取和 spanner 写入...

使用多区域 spanner 实例,您应该能够获得大约 1800 rows per node per second 插入速度(如果行小且成批处理,则速度更快,正如 Knut 的回复所建议的那样),并且使用 5 个 spanner 节点,您可能有 10 到 20 个并行运行的导入程序线程 - 无论是使用您的导入程序还是使用 Dataflow。

(披露:我是 Beam SpannerIO 维护者)

【讨论】:

以上是关于使用 jdbc 将行批量插入 Spanner 时加载性能低的主要内容,如果未能解决你的问题,请参考以下文章

用于 Cloud Spanner 的 Simba JDBC 驱动程序与 Spark JDBC DataFrame 阅读器一起使用

jdbc-批量插入批量删除批量更新

JDBC批量插入性能简单分析

使用 JDBC 将数据插入雪花

jdbc 批量插入和查询与使用生成键的单次插入

如何从 Oracle 中的 JDBC 批量插入中获取生成的密钥?