udf in Spark SQL

Posted: 2015-10-14 04:51:16


Question:

I have two DataFrames, dataDf and regexDf. dataDf has a large number of records, while regexDf holds two columns of regular expressions. I need to filter dataDf down to the rows whose two columns match the regex pairs in the two columns of regexDf. I came up with this:

dataDf.registerTempTable("dataTable")
sqlContext.udf.register("matchExpressionCombination", matchExpressionCombination _)

val matchingResults = sqlContext.sql("SELECT * FROM dataTable WHERE matchExpressionCombination(col1, col2)")
def matchExpressionCombination(col1Text: String, col2Text: String): Boolean = {
  val regexDf = getRegexDf()
  var isMatch = false
  for (row <- regexDf.collect) {
    if (col1Text.matches(row(0).toString) && col2Text.matches(row(1).toString)) {
      isMatch = true
    }
  }
  isMatch
}

When I then call

println(matchingResults.count())

I get the following error:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#118L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#121L])
   Union
    TungstenProject
     Filter UDF(Col1Text#97,Col2Text#109)
      Scan CsvRelation(playWithSpark/data/dataDf1.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#97,BaselineStacktrace#98,BaselineTime#99,ClassId#100,ClassName#101,Id#102,IsDataSilo#103,MethodName#104,Namespace#105,Organization#106,PackageName#107,TestResultKey#108,Col2Text#109,UpgradedStacktrace#110,UpgradedTime#111]
    TungstenProject
     Filter UDF(Col1Text#2,Col2Text#14)
      Scan CsvRelation(playWithSpark/data/dataDf2.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#2,BaselineStacktrace#3,BaselineTime#4,ClassId#5,ClassName#6,Id#7,IsDataSilo#8,MethodName#9,Namespace#10,Organization#11,PackageName#12,TestResultKey#13,Col2Text#14,UpgradedStacktrace#15,UpgradedTime#16]


    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:69)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
    at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
    at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
    at org.apache.spark.sql.DataFrame.count(DataFrame.scala:1403)
    at com.salesforce.hammer.clusterer.collector.DeduperTest$.applyBaselineAndUpgradeOnlyPatternTest(DeduperTest.scala:83)
    at com.salesforce.hammer.clusterer.collector.Application$.main(Main.scala:53)
    at com.salesforce.hammer.clusterer.collector.Application.main(Main.scala)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenExchange SinglePartition
 TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#121L])
  Union
   TungstenProject
     Filter UDF(Col1Text#97,Col2Text#109)
      Scan CsvRelation(playWithSpark/data/dataDf1.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#97,BaselineStacktrace#98,BaselineTime#99,ClassId#100,ClassName#101,Id#102,IsDataSilo#103,MethodName#104,Namespace#105,Organization#106,PackageName#107,TestResultKey#108,Col2Text#109,UpgradedStacktrace#110,UpgradedTime#111]
    TungstenProject
     Filter UDF(Col1Text#2,Col2Text#14)
      Scan CsvRelation(playWithSpark/data/dataDf2.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#2,BaselineStacktrace#3,BaselineTime#4,ClassId#5,ClassName#6,Id#7,IsDataSilo#8,MethodName#9,Namespace#10,Organization#11,PackageName#12,TestResultKey#13,Col2Text#14,UpgradedStacktrace#15,UpgradedTime#16]
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
    at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:141)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:119)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:69)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
    ... 15 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#121L])
 Union
  TungstenProject
   Filter UDF(Col1Text#97,Col2Text#109)
    Scan CsvRelation(playWithSpark/data/dataDf1.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#97,BaselineStacktrace#98,BaselineTime#99,ClassId#100,ClassName#101,Id#102,IsDataSilo#103,MethodName#104,Namespace#105,Organization#106,PackageName#107,TestResultKey#108,Col2Text#109,UpgradedStacktrace#110,UpgradedTime#111]
  TungstenProject
   Filter UDF(Col1Text#2,Col2Text#14)
    Scan CsvRelation(playWithSpark/data/dataDf2.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#2,BaselineStacktrace#3,BaselineTime#4,ClassId#5,ClassName#6,Id#7,IsDataSilo#8,MethodName#9,Namespace#10,Organization#11,PackageName#12,TestResultKey#13,Col2Text#14,UpgradedStacktrace#15,UpgradedTime#16]


    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:69)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:142)
    at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
    ... 23 more
Caused by: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2021)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
    at org.apache.spark.sql.execution.Filter.doExecute(basicOperators.scala:113)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:184)
    at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:184)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:309)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.execution.Union.doExecute(basicOperators.scala:184)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:119)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:69)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
    ... 31 more
Caused by: java.io.NotSerializableException: com.salesforce.RegexDeduper
Serialization stack:
    - object not serializable (class: com.salesforce.RegexDeduper, value: com.salesforce.RegexDeduper@67b464f4)
    - field (class: com.salesforce.RegexDeduper$$anonfun$applyCol1TextAndCol2TextPattern$1, name: $outer, type: class com.salesforce.RegexDeduper)
    - object (class com.salesforce.RegexDeduper$$anonfun$applyBaselineAndUpgradePattern$1, <function2>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$3, name: func$3, type: interface scala.Function2)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$3, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(Col1Text#97,Col2Text#109))
    - field (class: org.apache.spark.sql.execution.Filter, name: condition, type: class org.apache.spark.sql.catalyst.expressions.Expression)
    - object (class org.apache.spark.sql.execution.Filter, Filter UDF(Col1Text#97,Col2Text#109)
 Scan CsvRelation(playWithSpark/data/dataDf1.csv,true,,,",null,#,PERMISSIVE,COMMONS,false,false,null,UTF-8,false)[Col1Text#97,BaselineStacktrace#98,BaselineTime#99,ClassId#100,ClassName#101,Id#102,IsDataSilo#103,MethodName#104,Namespace#105,Organization#106,PackageName#107,TestResultKey#108,Col2Text#109,UpgradedStacktrace#110,UpgradedTime#111]
)
    - field (class: org.apache.spark.sql.execution.Filter$$anonfun$4, name: $outer, type: class org.apache.spark.sql.execution.Filter)
    - object (class org.apache.spark.sql.execution.Filter$$anonfun$4, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)


Answer 1:

You cannot run a collect operation inside a UDF: it ships all the data to a single node, and collect should in any case only be used for experimentation in a spark-shell-style environment, never in production. More generally, you cannot use anything that relies on the SparkContext inside a UDF. Those operations execute on the driver, while UDF code is shipped to the executor nodes, and executors have no SparkContext object.
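A minimal sketch of that rule applied to the question's code (assuming the asker's getRegexDf() helper and sqlContext are in scope): collect the regex pairs once on the driver, then register a function that closes over the resulting plain local array, so nothing driver-only is referenced inside the UDF.

// Runs on the driver: collect the regex pairs once, up front.
val regexPairs: Array[(String, String)] =
  getRegexDf().collect().map(row => (row.getString(0), row.getString(1)))

// The registered function captures only regexPairs, a plain serializable
// Array -- no SQLContext, no DataFrame, no collect inside the UDF.
sqlContext.udf.register("matchExpressionCombination",
  (col1Text: String, col2Text: String) =>
    regexPairs.exists { case (re1, re2) =>
      col1Text.matches(re1) && col2Text.matches(re2)
    })

val matchingResults =
  sqlContext.sql("SELECT * FROM dataTable WHERE matchExpressionCombination(col1, col2)")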


Answer 2:

Your UDF matchExpressionCombination is called for every row of dataTable, but it collects a DataFrame (regexDf.collect) on every call. That means one collect operation per row of dataTable, which is bound to be very inefficient.

You should either join the two datasets and use a UDF to determine where they match, or collect the regex DataFrame into a local val outside the UDF and use that val inside it. A rough sketch of the join-based variant follows.
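This is only a sketch: the question does not show regexDf's schema, so the column names col1, col2, regex1 and regex2 are illustrative (DataFrame API as of Spark 1.5).

import org.apache.spark.sql.functions.{col, udf}

// One data row matches one regex pair when both columns match.
val bothMatch = udf((text1: String, re1: String, text2: String, re2: String) =>
  text1.matches(re1) && text2.matches(re2))

val matched = dataDf
  .join(regexDf)                              // Cartesian product: every row against every regex pair
  .filter(bothMatch(col("col1"), col("regex1"), col("col2"), col("regex2")))
  .select(dataDf.columns.map(dataDf(_)): _*)  // keep only dataDf's columns
  .dropDuplicates()                           // a row may match more than one regex pair

This avoids any per-row collect at the cost of a Cartesian product, which is only reasonable while regexDf stays small.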

Your exception says Caused by: java.io.NotSerializableException: com.salesforce.RegexDeduper, so you should probably give more detail about where that class is used in your code.

Comments:

Yes, that did it. RegexDeduper needed to implement Serializable, and I had to remove some class members that I didn't want serialized.
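For readers hitting the same NotSerializableException: the question never shows RegexDeduper, so the following is only a hypothetical reconstruction of the fix described in this comment. The class extends Serializable, the driver-only SQLContext is marked @transient so it is skipped during serialization, and the method used as the UDF reads nothing but a plain serializable array.

import org.apache.spark.sql.SQLContext

// Hypothetical reconstruction -- the real RegexDeduper is not shown in the question.
class RegexDeduper(@transient private val sqlContext: SQLContext) extends Serializable {

  // Computed once on the driver at construction time; an Array of plain
  // strings is serializable and travels safely with the UDF closure.
  private val regexPairs: Array[(String, String)] =
    sqlContext.table("regexTable")  // illustrative source of the regex pairs
      .collect()
      .map(row => (row.getString(0), row.getString(1)))

  // The method registered as the UDF (it appears as
  // applyCol1TextAndCol2TextPattern in the serialization stack above).
  // It reads only regexPairs, never the @transient SQLContext.
  def applyCol1TextAndCol2TextPattern(col1Text: String, col2Text: String): Boolean =
    regexPairs.exists { case (re1, re2) =>
      col1Text.matches(re1) && col2Text.matches(re2)
    }
}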
