Spark 是不是仅将我的 UDF 应用于正在显示的记录?
Posted
技术标签:
【中文标题】Spark 是不是仅将我的 UDF 应用于正在显示的记录?【英文标题】:Is Spark only applying my UDF on records being shown?Spark 是否仅将我的 UDF 应用于正在显示的记录? 【发布时间】:2017-03-28 19:20:25 【问题描述】:我觉得 Spark 比我更聪明,并且重新排序(或至少与编写的代码相比)在执行程序等上运行的内容。
假设我在 scala 中有一个非常简单的 spark 查询,如下所示。
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val rawData = sqlContext.sql("FROM mytable SELECT *")
然后,我使用 UDF 中的某些功能创建一个新列,该功能不是轻量级的(或至少在某些时候),并且依赖于数据中的多个列。粗略地说,我的 UDF 看起来与此类似,尽管处理只是一个示例。
def method1(s1:String, s2:String):String =
List(s1, s2).mkString(" ")
val method1UDF = udf(method1 _)
val dataWithCol = rawData
.withColumn("newcol", method1UDF($"c1",$"c2"))
dataWithCol.show(100)
我的问题实际上围绕着最后一个陈述,或者至少我认为是这样。
如果我的数据集有 10 亿条记录,Spark 实际上只是将我的 withColumn 应用于 100 条记录,还是将其应用于所有 100 万条记录,然后只返回前 100 条?
在 Hive 中,我认为等价的应该是:
SELECT t.c1, t.c2, CONCAT_WS(" ",t.c1,t.c2) as newCol from (
SELECT c1,c2 as newCol FROM mytable limit 100
) t
即使在代码中看起来我已经编写了与以下查询等效的代码
SELECT * from (
SELECT c1,c2, CONCAT_WS(" ",c1,c2) as newCol FROM mytable
) t limit 100
我怀疑是前者,因为在新列上添加过滤器会大大减慢操作速度。如果我将最后一行更改为:
dataWithCol.filter($"newCol" === "H i").show(100)
现在必须将该函数应用于更多数据(可能是整个数据集),然后才能达到 100 的限制,类似于以下 Hive 查询:
SELECT * from (
SELECT c1,c2, CONCAT_WS(" ",c1,c2) as newCol FROM mytable
) t where t.newCol == "H i" limit 100
我是否符合 Spark 在后台所做的事情?是否仅通过对最终将被查看的记录应用处理来优化我的查询?
【问题讨论】:
提示:检查“解释”结果:) 【参考方案1】:如果你不确定你总是可以做一个实验:
Spark context available as 'sc' (master = local[*], app id = local-1490732267478).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.
scala> :paste
// Entering paste mode (ctrl-D to finish)
val rawData = spark.range(0, 1000000000, 1, 1000)
.toDF("id")
.select(
$"id".cast("string").alias("s1"),
$"id".cast("string").alias("s2"))
val counter = sc.longAccumulator("counter")
def f = udf((s1: String, s2: String) =>
counter.add(1)
s"$s1 $s2"
)
rawData.select(f($"s1", $"s2")).show(10)
// Exiting paste mode, now interpreting.
+-----------+
|UDF(s1, s2)|
+-----------+
| 0 0|
| 1 1|
| 2 2|
| 3 3|
| 4 4|
| 5 5|
| 6 6|
| 7 7|
| 8 8|
| 9 9|
+-----------+
only showing top 10 rows
rawData: org.apache.spark.sql.DataFrame = [s1: string, s2: string]
counter: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(counter), value: 12)
f: org.apache.spark.sql.expressions.UserDefinedFunction
scala> counter.value
res1: Long = 12
如您所见,Spark 限制了要处理的记录数,但并不十分精确。您还应该记住,这些结果取决于版本和查询。
例如,早期的 Spark 版本在对 UDF 调用应用优化时相当有限。此外,上游范围的转换可能会影响此行为并导致处理更多(甚至所有)记录。
【讨论】:
【参考方案2】:Spark 应用了一种称为“延迟执行”的方法。这意味着它仅在必要时评估操作。所以,它实际上是在你写的两个语句之间做一些事情。执行计划者足够聪明,可以弄清楚什么需要做,什么不需要。要查看更多详细信息,请浏览 localhost:4040(为您正在运行的每个上下文将端口增加 1)。
【讨论】:
感谢您的回答,两位都帮我弄清楚了发生了什么,但只能选择一个作为答案。以上是关于Spark 是不是仅将我的 UDF 应用于正在显示的记录?的主要内容,如果未能解决你的问题,请参考以下文章
将 Python UDF 应用于 Spark 数据帧时出现 java.lang.IllegalArgumentException