Spark SQL/Hive 查询永远需要加入

Posted

技术标签:

【中文标题】Spark SQL/Hive 查询永远需要加入【英文标题】:Spark SQL/Hive Query Takes Forever With Join 【发布时间】:2015-12-02 06:31:21 【问题描述】:

所以我正在做一些应该很简单的事情,但显然它不在 Spark SQL 中。

如果我在 mysql 中运行以下查询,查询会在几分之一秒内完成:

SELECT ua.address_id
FROM user u
inner join user_address ua on ua.address_id = u.user_address_id
WHERE u.user_id = 123;

但是,在 Spark (1.5.1) 下的 HiveContext 中运行相同的查询需要超过 13 秒。添加更多连接会使查询运行很长时间(超过 10 分钟)。我不确定我在这里做错了什么以及如何加快速度。

这些表是作为临时表加载到 Hive 上下文中的 MySQL 表。它在单个实例中运行,数据库位于远程计算机上。

user 表有大约 480 万行。 user_address 表有 350,000 行。

这些表有外键字段,但在 db.xml 中没有定义明确的 fk 关系。我正在使用 InnoDB。

Spark 中的执行计划:

计划:

扫描 JDBCRelation(jdbc:mysql://.user,[Lorg.apache.spark.Partition;@596f5dfc, user=, 密码=, url=jdbc:mysql://, dbtable=user) [address_id#0L,user_address_id#27L]

过滤器 (user_id#0L = 123) 扫描 JDBCRelation(jdbc:mysql://.user_address, [Lorg.apache.spark.Partition;@2ce558f3,user=, 密码=, url=jdbc:mysql://, dbtable=user_address)[address_id#52L]

ConvertToUnsafe 转换为Unsafe

TungstenExchange hashpartitioning(address_id#52L) TungstenExchange hashpartitioning(user_address_id#27L) TungstenSort [address_id#52L ASC],假,0 TungstenSort [user_address_id#27L ASC],假,0

SortMergeJoin [user_address_id#27L], [address_id#52L]

== 物理计划 == TungstenProject [address_id#0L]

【问题讨论】:

请添加物理计划,以及针对数据库运行的有效 SQL 查询。进一步添加创建数据框和查询的代码。 你缓存了那些表吗? 【参考方案1】:

首先,您执行的查询类型效率极低。至于现在(Spark 1.5.0*)要像这样执行连接,每次执行查询时都必须对两个表进行洗牌/哈希分区。对于users 表,其中user_id = 123 谓词很可能被下推但仍需要在user_address 上完全洗牌,这应该不是问题。

此外,如果表只注册而不缓存,则每次执行此查询都会从 MySQL 获取整个 user_address 表到 Spark。

我不确定我在这里做错了什么以及如何加快速度。

尚不清楚您为什么要使用 Spark 进行应用程序,但单机设置、小数据和查询类型表明 Spark 不适合这里。

一般来说,如果应用程序逻辑需要单个记录访问,那么 Spark SQL 将无法正常执行。它是为分析查询而设计的,而不是作为 OLTP 数据库的替代品。

如果单个表/数据框要小得多,您可以尝试广播。

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

val user: DataFrame = ???
val user_address: DataFrame = ???

val userFiltered = user.where(???)

user_addresses.join(
  broadcast(userFiltered), $"address_id" === $"user_address_id")

* 这应该在 Spark 1.6.0 中使用SPARK-11410 进行更改,这应该启用持久表分区。

【讨论】:

只是为了澄清,上面的例子只是为了证明这发生在一条记录上。不将查询限制为单个记录会使情况变得更糟。我在这里想要完成的是(ab)使用 Spark 作为 SQL + 文件的统一数据源。我尝试缓存表,但这导致查询挂起超过 15 分钟。 这不会改变我的答案。它只会使加入的 LHS 更加昂贵。它仍然需要获取所有数据并随机播放。 那么,您认为更可行的方法是在 Spring/Hibernate 上运行 MySQL 查询,在 Spark 上运行文件查询,然后在 Spark 中将两者连接起来? 一开始我不会使用 Spark :) 但是如果不使用 MySQL 进行在线查询,那么您可以使用子查询推送整个联接:***.com/a/32585936/1560062 在单机设置中?不是真的...不过,您可以查看不同的 SQL/MED(例如 PostgreSQL FDW)。在分布式设置中,我实际上会首先考虑 Apache Drill。【参考方案2】:

我在类似情况下也遇到过同样的问题(Spark 1.5.1、PostgreSQL 9.4)。

鉴于这两个表像

val t1 = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:db",
    "dbtable" -> "t1")).load()

val t2 = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:db",
    "dbtable" -> "t2")).load()

然后在 HQL 中对已注册临时表的联接会导致对其中一个表(在我的情况下是子表)进行全表扫描。

无论如何,一种解决方法是将查询推送到底层 RDBMS:

val joined = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:db",
    "dbtable" -> "(select t1.*, t2.* from t1 inner join t2 on ...) as t")).load()

这样,底层 RDBMS 的查询优化器就会启动,在我的例子中它切换到索引扫描。另一方面,Spark 下推了两个独立的查询,RDBMS 无法真正优化它。

【讨论】:

以上是关于Spark SQL/Hive 查询永远需要加入的主要内容,如果未能解决你的问题,请参考以下文章

来自 Spark 的 Hive 查询 - 无法解析

实例化“org.apache.spark.sql.hive.HiveExternalCatalog”时出错

Spark SQL Hive Datanucleus jar 类路径

在 SQL/Hive 中解析/查询键值对

SQL/Hive 查询以计算特定值每天的行数

sql Hive导出查询结果到文件