Spark SQL/Hive 查询永远需要加入
Posted
技术标签:
【中文标题】Spark SQL/Hive 查询永远需要加入【英文标题】:Spark SQL/Hive Query Takes Forever With Join 【发布时间】:2015-12-02 06:31:21 【问题描述】:所以我正在做一些应该很简单的事情,但显然它不在 Spark SQL 中。
如果我在 mysql 中运行以下查询,查询会在几分之一秒内完成:
SELECT ua.address_id
FROM user u
inner join user_address ua on ua.address_id = u.user_address_id
WHERE u.user_id = 123;
但是,在 Spark (1.5.1) 下的 HiveContext 中运行相同的查询需要超过 13 秒。添加更多连接会使查询运行很长时间(超过 10 分钟)。我不确定我在这里做错了什么以及如何加快速度。
这些表是作为临时表加载到 Hive 上下文中的 MySQL 表。它在单个实例中运行,数据库位于远程计算机上。
user 表有大约 480 万行。 user_address 表有 350,000 行。这些表有外键字段,但在 db.xml 中没有定义明确的 fk 关系。我正在使用 InnoDB。
Spark 中的执行计划:
计划:
扫描 JDBCRelation(jdbc:mysql://.user,[Lorg.apache.spark.Partition;@596f5dfc, user=, 密码=, url=jdbc:mysql://, dbtable=user) [address_id#0L,user_address_id#27L]
过滤器 (user_id#0L = 123) 扫描 JDBCRelation(jdbc:mysql://.user_address, [Lorg.apache.spark.Partition;@2ce558f3,user=, 密码=, url=jdbc:mysql://, dbtable=user_address)[address_id#52L]
ConvertToUnsafe 转换为Unsafe
TungstenExchange hashpartitioning(address_id#52L) TungstenExchange hashpartitioning(user_address_id#27L) TungstenSort [address_id#52L ASC],假,0 TungstenSort [user_address_id#27L ASC],假,0
SortMergeJoin [user_address_id#27L], [address_id#52L]
== 物理计划 == TungstenProject [address_id#0L]
【问题讨论】:
请添加物理计划,以及针对数据库运行的有效 SQL 查询。进一步添加创建数据框和查询的代码。 你缓存了那些表吗? 【参考方案1】:首先,您执行的查询类型效率极低。至于现在(Spark 1.5.0*)要像这样执行连接,每次执行查询时都必须对两个表进行洗牌/哈希分区。对于users
表,其中user_id = 123
谓词很可能被下推但仍需要在user_address
上完全洗牌,这应该不是问题。
此外,如果表只注册而不缓存,则每次执行此查询都会从 MySQL 获取整个 user_address
表到 Spark。
我不确定我在这里做错了什么以及如何加快速度。
尚不清楚您为什么要使用 Spark 进行应用程序,但单机设置、小数据和查询类型表明 Spark 不适合这里。
一般来说,如果应用程序逻辑需要单个记录访问,那么 Spark SQL 将无法正常执行。它是为分析查询而设计的,而不是作为 OLTP 数据库的替代品。
如果单个表/数据框要小得多,您可以尝试广播。
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast
val user: DataFrame = ???
val user_address: DataFrame = ???
val userFiltered = user.where(???)
user_addresses.join(
broadcast(userFiltered), $"address_id" === $"user_address_id")
* 这应该在 Spark 1.6.0 中使用SPARK-11410 进行更改,这应该启用持久表分区。
【讨论】:
只是为了澄清,上面的例子只是为了证明这发生在一条记录上。不将查询限制为单个记录会使情况变得更糟。我在这里想要完成的是(ab)使用 Spark 作为 SQL + 文件的统一数据源。我尝试缓存表,但这导致查询挂起超过 15 分钟。 这不会改变我的答案。它只会使加入的 LHS 更加昂贵。它仍然需要获取所有数据并随机播放。 那么,您认为更可行的方法是在 Spring/Hibernate 上运行 MySQL 查询,在 Spark 上运行文件查询,然后在 Spark 中将两者连接起来? 一开始我不会使用 Spark :) 但是如果不使用 MySQL 进行在线查询,那么您可以使用子查询推送整个联接:***.com/a/32585936/1560062 在单机设置中?不是真的...不过,您可以查看不同的 SQL/MED(例如 PostgreSQL FDW)。在分布式设置中,我实际上会首先考虑 Apache Drill。【参考方案2】:我在类似情况下也遇到过同样的问题(Spark 1.5.1、PostgreSQL 9.4)。
鉴于这两个表像
val t1 = sqlContext.read.format("jdbc").options(
Map(
"url" -> "jdbc:postgresql:db",
"dbtable" -> "t1")).load()
val t2 = sqlContext.read.format("jdbc").options(
Map(
"url" -> "jdbc:postgresql:db",
"dbtable" -> "t2")).load()
然后在 HQL 中对已注册临时表的联接会导致对其中一个表(在我的情况下是子表)进行全表扫描。
无论如何,一种解决方法是将查询推送到底层 RDBMS:
val joined = sqlContext.read.format("jdbc").options(
Map(
"url" -> "jdbc:postgresql:db",
"dbtable" -> "(select t1.*, t2.* from t1 inner join t2 on ...) as t")).load()
这样,底层 RDBMS 的查询优化器就会启动,在我的例子中它切换到索引扫描。另一方面,Spark 下推了两个独立的查询,RDBMS 无法真正优化它。
【讨论】:
以上是关于Spark SQL/Hive 查询永远需要加入的主要内容,如果未能解决你的问题,请参考以下文章
实例化“org.apache.spark.sql.hive.HiveExternalCatalog”时出错