Spark:UDF多次执行
Posted
技术标签:
【中文标题】Spark:UDF多次执行【英文标题】:Spark: UDF executed many times 【发布时间】:2019-11-04 15:12:40 【问题描述】:我有一个包含以下代码的数据框:
def test(lat: Double, lon: Double) =
println(s"testing $lat / lon")
Map("one" -> "one", "two" -> "two")
val testUDF = udf(test _)
df.withColumn("test", testUDF(col("lat"), col("lon")))
.withColumn("test1", col("test.one"))
.withColumn("test2", col("test.two"))
现在检查日志,我发现每行 UDF 执行 3 次。如果我从“test.three”列中添加“test3”,则会再次执行 UDF。
谁能解释一下为什么?
这是否可以正确避免(在添加“测试”后不缓存数据帧,即使这样可行)?
【问题讨论】:
什么意思?您正在调用 test 函数三遍。这就是为什么它被执行了三遍。不知道你为什么要把它变成一个 UDF。为什么不直接将 Map 设为 val? 这只是一个展示spark行为的例子。对我来说,“测试”是一个包含结构的新列,然后访问结构的任何部分都不应再次执行 UDF。我怎么错了? 我尝试打印架构,“test”的数据类型是Map
,而不是结构。现在,如果 UDF 返回一个类似 Test(one String, two: String) 的案例类,而不是返回 Map,那么 test
确实是一个 Struct,但 UDF 的执行次数总是一样多。
相关:***.com/questions/40320563/…
缓存应该根据这个答案工作:***.com/a/40962714/1138523
【参考方案1】:
如果您想避免多次调用 udf(如果 udf 是您工作中的瓶颈,这尤其有用),您可以按以下方式进行:
val testUDF = udf(test _).asNondeterministic()
基本上你告诉 Spark 你的函数不是确定性的,现在 Spark 确保它只被调用一次,因为多次调用它是不安全的(每次调用可能返回不同的结果)。
还要注意,这个技巧不是免费的,通过这样做,您对优化器施加了一些限制,这样做的一个副作用是,例如 Spark 优化器不会通过不确定的表达式推送过滤器,因此您会变成负责查询中过滤器的最佳位置。
【讨论】:
不错!这个答案也属于这里:***.com/questions/40320563/… 在我的例子中,asNondeterministic
强制 UDF 只执行一次。使用explode(array(myUdf($"id")))
解决方案,它仍然会执行两次。
@David Vrba 你是什么意思? :因此您需要负责查询中过滤器的最佳位置。
@thebluephantom 如果您的表达式是确定性的,Spark 优化器将通过它们推送过滤器。如果表达式是不确定的(udf.asNondeterministic() 就是这种情况),优化器不会推送它,所以你最好尽快调用过滤器。
你能举个例子吗?请以上是关于Spark:UDF多次执行的主要内容,如果未能解决你的问题,请参考以下文章