Spark 是不是仅将我的 UDF 应用于正在显示的记录?

Posted

技术标签:

【中文标题】Spark 是不是仅将我的 UDF 应用于正在显示的记录?【英文标题】:Is Spark only applying my UDF on records being shown?Spark 是否仅将我的 UDF 应用于正在显示的记录? 【发布时间】:2017-03-28 19:20:25 【问题描述】:

我觉得 Spark 比我更聪明,并且重新排序(或至少与编写的代码相比)在执行程序等上运行的内容。

假设我在 scala 中有一个非常简单的 spark 查询,如下所示。

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val rawData = sqlContext.sql("FROM mytable SELECT *")

然后,我使用 UDF 中的某些功能创建一个新列,该功能不是轻量级的(或至少在某些时候),并且依赖于数据中的多个列。粗略地说,我的 UDF 看起来与此类似,尽管处理只是一个示例。

def method1(s1:String, s2:String):String = 
  List(s1, s2).mkString(" ")


val method1UDF = udf(method1 _)

val dataWithCol = rawData
                  .withColumn("newcol", method1UDF($"c1",$"c2"))

dataWithCol.show(100)

我的问题实际上围绕着最后一个陈述,或者至少我认为是这样。

如果我的数据集有 10 亿条记录,Spark 实际上只是将我的 withColumn 应用于 100 条记录,还是将其应用于所有 100 万条记录,然后只返回前 100 条?

在 Hive 中,我认为等价的应该是:

SELECT t.c1, t.c2, CONCAT_WS(" ",t.c1,t.c2) as newCol from (
    SELECT c1,c2 as newCol FROM mytable limit 100
) t

即使在代码中看起来我已经编写了与以下查询等效的代码

SELECT * from (
    SELECT c1,c2, CONCAT_WS(" ",c1,c2) as newCol FROM mytable  
) t limit 100

我怀疑是前者,因为在新列上添加过滤器会大大减慢操作速度。如果我将最后一行更改为:

dataWithCol.filter($"newCol" === "H i").show(100)

现在必须将该函数应用于更多数据(可能是整个数据集),然后才能达到 100 的限制,类似于以下 Hive 查询:

SELECT * from (
    SELECT c1,c2, CONCAT_WS(" ",c1,c2) as newCol FROM mytable  
) t where t.newCol == "H i" limit 100

我是否符合 Spark 在后台所做的事情?是否仅通过对最终将被查看的记录应用处理来优化我的查询?

【问题讨论】:

提示:检查“解释”结果:) 【参考方案1】:

如果你不确定你总是可以做一个实验:

Spark context available as 'sc' (master = local[*], app id = local-1490732267478).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

val rawData = spark.range(0, 1000000000, 1, 1000)
  .toDF("id")
  .select(
    $"id".cast("string").alias("s1"), 
    $"id".cast("string").alias("s2"))

val counter = sc.longAccumulator("counter")

def f = udf((s1: String, s2: String) => 
  counter.add(1)
  s"$s1 $s2"
)

rawData.select(f($"s1", $"s2")).show(10)



// Exiting paste mode, now interpreting.
+-----------+
|UDF(s1, s2)|
+-----------+
|        0 0|
|        1 1|
|        2 2|
|        3 3|
|        4 4|
|        5 5|
|        6 6|
|        7 7|
|        8 8|
|        9 9|
+-----------+
only showing top 10 rows

rawData: org.apache.spark.sql.DataFrame = [s1: string, s2: string]
counter: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(counter), value: 12)
f: org.apache.spark.sql.expressions.UserDefinedFunction

scala> counter.value
res1: Long = 12

如您所见,Spark 限制了要处理的记录数,但并不十分精确。您还应该记住,这些结果取决于版本和查询。

例如,早期的 Spark 版本在对 UDF 调用应用优化时相当有限。此外,上游范围的转换可能会影响此行为并导致处理更多(甚至所有)记录。

【讨论】:

【参考方案2】:

Spark 应用了一种称为“延迟执行”的方法。这意味着它仅在必要时评估操作。所以,它实际上是在你写的两个语句之间做一些事情。执行计划者足够聪明,可以弄清楚什么需要做,什么不需要。要查看更多详细信息,请浏览 localhost:4040(为您正在运行的每个上下文将端口增加 1)。

【讨论】:

感谢您的回答,两位都帮我弄清楚了发生了什么,但只能选择一个作为答案。

以上是关于Spark 是不是仅将我的 UDF 应用于正在显示的记录?的主要内容,如果未能解决你的问题,请参考以下文章

独立运行 UDF 的 Spark 错误

将 Python UDF 应用于 Spark 数据帧时出现 java.lang.IllegalArgumentException

Spark 根据现有列的映射值创建新列

UI-Bootstrap 仅将模态背景应用于元素

Google Maps是否仅将我的IP用于地理定位?

将 UDF 应用于 Spark Dataframe 中的多个列