如何将一些额外的字符串参数传递给每行的 spark udf?

Posted

技术标签:

【中文标题】如何将一些额外的字符串参数传递给每行的 spark udf?【英文标题】:How do I pass some extra string arguments to a spark udf for each row? 【发布时间】:2017-11-13 14:45:01 【问题描述】:

我正在使用 Spark 1.5 CDH5.5

我的要求是我需要从数据框中读取每个可选列并检查该列的长度是否大于 maxLimit, 如果它大于 maxLimit,那么我需要在该列上应用一个子字符串

如果 description 的值大于 33 那么我需要应用 substring(0,33) 如果 emr_id 的值大于 34 那么我需要应用 substring(0,34)

为此,我需要将一些额外的字符串参数传递给 spark udf,它会引发以下错误

我为每一列调用一个udf,如下所示

请帮我看看出了什么问题?

 scala> updatedDF.printSchema
root
 |-- data_id: string (nullable = true)
 |-- description: string (nullable = true)
 |-- emr_id: string (nullable = true)


val METADATA_ATTRIBUTES_SIZE_SCHEMA =  List(
("data_id","data_id","string","mandatory","40"),
("description","description","string","optional","33"),
("emr_id","emr_id","string","optional","34")
)


 updatedDF = updatedDF.formatMetaData(METADATA_ATTRIBUTES_SIZE_SCHEMA)

我有以下特征,它有很多可以被任何数据框直接调用的方法

trait MetaDataOperations 

val df: DataFrame


 def formatMetaData(schema: List  [(String,String,String,String,String)]) :DataFrame = 
var formattedDF = df
val getFormattedMetaData= udf(getFormattedMetaDataUdf _)
for(column <- formattedDF.columns)
  val schemaList = schema.filter(tuple => tuple._1.equals(column))
  val attributeDataType = schemaList.head._3.toString
  val attributeConformance = schemaList.head._4.toString
  val attributeMaxSize = schemaList.head._5.toString
  formattedDF = formattedDF.withColumn(column, getFormattedMetaData(formattedDF(column),lit(attributeDataType),lit(attributeConformance),lit(attributeMaxSize)))

formattedDF
 



  def getFormattedMetaDataUdf (colValue:String,attributeDataType:String,attributeConformance:String,attributeMaxSize:String): String = 
   var maxLength = 0
   var formattedColValue = attributeConformance match 
   case "optional" => 
    if (attributeDataType == "string") 
      maxLength = attributeMaxSize.toInt
    

    if (colValue.length > maxLength) 
      colValue.substring(0, maxLength) + "..."
    
    else 
      colValue
    
  
  case _ => colValue

formattedColValue



 

我收到以下错误

java.lang.ClassCastException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1 cannot be cast to scala.Function4
    at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:107)
    at org.apache.spark.sql.UserDefinedFunction.apply(UserDefinedFunction.scala:50)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:60)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:55)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:78)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:80)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:82)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:84)
    at $iwC$$iwC$$iwC.<init>(<console>:86)
    at $iwC$$iwC.<init>(<console>:88)
    at $iwC.<init>(<console>:90)
    at <init>(<console>:92)
    at .<init>(<console>:96)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)

【问题讨论】:

How to pass the parameter to User-Defined Function?的可能重复 @Jacek :我在将它们传递给 udf 之前应用了 lit(),我不知道为什么会出现此错误。 【参考方案1】:

getFormattedMetaDataUdf 不是和 UDF。它是标准的 Scala Function4。你必须使用udf 包装器

 val  getFormattedMetaDataUdf = udf(getFormattedMetaData _)

【讨论】:

以上是关于如何将一些额外的字符串参数传递给每行的 spark udf?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 withColumn 将额外的参数传递给 UDF

如何在 Spark SQL 中将额外参数传递给 UDF?

将额外参数传递给事件处理程序?

将额外参数传递给 usort 回调

PySpark 将 Dataframe 作为额外参数传递给映射

如何在 Javascript .filter() 方法中将额外参数传递给回调函数?