统计 Spark 中 UDF 的调用次数

Posted

技术标签:

【中文标题】统计 Spark 中 UDF 的调用次数【英文标题】:Count calls of UDF in Spark 【发布时间】:2016-10-29 05:45:39 【问题描述】:

使用 Spark 1.6.1 我想调用 UDF 被调用的次数。我想这样做是因为我有一个非常昂贵的 UDF(每次调用约 1 秒)并且 我怀疑 UDF 被调用的频率超过了我的数据框中的记录数,这使得我的 spark 工作比必要的慢 .

虽然我无法重现这种情况,但我想出了一个简单的示例,显示对 UDF 的调用次数似乎与行数不同(这里:更少),这怎么可能?

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.sql.functions.udf

object Demo extends App 
  val conf = new SparkConf().setMaster("local[4]").setAppName("Demo")
  val sc = new SparkContext(conf)
  sc.setLogLevel("WARN")
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._


  val callCounter = sc.accumulator(0)

  val df= sc.parallelize(1 to 10000,numSlices = 100).toDF("value")

  println(df.count) //  gives 10000

  val myudf = udf((d:Int) => callCounter.add(1);d)

  val res = df.withColumn("result",myudf($"value")).cache

  println(res.select($"result").collect().size) // gives 10000
  println(callCounter.value) // gives 9941


如果使用累加器不是调用 UDF 计数的正确方法,我还能怎么做?

注意:在我实际的 Spark-Job 中,得到的调用计数大约是实际记录数的 1.7 倍。

【问题讨论】:

我试过你同样的代码,它打印 10000 作为 callcounter,所有printlns 都打印相同的数字,我使用的是 spark 2.0 :当我尝试正确使用本地打印时,当我将我的主人更改为local[*] 而不是本地时,我可以重新生成。当我尝试使用本地 [*] 时,它打印的是 9996 而不是 10000 在这种情况下使用累加器是否是一个已知问题?当我们使用 local[*] 时,为什么它没有正确计算它? 【参考方案1】:

Spark 应用程序应该定义一个 main() 方法而不是扩展 scala.App。 scala.App 的子类可能无法正常工作。

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf, SparkContext
import org.apache.spark.sql.functions.udf

object Demo extends App 
    def main(args: Array[String]): Unit = 
         val conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]")
         val sc = new SparkContext(conf)
         // [...]
       

这应该可以解决您的问题。

【讨论】:

谢谢,这解决了问题。但是在我的实际应用中,我没有 main 方法(我们使用 spark notebook)。也许我设法在一个小例子中重现了这种行为,然后我会问一个新问题。简而言之,我加入了两个数据框(标准内连接),然后使用 withColumn 调用我的 udf。同一行多次调用 udf 老实说,我不确定 spark-notebooks 是如何工作的。有时间我需要看看它。 这是新问题:***.com/questions/40320563/…

以上是关于统计 Spark 中 UDF 的调用次数的主要内容,如果未能解决你的问题,请参考以下文章

Spark UDF 函数怎么实现参数数量变化?

scala实战之spark用户在线时长和登录次数统计实例

spark 省份次数统计实例

scala实战之spark用户在线时长和登录次数统计实例

Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数

用于 UDF 性能统计的 MS SQL DMV - 如何找到前 10 个最差的 UDF