Spark:UDF多次执行

Posted

技术标签:

【中文标题】Spark:UDF多次执行【英文标题】:Spark: UDF executed many times 【发布时间】:2019-11-04 15:12:40 【问题描述】:

我有一个包含以下代码的数据框:

def test(lat: Double, lon: Double) = 
  println(s"testing $lat / lon")
  Map("one" -> "one", "two" -> "two")


val testUDF = udf(test _)

df.withColumn("test", testUDF(col("lat"), col("lon")))
  .withColumn("test1", col("test.one"))
  .withColumn("test2", col("test.two"))

现在检查日志,我发现每行 UDF 执行 3 次。如果我从“test.three”列中添加“test3”,则会再次执行 UDF。

谁能解释一下为什么?

这是否可以正确避免(在添加“测试”后不缓存数据帧,即使这样可行)?

【问题讨论】:

什么意思?您正在调用 test 函数三遍。这就是为什么它被执行了三遍。不知道你为什么要把它变成一个 UDF。为什么不直接将 Map 设为 val? 这只是一个展示spark行为的例子。对我来说,“测试”是一个包含结构的新列,然后访问结构的任何部分都不应再次执行 UDF。我怎么错了? 我尝试打印架构,“test”的数据类型是Map,而不是结构。现在,如果 UDF 返回一个类似 Test(one String, two: String) 的案例类,而不是返回 Map,那么 test 确实是一个 Struct,但 UDF 的执行次数总是一样多。 相关:***.com/questions/40320563/… 缓存应该根据这个答案工作:***.com/a/40962714/1138523 【参考方案1】:

如果您想避免多次调用 udf(如果 udf 是您工作中的瓶颈,这尤其有用),您可以按以下方式进行:

val testUDF = udf(test _).asNondeterministic()

基本上你告诉 Spark 你的函数不是确定性的,现在 Spark 确保它只被调用一次,因为多次调用它是不安全的(每次调用可能返回不同的结果)。

还要注意,这个技巧不是免费的,通过这样做,您对优化器施加了一些限制,这样做的一个副作用是,例如 Spark 优化器不会通过不确定的表达式推送过滤器,因此您会变成负责查询中过滤器的最佳位置。

【讨论】:

不错!这个答案也属于这里:***.com/questions/40320563/… 在我的例子中,asNondeterministic 强制 UDF 只执行一次。使用explode(array(myUdf($"id"))) 解决方案,它仍然会执行两次。 @David Vrba 你是什么意思? :因此您需要负责查询中过滤器的最佳位置。 @thebluephantom 如果您的表达式是确定性的,Spark 优化器将通过它们推送过滤器。如果表达式是不确定的(udf.asNondeterministic() 就是这种情况),优化器不会推送它,所以你最好尽快调用过滤器。 你能举个例子吗?请

以上是关于Spark:UDF多次执行的主要内容,如果未能解决你的问题,请参考以下文章

spark 能执行udf 不能执行udaf,啥原因

spark 能执行udf 不能执行udaf,啥原因

尝试从 UDF 执行 spark sql 查询

为 Spark UDF 执行提供上下文

Spark迭代算法UDF在每次迭代中被多次触发

如何在spark中使用transform python udf执行hql脚本?