在不知道上限的情况下使用 spark 从 sql 数据库中并行读取

Posted

技术标签:

【中文标题】在不知道上限的情况下使用 spark 从 sql 数据库中并行读取【英文标题】:Read from sql database in parallel using spark without knowing upper bound 【发布时间】:2017-07-11 04:04:02 【问题描述】:

Spark 允许您从 sql db 源并行读取,并且可以基于滑动窗口进行分区,例如(来自book,第 7 章)

 val colName = "count"
 val lowerBound = 0L
 val upperBound = 348113L // this is the max count in our table
 val numPartitions = 10

 spark.read.jdbc(url,
                tablename,
                colName,
                lowerBound,
                upperBound,
                numPartitions,
                props).count()

在这里,上限是事先知道的。

比方说,一张表在一天内获得了“x”行(可能在 1-2 百万之间),在一天结束时,我们提交了一个 spark 作业,进行一些转换并写入 Parquet/ CSV/JSON。如果我们事先不知道将有多少行(从 1 到 2 百万不等)写入 SQL 源数据库,那么在这种情况下,进行分区的最佳方法或做法是什么。

一种方法是估计您的上限,但我不确定这是一种正确的方法。

【问题讨论】:

【参考方案1】:

我遇到了完全相同的问题。我也需要随机分布。所以我选择一个 int 列并在其上获取 mod 10。这样我就可以在不关心的情况下分割任何东西 界限或分布。

options += ("numPartitions" -> numPartitions,"partitionColumn"->"mod(my_int_column,10)","lowerBound"->"0","upperBound"->"9")

【讨论】:

此解决方案在 Spark 2.4.0 中不再适用,因为 partitionColumn 现在已解析以查明它们是否存在于架构中 (github.com/apache/spark/commit/…)。作为替代,您可以使用predicates 参数(spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/…)。生成这样的谓词:Array.range(0, partitionCount).map(partNumber => s"mod(my_int_column, $partitionCount) = $partNumber"). 关于 Spark 2.4,您可以将此列添加为您正在执行的查询的一部分:Select mod(id,10) as partition_key,* from table,然后将用户 partition_key 作为分区列【参考方案2】:

每天的行数都不同这一事实并没有太大变化。假设您想要有 20 个分区,那么有一天您在单个分区中大约有 1M/20 行,而另一天大约有 2M/20。如果有更多的数据并且分区的数量是固定的,那么显然分区中会有更多的数据。

如果对下限和上限有混淆,我想澄清一下,lowerBound 和 upperBound 指的是您要分区的列的值,而不是行数。此外,表格不会根据这些值进行过滤,这意味着如果您的行的值小于 lowerBound 或大于 upperBound,这些行仍将被包括在内。

见: Whats meaning of partitionColumn, lowerBound, upperBound, numPartitions parameters?

来自文档:http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

partitionColumn、lowerBound、upperBound:如果指定了这些选项中的任何一个,则必须全部指定。此外,必须指定 numPartitions。他们描述了从多个工作人员并行读取时如何对表进行分区。 partitionColumn 必须是相关表中的数字列。请注意,lowerBound 和 upperBound 仅用于决定分区步长,而不是用于过滤表中的行。所以表中的所有行都会被分区并返回。此选项仅适用于阅读。

【讨论】:

是的,我明白分区是基于列的值(这就是传递列名的原因)。但不是做滑动窗口分区,而是 spark 可以为 JDBC 源做自动分区(就像它为 CSV/Parquet 做的那样)? 您可以不指定 partitionColumn、lowerBound、upperBound 和 numPartitions。然后 spark 将打开一个到您的 sql server 的单个 jdbc 连接,并按照它认为合适的方式进行分区。所以它几乎肯定是一个性能明显差的解决方案(但在你的 sql 服务器上更容易)。在我给出的文档链接中,有关于通过 jdbc 查询的 sn-ps 而不指定 partitionColumn 和边界。 不给出任何界限,spark 将创建 1 个分区(使用 df.rdd.partition.size 检查),这无济于事,因为 spark 不会自行进行任何类型的分区。 @ds_user 该问题中的回答者写道,以回应未包含所有实际运行的查询的其他回答者。因此,该问题中公认的答案和我说的是同一件事 - 无论您选择什么边界,您都不会丢失任何数据。 他说“实际上上面的列表遗漏了一些东西,特别是第一个和最后一个查询。没有它们你会丢失一些数据”。但他确实包含了第一个和最后一个查询,因此不会丢失任何数据。如果您有“SELECT * FROM table WHERE partitionColumn > 9000”,您将如何丢失超出上限的数据?

以上是关于在不知道上限的情况下使用 spark 从 sql 数据库中并行读取的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL 在不配置 Hive 的情况下加载数据?

spark sql 在不使用 where 子句的情况下将所有数据加载到内存中

在不使用操作的情况下对 Spark 进行基准测试

如何在不使用databricks csv api的情况下直接将CSV文件读入spark DataFrame?

在不知道工作表名称的情况下使用 SSIS 从 Excel 导入数据

在不计算的情况下获取 Spark 数据框中的行数