如何有效地与多个线程并行查询 google-cloud-spanner?

Posted

技术标签:

【中文标题】如何有效地与多个线程并行查询 google-cloud-spanner?【英文标题】:How to efficiently query google-cloud-spanner in parallel with multiple threads? 【发布时间】:2021-06-22 23:38:23 【问题描述】:

(对不起,这是 TL;DR;但我很绝望,想要彻底!)

我们正在将一项服务从 AWS 迁移到 GCP,并从 DynamoDB 切换到 Cloud Spanner 作为后端数据存储。

数据存储 (spanner) 包含 Web 服务用户查询的数据。在生产负载中,被查询的数据在 1% 到 10% 的时间内被发现。我有一个简单的多线程 Java 测试客户端,它查询我们的服务,只要过去 1 分钟的平均吞吐量增加,就会不断添加新线程。

我的测试客户端在 GCE 虚拟机(64 CPU)上运行,当使用 DynamoDB 数据源时,我可以获得多达 3700 个线程,一旦我们的服务自动扩展到配置的 pod最大节点数。对于每 1000 个请求(10% 的命中率),每个线程从 Dynamo 读取 100 个哈希值。

我现在需要将我的 Java 客户端切换到查询扳手,以获取 10% 的请求中使用的数据。我的查询通常如下所示:

SELECT A, B, C FROM data_table LIMIT 250 OFFSET XXX

理论上,我希望每个线程选择唯一行的块。我使用 OFFSET 来启动从唯一位置读取的每个线程,一旦每个记录块都用完,我将 OFFSET 增加到startingOffset + totalRows 并选择另一个数据块。

我意识到这个查询可能不会转化为每个实现,但是每个线程都可以在线程的生命周期内查询 spanner 以获取唯一数据集的概念应该成立。

我尝试将 java-spanner-jdbc 与 c3p0 连接池一起使用,并且只通过标准 DriverManager.getConnection() 路由。我使用了最小/最大会话配置以及 numChannels,但似乎没有什么可以帮助我扩大规模。 TBH,我还是不明白会话和频道之间的关联。

我还使用 singleUseReadOnlyTransaction()、batchReadOnlyTransaction() 和最近的 txn.partitionQuery() 尝试了原生 SpannerDB 客户端。

因为 partitionQuery() 感觉很像 DynamoDB 代码,所以这感觉是正确的方向,但是因为我的查询(基于 https://cloud.google.com/spanner/docs/reads 的“并行读取数据”示例)有一个 LIMIT 子句,我'我得到错误:

com.google.cloud.spanner.SpannerException:INVALID_ARGUMENT: com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException:INVALID_ARGUMENT:查询不是根 可分区,因为它在根目录下没有 DistributedUnion。 请运行 EXPLAIN 查询计划详情。

删除 LIMIT 子句可以解决这个问题,但是查询需要一个永恒的时间!

所以问题是,如果 partitionQuery() 路由是正确的,我如何使用“分页”限制进行并行查询?如果这不是最佳路线,我应该使用什么来获得最佳的并行读取吞吐量以及每个线程的唯一数据集?

[编辑] 根据 Knut Olav Loite 下面的评论,分区或批量查询不是正确的方法,所以我回到了单次使用的只读查询。

这是我创建 spannerDbClient 的代码:

RetrySettings retrySettings = RetrySettings.newBuilder()
    .setInitialRpcTimeout(Duration.ofSeconds(SPANNER_INITIAL_TIMEOUT_RETRY_SECONDS))
    .setMaxRpcTimeout(Duration.ofSeconds(SPANNER_MAX_TIMEOUT_RETRY_SECONDS))
    .setMaxAttempts(SPANNER_MAX_RETRY_ATTEMPTS)
    .setTotalTimeout(Duration.ofSeconds(SPANNER_TOTAL_TIMEOUT_RETRY_SECONDS))
    .build();

SpannerOptions.Builder builder = SpannerOptions.newBuilder()
        .setSessionPoolOption(SessionPoolOptions.newBuilder()
                .setFailIfPoolExhausted()
                .setMinSessions(SPANNER_MIN_SESSIONS)
                .setMaxSessions(SPANNER_MAX_SESSIONS)
                .build()
        )
        .setNumChannels(SPANNER_NUM_CHANNELS);

if (credentials != null) 
    builder.setCredentials(credentials);


builder.getSpannerStubSettingsBuilder()
        .executeSqlSettings()
        .setRetryableCodes(StatusCode.Code.DEADLINE_EXCEEDED, StatusCode.Code.UNAVAILABLE)
        .setRetrySettings(retrySettings);

spanner = builder.build().getService();
databaseId = DatabaseId.of(
        projectName,
        instanceName,
        databaseName
);

spannerDbClient = spanner.getDatabaseClient(databaseId);

这是我执行实际查询的方法:

List<Entry> entry = new ArrayList<>();

try (ResultSet resultSet = spannerDbClient
        .singleUseReadOnlyTransaction(TimestampBound.ofMaxStaleness(5, TimeUnit.SECONDS))
        .executeQuery(Statement.newBuilder(String.format("SELECT * from %s LIMIT %d OFFSET %d", tableName, limit, offset)).build())) 
    while (resultSet.next()) 
        entry.add(getEntryFromResultSet(resultSet));
    

我添加了计时器代码来显示查询的时间长度,这是 50 个线程的样子。这是使用 maxSession=50,minSession=50,numChannels=4(默认)的共享 spannerDbClient 实例:

  --> [0h:00m:00s] Throughput: Total       0, Interval 0 (0 req/s),   0/0 threads reporting  
[tId:099][00:00:00.335] Spanner query, LIMIT 250 OFFSET 99000  
[tId:146][00:00:00.382] Spanner query, LIMIT 250 OFFSET 146000  
[tId:140][00:00:00.445] Spanner query, LIMIT 250 OFFSET 140000  
[tId:104][00:00:00.494] Spanner query, LIMIT 250 OFFSET 104000  
[tId:152][00:00:00.363] Spanner query, LIMIT 250 OFFSET 152000  
[tId:149][00:00:00.643] Spanner query, LIMIT 250 OFFSET 149000  
[tId:143][00:00:00.748] Spanner query, LIMIT 250 OFFSET 143000  
[tId:163][00:00:00.682] Spanner query, LIMIT 250 OFFSET 163000  
[tId:155][00:00:00.799] Spanner query, LIMIT 250 OFFSET 155000  
[tId:166][00:00:00.872] Spanner query, LIMIT 250 OFFSET 166000  
[tId:250][00:00:00.870] Spanner query, LIMIT 250 OFFSET 250000  
[tId:267][00:00:01.319] Spanner query, LIMIT 250 OFFSET 267000  
[tId:229][00:00:01.917] Spanner query, LIMIT 250 OFFSET 229000  
[tId:234][00:00:02.256] Spanner query, LIMIT 250 OFFSET 234000  
[tId:316][00:00:02.401] Spanner query, LIMIT 250 OFFSET 316000  
[tId:246][00:00:02.844] Spanner query, LIMIT 250 OFFSET 246000  
[tId:312][00:00:02.989] Spanner query, LIMIT 250 OFFSET 312000  
[tId:176][00:00:03.497] Spanner query, LIMIT 250 OFFSET 176000  
[tId:330][00:00:03.140] Spanner query, LIMIT 250 OFFSET 330000  
[tId:254][00:00:03.879] Spanner query, LIMIT 250 OFFSET 254000  
[tId:361][00:00:03.816] Spanner query, LIMIT 250 OFFSET 361000  
[tId:418][00:00:03.635] Spanner query, LIMIT 250 OFFSET 418000  
[tId:243][00:00:04.503] Spanner query, LIMIT 250 OFFSET 243000  
[tId:414][00:00:04.006] Spanner query, LIMIT 250 OFFSET 414000  
[tId:324][00:00:04.457] Spanner query, LIMIT 250 OFFSET 324000  
[tId:498][00:00:03.865] Spanner query, LIMIT 250 OFFSET 498000  
[tId:252][00:00:04.945] Spanner query, LIMIT 250 OFFSET 252000  
[tId:494][00:00:04.211] Spanner query, LIMIT 250 OFFSET 494000  
[tId:444][00:00:04.780] Spanner query, LIMIT 250 OFFSET 444000  
[tId:422][00:00:04.951] Spanner query, LIMIT 250 OFFSET 422000  
[tId:397][00:00:05.234] Spanner query, LIMIT 250 OFFSET 397000  
[tId:420][00:00:05.106] Spanner query, LIMIT 250 OFFSET 420000  
[tId:236][00:00:05.985] Spanner query, LIMIT 250 OFFSET 236000  
[tId:406][00:00:05.429] Spanner query, LIMIT 250 OFFSET 406000  
[tId:449][00:00:05.291] Spanner query, LIMIT 250 OFFSET 449000  
[tId:437][00:00:05.929] Spanner query, LIMIT 250 OFFSET 437000  
[tId:341][00:00:06.611] Spanner query, LIMIT 250 OFFSET 341000  
[tId:475][00:00:06.223] Spanner query, LIMIT 250 OFFSET 475000  
[tId:490][00:00:06.186] Spanner query, LIMIT 250 OFFSET 490000  
[tId:416][00:00:06.460] Spanner query, LIMIT 250 OFFSET 416000  
[tId:328][00:00:07.446] Spanner query, LIMIT 250 OFFSET 328000  
[tId:322][00:00:07.679] Spanner query, LIMIT 250 OFFSET 322000  
[tId:158][00:00:09.357] Spanner query, LIMIT 250 OFFSET 158000  
[tId:496][00:00:08.183] Spanner query, LIMIT 250 OFFSET 496000  
[tId:256][00:00:09.250] Spanner query, LIMIT 250 OFFSET 256000  
  --> [0h:00m:10s] Throughput: Total    9848, Interval +9848 (984 req/s),  44/50 threads reporting  
[tId:492][00:00:08.646] Spanner query, LIMIT 250 OFFSET 492000  
[tId:390][00:00:09.810] Spanner query, LIMIT 250 OFFSET 390000  
[tId:366][00:00:10.142] Spanner query, LIMIT 250 OFFSET 366000  
[tId:320][00:00:10.451] Spanner query, LIMIT 250 OFFSET 320000  
[tId:318][00:00:10.619] Spanner query, LIMIT 250 OFFSET 318000  
  --> [0h:00m:20s] Throughput: Total   56051, Interval +46203 (4620 req/s),  50/50 threads reporting  
  --> [0h:00m:30s] Throughput: Total  102172, Interval +46121 (4612 req/s),  50/50 threads reporting  

请注意,无论偏移量如何,查询时间都会增加,并且初始 spanner 查询需要 10 到 20 秒才能返回所有 50 个线程的数据,然后才开始报告结果。如果我将限制增加到 1000,那么所有 50 个线程需要将近 2 分钟才能从 Spanner 中获取结果。

将其与 DynamoDb 等效项(限制为 1000 个除外)进行比较,其中所有查询都在不到 1 秒的时间内返回,并且所有 50 个线程都在显示 10 秒状态更新之前报告结果:

  --> [0h:00m:00s] Throughput: Total       0, Interval 0 (0 req/s),   0/0 threads reporting  
[tId:045] Dynamo query, LIMIT 1000 [00:00:00.851]  
[tId:138] Dynamo query, LIMIT 1000 [00:00:00.463]  
[tId:183] Dynamo query, LIMIT 1000 [00:00:00.121]  
[tId:122] Dynamo query, LIMIT 1000 [00:00:00.576]  
[tId:095] Dynamo query, LIMIT 1000 [00:00:00.708]  
[tId:072] Dynamo query, LIMIT 1000 [00:00:00.778]  
[tId:115] Dynamo query, LIMIT 1000 [00:00:00.619]  
[tId:166] Dynamo query, LIMIT 1000 [00:00:00.296]  
[tId:058] Dynamo query, LIMIT 1000 [00:00:00.814]  
[tId:179] Dynamo query, LIMIT 1000 [00:00:00.242]  
[tId:081] Dynamo query, LIMIT 1000 [00:00:00.745]  
[tId:106] Dynamo query, LIMIT 1000 [00:00:00.671]  
[tId:162] Dynamo query, LIMIT 1000 [00:00:00.348]  
[tId:035] Dynamo query, LIMIT 1000 [00:00:00.889]  
[tId:134] Dynamo query, LIMIT 1000 [00:00:00.513]  
[tId:187] Dynamo query, LIMIT 1000 [00:00:00.090]  
[tId:158] Dynamo query, LIMIT 1000 [00:00:00.405]  
[tId:191] Dynamo query, LIMIT 1000 [00:00:00.095]  
[tId:195] Dynamo query, LIMIT 1000 [00:00:00.096]  
[tId:199] Dynamo query, LIMIT 1000 [00:00:00.144]  
[tId:203] Dynamo query, LIMIT 1000 [00:00:00.112]  
[tId:291] Dynamo query, LIMIT 1000 [00:00:00.102]  
[tId:303] Dynamo query, LIMIT 1000 [00:00:00.094]  
[tId:312] Dynamo query, LIMIT 1000 [00:00:00.101]  
[tId:318] Dynamo query, LIMIT 1000 [00:00:00.075]  
[tId:322] Dynamo query, LIMIT 1000 [00:00:00.086]  
[tId:326] Dynamo query, LIMIT 1000 [00:00:00.096]  
[tId:330] Dynamo query, LIMIT 1000 [00:00:00.085]  
[tId:334] Dynamo query, LIMIT 1000 [00:00:00.114]  
[tId:342] Dynamo query, LIMIT 1000 [00:00:00.096]  
[tId:391] Dynamo query, LIMIT 1000 [00:00:00.081]  
[tId:395] Dynamo query, LIMIT 1000 [00:00:00.088]  
[tId:406] Dynamo query, LIMIT 1000 [00:00:00.088]  
[tId:415] Dynamo query, LIMIT 1000 [00:00:00.078]  
[tId:421] Dynamo query, LIMIT 1000 [00:00:00.089]  
[tId:425] Dynamo query, LIMIT 1000 [00:00:00.068]  
[tId:429] Dynamo query, LIMIT 1000 [00:00:00.088]  
[tId:433] Dynamo query, LIMIT 1000 [00:00:00.105]  
[tId:437] Dynamo query, LIMIT 1000 [00:00:00.092]  
[tId:461] Dynamo query, LIMIT 1000 [00:00:00.110]  
[tId:483] Dynamo query, LIMIT 1000 [00:00:00.071]  
[tId:491] Dynamo query, LIMIT 1000 [00:00:00.078]  
[tId:495] Dynamo query, LIMIT 1000 [00:00:00.075]  
[tId:503] Dynamo query, LIMIT 1000 [00:00:00.064]  
[tId:499] Dynamo query, LIMIT 1000 [00:00:00.108]  
[tId:514] Dynamo query, LIMIT 1000 [00:00:00.163]  
[tId:518] Dynamo query, LIMIT 1000 [00:00:00.135]  
[tId:529] Dynamo query, LIMIT 1000 [00:00:00.163]  
[tId:533] Dynamo query, LIMIT 1000 [00:00:00.079]  
[tId:541] Dynamo query, LIMIT 1000 [00:00:00.060]  
  --> [0h:00m:10s] Throughput: Total   24316, Interval +24316 (2431 req/s),  50/50 threads reporting  
  --> [0h:00m:20s] Throughput: Total   64416, Interval +40100 (4010 req/s),  50/50 threads reporting  

我在配置中遗漏了什么吗?如果我让它自动缩放,性能问题会大大放大。

【问题讨论】:

【参考方案1】:

我怀疑是为了产生准确的结果

SELECT A, B, C FROM data_table LIMIT 250 OFFSET XXX

后端需要获取 250 + XXX 行,然后跳过其中的 XXX 行。因此,如果 XXX 非常大,这可能是一个非常昂贵的查询,并且需要扫描一大块 data_table

改为限制表键是否有意义?类似:

SELECT A, B, C FROM data_table WHERE TableKey1 > 'key_restriction' LIMIT 250;

这种类型的查询最多只能读取 250 行。

独立地,最好了解此类查询对于您的生产工作负载的代表性。您能解释一下您期望在生产中使用哪种类型的查询吗?

【讨论】:

您引用的查询不代表生产查询。在生产中,查询更像是 'SELECT A, B, C FROM data_table WHERE A = ' 表的主键是“A”列。为了为命中率介于 1% 和 10% 之间的测试客户端代码生成测试数据,我想选择已知存在的有效“A”值的块/页。 我明白了,有道理。我同意上面 Knut 的分析。问题是对于较大的 OFFSET 值,查询变得越来越昂贵。在您的实验中似乎也是如此。您可以通过从较大的偏移量开始并减少它们来运行您的实验来确认这种情况。以 Knut 或我建议的一种方式重写查询应该会有所帮助。【参考方案2】:

编辑基于附加信息:

正如下面Panagiotis Voulgaris 所指出的,我认为这种情况下的问题与客户端配置无关,而是与查询本身有关。查询似乎很慢,尤其是对于较高的OFFSET 值。我用大约 1,000,000 行的表进行了尝试,对于 900,000 的 OFFSET 值,单个查询运行 4-5 秒。当你扩大规模时问题变得更糟的原因可能是你用大量需要很长时间的并行查询压倒了后端,而不是因为客户端配置错误。

如果您可以重新编写查询以根据主键值而不是使用 LIMIT x OFFSET y 构造来选择一系列行,那么最好的方法是。因此,您的查询将如下所示:

SELECT A, B, C
FROM data_table
WHERE A >= x AND A < (x+250)

如果您的键列包含值之间的间隙,这显然不能保证您在每个分区中准确地获得 250 行。在这种情况下,您还可以稍微增加+250 的值以获得合理的分区。

如果由于键值是完全随机的值(或不均匀分布)而无法实现上述操作,那么我认为以下查询会比您当前的查询更有效:

SELECT A, B, C
FROM data_table
WHERE A >= (
  SELECT ANY_VALUE(A)
  FROM data_table
  GROUP BY A
  LIMIT 1 OFFSET y
)
ORDER BY A
LIMIT 250

在这种情况下,我并不清楚你的最终目标是什么,当涉及到具体问题时,这会有所不同:

...如果 partitionQuery() 路由正确 (?)

BatchReadOnlyTransactionpartitionQuery() 路由用于在单个时间点读取大型数据集。例如,这可能是当您想要创建表中所有数据的转储时。 Spanner 将为您对查询进行分区并返回一个分区列表。然后,每个分区都可以由单独的线程(甚至是单独的 VM)处理。可以这么说,这会自动替换查询的 LIMIT 250 OFFSET xxxx 部分,因为 Spanner 会根据表中的实际数据创建不同的分区。

但是,如果您的最终目标是模拟生产负载,那么 BatchReadOnlyTransaction 不是要遵循的路线。

如果您想要有效地查询数据集,那么您应该确保使用single-use read-only transaction 进行查询。这就是您已经使用本机客户端所做的事情。此外,只要连接处于自动提交模式,JDBC 驱动程序还将自动使用一次性只读事务进行查询。如果你关闭自动提交,驱动程序会在你执行查询时自动启动事务。

关于会话和频道:

会话在某种程度上类似于您通常所说的连接。 JDBC 驱动程序和本机客户端都使用内部会话池。在您的案例中,重要的部分是将随时执行的并行读取数。一个会话可以随时处理一个事务(即一个读取操作)。因此,您将需要与并行读取操作一样多的会话。我假设在您使用 c3po 的设置中,您正在为每个正在读取的线程分配一个 JDBC 连接。在这种情况下,最大会话数应设置为等于 c3po 池中的最大连接数。 通道:通道是 gRPC 使用的低级网络连接。一个通道可以并行处理多个同时请求。据我所知,默认最大值是每个通道 100 个同时请求,因此您应该为每 100 个会话使用 1 个通道。这也是 JDBC 驱动程序和本机客户端库中的默认设置。

关于(示例)查询: 如上所述,我不清楚这只是一个测试设置,还是一个实际的生产示例。但是,我希望查询包含显式的 ORDER BY 子句以确保数据按预期顺序返回,并且 ORDER BY 子句显然应该使用索引列。

最后:问题是后端每次查询响应慢吗?还是后端基本处于闲置状态,客户端无法真正提高查询量?

【讨论】:

出于我的目的,我正在查询 Spanner 以获取用于测试生产服务的数据。我只想检索较小的记录块以限制我这边的内存消耗。我已经尝试了 100 行,最多 2500 行。问题是查询太慢而无法响应。所有线程都阻塞等待初始 ResultSet,这需要几分钟。我将 jdbc 客户端配置为使用: :- minSessions = initialThreadCount (1000) - maxSessions = maxThreadCount (5000) - numChannels = 256 当我有超过 100 个左右的线程时,似乎没有任何影响。

以上是关于如何有效地与多个线程并行查询 google-cloud-spanner?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 AWS Glue PySpark 中运行并行线程?

查询处理器未能为执行并行查询启动必要的线程资源啥意思

jmeter如何并行执行多个线程组

并行查询--dba手册

并行查询--dba手册

在同一个套接字上对发送/接收的并行调用是不是有效?