如何将一些额外的字符串参数传递给每行的 spark udf?
Posted
技术标签:
【中文标题】如何将一些额外的字符串参数传递给每行的 spark udf?【英文标题】:How do I pass some extra string arguments to a spark udf for each row? 【发布时间】:2017-11-13 14:45:01 【问题描述】:我正在使用 Spark 1.5 CDH5.5
我的要求是我需要从数据框中读取每个可选列并检查该列的长度是否大于 maxLimit, 如果它大于 maxLimit,那么我需要在该列上应用一个子字符串
如果 description 的值大于 33 那么我需要应用 substring(0,33) 如果 emr_id 的值大于 34 那么我需要应用 substring(0,34)
为此,我需要将一些额外的字符串参数传递给 spark udf,它会引发以下错误
我为每一列调用一个udf,如下所示
请帮我看看出了什么问题?
scala> updatedDF.printSchema
root
|-- data_id: string (nullable = true)
|-- description: string (nullable = true)
|-- emr_id: string (nullable = true)
val METADATA_ATTRIBUTES_SIZE_SCHEMA = List(
("data_id","data_id","string","mandatory","40"),
("description","description","string","optional","33"),
("emr_id","emr_id","string","optional","34")
)
updatedDF = updatedDF.formatMetaData(METADATA_ATTRIBUTES_SIZE_SCHEMA)
我有以下特征,它有很多可以被任何数据框直接调用的方法
trait MetaDataOperations
val df: DataFrame
def formatMetaData(schema: List [(String,String,String,String,String)]) :DataFrame =
var formattedDF = df
val getFormattedMetaData= udf(getFormattedMetaDataUdf _)
for(column <- formattedDF.columns)
val schemaList = schema.filter(tuple => tuple._1.equals(column))
val attributeDataType = schemaList.head._3.toString
val attributeConformance = schemaList.head._4.toString
val attributeMaxSize = schemaList.head._5.toString
formattedDF = formattedDF.withColumn(column, getFormattedMetaData(formattedDF(column),lit(attributeDataType),lit(attributeConformance),lit(attributeMaxSize)))
formattedDF
def getFormattedMetaDataUdf (colValue:String,attributeDataType:String,attributeConformance:String,attributeMaxSize:String): String =
var maxLength = 0
var formattedColValue = attributeConformance match
case "optional" =>
if (attributeDataType == "string")
maxLength = attributeMaxSize.toInt
if (colValue.length > maxLength)
colValue.substring(0, maxLength) + "..."
else
colValue
case _ => colValue
formattedColValue
我收到以下错误
java.lang.ClassCastException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1 cannot be cast to scala.Function4
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:107)
at org.apache.spark.sql.UserDefinedFunction.apply(UserDefinedFunction.scala:50)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:60)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:55)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:78)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:80)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:82)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:84)
at $iwC$$iwC$$iwC.<init>(<console>:86)
at $iwC$$iwC.<init>(<console>:88)
at $iwC.<init>(<console>:90)
at <init>(<console>:92)
at .<init>(<console>:96)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
【问题讨论】:
How to pass the parameter to User-Defined Function?的可能重复 @Jacek :我在将它们传递给 udf 之前应用了 lit(),我不知道为什么会出现此错误。 【参考方案1】:getFormattedMetaDataUdf
不是和 UDF。它是标准的 Scala Function4
。你必须使用udf
包装器
val getFormattedMetaDataUdf = udf(getFormattedMetaData _)
【讨论】:
以上是关于如何将一些额外的字符串参数传递给每行的 spark udf?的主要内容,如果未能解决你的问题,请参考以下文章