统计 Spark 中 UDF 的调用次数
Posted
技术标签:
【中文标题】统计 Spark 中 UDF 的调用次数【英文标题】:Count calls of UDF in Spark 【发布时间】:2016-10-29 05:45:39 【问题描述】:使用 Spark 1.6.1 我想调用 UDF 被调用的次数。我想这样做是因为我有一个非常昂贵的 UDF(每次调用约 1 秒)并且 我怀疑 UDF 被调用的频率超过了我的数据框中的记录数,这使得我的 spark 工作比必要的慢 .
虽然我无法重现这种情况,但我想出了一个简单的示例,显示对 UDF 的调用次数似乎与行数不同(这里:更少),这怎么可能?
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.sql.functions.udf
object Demo extends App
val conf = new SparkConf().setMaster("local[4]").setAppName("Demo")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val callCounter = sc.accumulator(0)
val df= sc.parallelize(1 to 10000,numSlices = 100).toDF("value")
println(df.count) // gives 10000
val myudf = udf((d:Int) => callCounter.add(1);d)
val res = df.withColumn("result",myudf($"value")).cache
println(res.select($"result").collect().size) // gives 10000
println(callCounter.value) // gives 9941
如果使用累加器不是调用 UDF 计数的正确方法,我还能怎么做?
注意:在我实际的 Spark-Job 中,得到的调用计数大约是实际记录数的 1.7 倍。
【问题讨论】:
我试过你同样的代码,它打印 10000 作为 callcounter,所有println
s 都打印相同的数字,我使用的是 spark 2.0
:当我尝试正确使用本地打印时,当我将我的主人更改为local[*]
而不是本地时,我可以重新生成。当我尝试使用本地 [*] 时,它打印的是 9996 而不是 10000
在这种情况下使用累加器是否是一个已知问题?当我们使用 local[*] 时,为什么它没有正确计算它?
【参考方案1】:
Spark 应用程序应该定义一个 main() 方法而不是扩展 scala.App。 scala.App 的子类可能无法正常工作。
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.sql.functions.udf
object Demo extends App
def main(args: Array[String]): Unit =
val conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]")
val sc = new SparkContext(conf)
// [...]
这应该可以解决您的问题。
【讨论】:
谢谢,这解决了问题。但是在我的实际应用中,我没有 main 方法(我们使用 spark notebook)。也许我设法在一个小例子中重现了这种行为,然后我会问一个新问题。简而言之,我加入了两个数据框(标准内连接),然后使用 withColumn 调用我的 udf。同一行多次调用 udf 老实说,我不确定 spark-notebooks 是如何工作的。有时间我需要看看它。 这是新问题:***.com/questions/40320563/…以上是关于统计 Spark 中 UDF 的调用次数的主要内容,如果未能解决你的问题,请参考以下文章