使用 Spark SQL 时可以要求对数据库进行联接操作吗?

Posted

技术标签:

【中文标题】使用 Spark SQL 时可以要求对数据库进行联接操作吗?【英文标题】:Can join operations be demanded to database when using Spark SQL? 【发布时间】:2018-06-05 17:29:16 【问题描述】:

我不是 Spark SQL API 专家,也不是底层 RDD 专家。

但是,了解 Catalyst 优化引擎后,我希望 Spark 会尝试尽量减少内存中的工作量。

这是我的情况: 比如说,我有两张桌子

TABLE GenericOperation (ID, CommonFields...)
TABLE SpecificOperation (OperationID, SpecificFields...)

它们都非常庞大(~500M,不是数据,但在标准应用服务器的内存中作为一个整体存在是不可行的)

也就是说,假设我必须使用 Spark(更大用例的一部分)检索与属于 GenericOperation 的字段上的某些特定条件匹配的所有 SpecificOperation 实例。

这是我正在使用的代码:

val gOps = spark.read.jdbc(db.connection, "GenericOperation", db.properties)
val sOps = spark.read.jdbc(db.connection, "SpecificOperation", db.properties)
val joined = sOps.join(gOps).where("ID = OperationID")
joined.where("CommonField= 'SomeValue'").select("SpecificField").show()

问题是,当涉及到运行上述内容时,我可以从 SQL Profiler 中看到,Spark 不会在数据库上执行连接,而是从 SpecificOperation 检索所有 OperationID,然后我假设它会在内存中运行所有合并。由于没有过滤器适用于SpecificOperation,因此这样的检索会给终端系统带来很多太多的数据。

是否可以将上述内容写成直接要求连接到 dbms? 或者它取决于我不知道的 Spark 的一些神奇配置?

当然,我可以在检索时简单地将连接硬编码为子查询,但在我的情况下这是不可行的:必须在运行时从简单的构建块开始创建语句。因此,我需要从已经建立的两个spark.sql.DataFrame 开始实现这一点

附带说明一下,我使用 Spark 2.3.0 for Scala 2.11 运行它,针对 SQL Server 2016 数据库实例。

【问题讨论】:

In Apache Spark 2.0.0, is it possible to fetch a query from an external database (rather than grab the whole table)?的可能重复 不完全,但感谢您的建议。我编辑了问题以消除歧义:在我的情况下,我无法在加入之前对逻辑采取行动 我不认为它会那样工作。您可以使用 spark.read.jdbc(query) 将查询推送到数据库 【参考方案1】:

是否可以将上述内容写成直接要求连接到 dbms?或者它取决于我不知道的 Spark 的一些神奇配置?

排除静态生成的查询 (In Apache Spark 2.0.0, is it possible to fetch a query from an external database (rather than grab the whole table)?),Spark 不支持 join 下推。只有谓词和选择可以委托给源。

没有任何神奇的配置或代码可以支持这种类型的过程。

一般来说,如果服务器可以处理连接,数据通常不足以从 Spark 中受益。

【讨论】:

谢谢,有帮助。选择 Spark 的原因有很多,其中没有一个是数据量(巨大,但不是那么多)......不过,您知道到目前为止还没有实现它们是否很好,或者是否有在任何情况下都可能阻止其发展成为一项功能的障碍?例如,我看到他们正在研究类似的东西slideshare.net/databricks/…

以上是关于使用 Spark SQL 时可以要求对数据库进行联接操作吗?的主要内容,如果未能解决你的问题,请参考以下文章

如何加快 Spark SQL 单元测试?

使用 Spark SQL GROUP BY 对 DataFrame 进行高效的 PairRDD 操作

sql 级联删除问题

Apache Spark 使用 SQL 函数 nTile 对数据进行分区

如何使用 Spark SQL 对均值列进行排序?

spark必知必会的基本概念